Repository: cassandra Updated Branches: refs/heads/trunk 9484783a8 -> 824cb768d
Add support to rebuild from targeted replicas Patch by Geoffrey Yu; Reviewed by Paulo Motta for CASSANDRA-9875 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/824cb768 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/824cb768 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/824cb768 Branch: refs/heads/trunk Commit: 824cb768d2aa45e1889653ab2c98cc0bc63e594e Parents: 9484783 Author: Geoffrey Yu <[email protected]> Authored: Tue Aug 9 21:36:30 2016 -0700 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Aug 24 17:25:47 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/dht/RangeStreamer.java | 18 ++++++++ .../cassandra/service/StorageService.java | 47 +++++++++++++++++++- .../cassandra/service/StorageServiceMBean.java | 2 +- .../org/apache/cassandra/tools/NodeProbe.java | 4 +- .../cassandra/tools/nodetool/Rebuild.java | 7 ++- 6 files changed, 73 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 12bedfa..af7d0dd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Add support to rebuild from targeted replica (CASSANDRA-9875) * Add sequence distribution type to cassandra stress (CASSANDRA-12490) * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index ee2d792..282ff04 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -122,6 +122,24 @@ public class RangeStreamer } } + /** + * Source filter which only includes endpoints contained within a provided set. + */ + public static class WhitelistedSourcesFilter implements ISourceFilter + { + private final Set<InetAddress> whitelistedSources; + + public WhitelistedSourcesFilter(Set<InetAddress> whitelistedSources) + { + this.whitelistedSources = whitelistedSources; + } + + public boolean shouldInclude(InetAddress endpoint) + { + return whitelistedSources.contains(endpoint); + } + } + public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index bc67ac9..7eb21c0 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1114,10 +1114,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void rebuild(String sourceDc) { - rebuild(sourceDc, null, null); + rebuild(sourceDc, null, null, null); } - public void rebuild(String sourceDc, String keyspace, String tokens) + public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources) { // check on going rebuild if (!isRebuilding.compareAndSet(false, true)) @@ -1176,6 +1176,49 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (tokenScanner.hasNext()) throw new IllegalArgumentException("Unexpected string: " + tokenScanner.next()); } + + // Ensure all specified ranges are actually ranges owned by this host + Collection<Range<Token>> localRanges = getLocalRanges(keyspace); + for (Range<Token> specifiedRange : ranges) + { + boolean foundParentRange = false; + for (Range<Token> localRange : localRanges) + { + if (localRange.contains(specifiedRange)) + { + foundParentRange = true; + break; + } + } + if (!foundParentRange) + { + throw new IllegalArgumentException(String.format("The specified range %s is not a range that is owned by this node. Please ensure that all token ranges specified to be rebuilt belong to this node.", specifiedRange.toString())); + } + } + + if (specificSources != null) + { + String[] stringHosts = specificSources.split(","); + Set<InetAddress> sources = new HashSet<>(stringHosts.length); + for (String stringHost : stringHosts) + { + try + { + InetAddress endpoint = InetAddress.getByName(stringHost); + if (FBUtilities.getBroadcastAddress().equals(endpoint)) + { + throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster."); + } + sources.add(endpoint); + } + catch (UnknownHostException ex) + { + throw new IllegalArgumentException("Unknown host specified " + stringHost, ex); + } + } + streamer.addSourceFilter(new RangeStreamer.WhitelistedSourcesFilter(sources)); + } + streamer.addRanges(keyspace, ranges); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 0f93177..d6f6bd6 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -580,7 +580,7 @@ public interface StorageServiceMBean extends NotificationEmitter * @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the format of: * "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]" */ - public void rebuild(String sourceDc, String keyspace, String tokens); + public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources); /** Starts a bulk load and blocks until it completes. */ public void bulkLoad(String directory); http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 89e7bda..5d6dff0 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1132,9 +1132,9 @@ public class NodeProbe implements AutoCloseable return ssProxy.describeRingJMX(keyspaceName); } - public void rebuild(String sourceDc, String keyspace, String tokens) + public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources) { - ssProxy.rebuild(sourceDc, keyspace, tokens); + ssProxy.rebuild(sourceDc, keyspace, tokens, specificSources); } public List<String> sampleKeyRange() http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java index 865f9fe..b27e674 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java @@ -41,6 +41,11 @@ public class Rebuild extends NodeToolCmd description = "Use -ts to rebuild specific token ranges, in the format of \"(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]\".") private String tokens = null; + @Option(title = "specific_sources", + name = {"-s", "--sources"}, + description = "Use -s to specify hosts that this node should stream from when -ts is used. Multiple hosts should be separated using commas (e.g. 127.0.0.1,127.0.0.2,...)") + private String specificSources = null; + @Override public void execute(NodeProbe probe) { @@ -50,6 +55,6 @@ public class Rebuild extends NodeToolCmd throw new IllegalArgumentException("Cannot specify tokens without keyspace."); } - probe.rebuild(sourceDataCenterName, keyspace, tokens); + probe.rebuild(sourceDataCenterName, keyspace, tokens, specificSources); } } \ No newline at end of file
