Repository: cassandra Updated Branches: refs/heads/trunk a0e8de99d -> 95d927d38
Add support to rebuild from specific range patch by Dikang Gu; reviewed by yukim for CASSANDRA-10409 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95d927d3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95d927d3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95d927d3 Branch: refs/heads/trunk Commit: 95d927d38fbb13bfdcfc8e5a7475eb3e44082aaa Parents: a0e8de9 Author: Dikang Gu <[email protected]> Authored: Thu Apr 14 15:29:30 2016 -0500 Committer: Yuki Morishita <[email protected]> Committed: Wed Apr 20 14:09:51 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/StorageService.java | 65 ++++++++++++++------ .../cassandra/service/StorageServiceMBean.java | 10 +++ .../org/apache/cassandra/tools/NodeProbe.java | 4 +- .../cassandra/tools/nodetool/Rebuild.java | 22 ++++++- 5 files changed, 79 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ca679b2..6e3efb6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Add support to rebuild from specific range (CASSANDRA-10409) * Optimize the overlapping lookup by calculating all the bounds in advance (CASSANDRA-11571) * Support json/yaml output in noetool tablestats (CASSANDRA-5977) http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8390482..6051567 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -26,23 +26,8 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -52,6 +37,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.MatchResult; +import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.management.JMX; import javax.management.MBeanServer; @@ -1142,13 +1129,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void rebuild(String sourceDc) { + rebuild(sourceDc, null, null); + } + + public void rebuild(String sourceDc, String keyspace, String tokens) + { // check on going rebuild if (!isRebuilding.compareAndSet(false, true)) { throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats."); } - logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc); + // check the arguments + if (keyspace == null && tokens != null) + { + throw new IllegalArgumentException("Cannot specify tokens without keyspace."); + } + + logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc, + keyspace == null ? "(All keyspaces)" : keyspace, + tokens == null ? "(All tokens)" : tokens); try { @@ -1164,8 +1164,35 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (sourceDc != null) streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc)); - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName)); + if (keyspace == null) + { + for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) + streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName)); + } + else if (tokens == null) + { + streamer.addRanges(keyspace, getLocalRanges(keyspace)); + } + else + { + Token.TokenFactory factory = getTokenFactory(); + List<Range<Token>> ranges = new ArrayList<>(); + Pattern rangePattern = Pattern.compile("\\(\\s*(-?\\w+)\\s*,\\s*(-?\\w+)\\s*\\]"); + try (Scanner tokenScanner = new Scanner(tokens)) + { + while (tokenScanner.findInLine(rangePattern) != null) + { + MatchResult range = tokenScanner.match(); + Token startToken = factory.fromString(range.group(1)); + Token endToken = factory.fromString(range.group(2)); + logger.info(String.format("adding range: (%s,%s]", startToken, endToken)); + ranges.add(new Range<>(startToken, endToken)); + } + if (tokenScanner.hasNext()) + throw new IllegalArgumentException("Unexpected string: " + tokenScanner.next()); + } + streamer.addRanges(keyspace, ranges); + } StreamResultFuture resultFuture = streamer.fetchAsync(); // wait for result http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 6598075..f48e469 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -541,6 +541,16 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void rebuild(String sourceDc); + /** + * Same as {@link #rebuild(String)}, but only for specified keyspace and ranges. + * + * @param sourceDc Name of DC from which to select sources for streaming or null to pick any node + * @param keyspace Name of the keyspace which to rebuild or null to rebuild all keyspaces. + * @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the format of: + * "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]" + */ + public void rebuild(String sourceDc, String keyspace, String tokens); + /** Starts a bulk load and blocks until it completes. */ public void bulkLoad(String directory); http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 4dc2770..dc360d4 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1118,9 +1118,9 @@ public class NodeProbe implements AutoCloseable return ssProxy.describeRingJMX(keyspaceName); } - public void rebuild(String sourceDc) + public void rebuild(String sourceDc, String keyspace, String tokens) { - ssProxy.rebuild(sourceDc); + ssProxy.rebuild(sourceDc, keyspace, tokens); } public List<String> sampleKeyRange() http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java index 8a6dbf1..865f9fe 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java @@ -19,6 +19,7 @@ package org.apache.cassandra.tools.nodetool; import io.airlift.command.Arguments; import io.airlift.command.Command; +import io.airlift.command.Option; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; @@ -26,12 +27,29 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd; @Command(name = "rebuild", description = "Rebuild data by streaming from other nodes (similarly to bootstrap)") public class Rebuild extends NodeToolCmd { - @Arguments(usage = "<src-dc-name>", description = "Name of DC from which to select sources for streaming. By default, pick any DC") + @Arguments(usage = "<src-dc-name>", + description = "Name of DC from which to select sources for streaming. By default, pick any DC") private String sourceDataCenterName = null; + @Option(title = "specific_keyspace", + name = {"-ks", "--keyspace"}, + description = "Use -ks to rebuild specific keyspace.") + private String keyspace = null; + + @Option(title = "specific_tokens", + name = {"-ts", "--tokens"}, + description = "Use -ts to rebuild specific token ranges, in the format of \"(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]\".") + private String tokens = null; + @Override public void execute(NodeProbe probe) { - probe.rebuild(sourceDataCenterName); + // check the arguments + if (keyspace == null && tokens != null) + { + throw new IllegalArgumentException("Cannot specify tokens without keyspace."); + } + + probe.rebuild(sourceDataCenterName, keyspace, tokens); } } \ No newline at end of file
