Expose repairing by a user provided range patch by scode and slebresne; reviewed by stuhood for CASSANDRA-3912
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b69fd1af Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b69fd1af Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b69fd1af Branch: refs/heads/trunk Commit: b69fd1aff7e34363298aece693c4be5a3a603c71 Parents: 3f09b79 Author: Sylvain Lebresne <[email protected]> Authored: Tue Apr 3 11:57:29 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Apr 17 09:26:17 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/service/AntiEntropyService.java | 22 +++++++++-- .../apache/cassandra/service/StorageService.java | 29 +++++++++++++++ .../cassandra/service/StorageServiceMBean.java | 13 +++++++ src/java/org/apache/cassandra/tools/NodeProbe.java | 5 +++ 5 files changed, 67 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7a985ff..aa6153c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,8 @@ * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261) * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037) + * Expose repairing by a user provided range (CASSANDRA-3912) + 1.1-dev * Allow KS and CF names up to 48 characters (CASSANDRA-4157) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/AntiEntropyService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java index a39ed75..0c11947 100644 --- a/src/java/org/apache/cassandra/service/AntiEntropyService.java +++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java @@ -145,15 +145,29 @@ public class AntiEntropyService } /** - * Return all of the neighbors with whom we share data. + * Return all of the neighbors with whom we share the provided range. */ - static Set<InetAddress> getNeighbors(String table, Range<Token> range) + static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair) { StorageService ss = StorageService.instance; Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table); - if (!replicaSets.containsKey(range)) + Range<Token> rangeSuperSet = null; + for (Range<Token> range : ss.getLocalRanges(table)) + { + if (range.contains(toRepair)) + { + rangeSuperSet = range; + break; + } + else if (range.intersects(toRepair)) + { + throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair"); + } + } + if (rangeSuperSet == null || !replicaSets.containsKey(toRepair)) return Collections.emptySet(); - Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(range)); + + Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet)); neighbors.remove(FBUtilities.getBroadcastAddress()); // Excluding all node with version <= 0.7 since they don't know how to // create a correct merkle tree (they build it over the full range) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5e12364..3ae8b0a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -210,6 +210,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress()); } + // For JMX's sake. Use getLocalPrimaryRange for internal uses + public List<String> getPrimaryRange() + { + return getLocalPrimaryRange().asList(); + } + private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>()); private CassandraDaemon daemon; @@ -1951,6 +1957,29 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } } + public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException + { + if (Table.SYSTEM_TABLE.equals(tableName)) + return; + + Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); + Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken); + + logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", + new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies}); + AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies); + if (future == null) + return; + try + { + future.get(); + } + catch (Exception e) + { + logger_.error("Repair session " + future.session.getName() + " failed.", e); + } + } + public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException { ArrayList<String> names = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index be727ad..9da3896 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -136,6 +136,11 @@ public interface StorageServiceMBean public List <String> describeRingJMX(String keyspace) throws InvalidRequestException; /** + * Returns the local node's primary range. + */ + public List<String> getPrimaryRange(); + + /** * Retrieve a map of pending ranges to endpoints that describe the ring topology * @param keyspace the keyspace to get the pending range map for. * @return a map of pending ranges to endpoints @@ -241,6 +246,14 @@ public interface StorageServiceMBean */ public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException; + /** + * Perform repair of a specific range. + * + * This allows incremental repair to be performed by having an external controller submitting repair jobs. + * Note that the provided range much be a subset of one of the node local range. + */ + public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException; + public void forceTerminateAllRepairSessions(); /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 9609b65..f042353 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -208,6 +208,11 @@ public class NodeProbe ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies); } + public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException + { + ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies); + } + public void invalidateKeyCache() throws IOException { cacheService.invalidateKeyCache();
