Repository: cassandra
Updated Branches:
  refs/heads/trunk a0e8de99d -> 95d927d38


Add support to rebuild from specific range

patch by Dikang Gu; reviewed by yukim for CASSANDRA-10409


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95d927d3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95d927d3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95d927d3

Branch: refs/heads/trunk
Commit: 95d927d38fbb13bfdcfc8e5a7475eb3e44082aaa
Parents: a0e8de9
Author: Dikang Gu <[email protected]>
Authored: Thu Apr 14 15:29:30 2016 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Wed Apr 20 14:09:51 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/service/StorageService.java       | 65 ++++++++++++++------
 .../cassandra/service/StorageServiceMBean.java  | 10 +++
 .../org/apache/cassandra/tools/NodeProbe.java   |  4 +-
 .../cassandra/tools/nodetool/Rebuild.java       | 22 ++++++-
 5 files changed, 79 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca679b2..6e3efb6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Add support to rebuild from specific range (CASSANDRA-10409)
  * Optimize the overlapping lookup by calculating all the
    bounds in advance (CASSANDRA-11571)
  * Support json/yaml output in noetool tablestats (CASSANDRA-5977)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8390482..6051567 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -26,23 +26,8 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -52,6 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.MatchResult;
+import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import javax.management.JMX;
 import javax.management.MBeanServer;
@@ -1142,13 +1129,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void rebuild(String sourceDc)
     {
+        rebuild(sourceDc, null, null);
+    }
+
+    public void rebuild(String sourceDc, String keyspace, String tokens)
+    {
         // check on going rebuild
         if (!isRebuilding.compareAndSet(false, true))
         {
             throw new IllegalStateException("Node is still rebuilding. Check 
nodetool netstats.");
         }
 
-        logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : 
sourceDc);
+        // check the arguments
+        if (keyspace == null && tokens != null)
+        {
+            throw new IllegalArgumentException("Cannot specify tokens without 
keyspace.");
+        }
+
+        logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any 
dc)" : sourceDc,
+                    keyspace == null ? "(All keyspaces)" : keyspace,
+                    tokens == null ? "(All tokens)" : tokens);
 
         try
         {
@@ -1164,8 +1164,35 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             if (sourceDc != null)
                 streamer.addSourceFilter(new 
RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), 
sourceDc));
 
-            for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
-                streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
+            if (keyspace == null)
+            {
+                for (String keyspaceName : 
Schema.instance.getNonSystemKeyspaces())
+                    streamer.addRanges(keyspaceName, 
getLocalRanges(keyspaceName));
+            }
+            else if (tokens == null)
+            {
+                streamer.addRanges(keyspace, getLocalRanges(keyspace));
+            }
+            else
+            {
+                Token.TokenFactory factory = getTokenFactory();
+                List<Range<Token>> ranges = new ArrayList<>();
+                Pattern rangePattern = 
Pattern.compile("\\(\\s*(-?\\w+)\\s*,\\s*(-?\\w+)\\s*\\]");
+                try (Scanner tokenScanner = new Scanner(tokens))
+                {
+                    while (tokenScanner.findInLine(rangePattern) != null)
+                    {
+                        MatchResult range = tokenScanner.match();
+                        Token startToken = factory.fromString(range.group(1));
+                        Token endToken = factory.fromString(range.group(2));
+                        logger.info(String.format("adding range: (%s,%s]", 
startToken, endToken));
+                        ranges.add(new Range<>(startToken, endToken));
+                    }
+                    if (tokenScanner.hasNext())
+                        throw new IllegalArgumentException("Unexpected string: 
" + tokenScanner.next());
+                }
+                streamer.addRanges(keyspace, ranges);
+            }
 
             StreamResultFuture resultFuture = streamer.fetchAsync();
             // wait for result

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 6598075..f48e469 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -541,6 +541,16 @@ public interface StorageServiceMBean extends NotificationEmitter
      */
     public void rebuild(String sourceDc);
 
+    /**
+     * Same as {@link #rebuild(String)}, but only for specified keyspace and 
ranges.
+     *
+     * @param sourceDc Name of DC from which to select sources for streaming 
or null to pick any node
+     * @param keyspace Name of the keyspace which to rebuild or null to 
rebuild all keyspaces.
+     * @param tokens Range of tokens to rebuild or null to rebuild all token 
ranges. In the format of:
+     *               
"(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]"
+     */
+    public void rebuild(String sourceDc, String keyspace, String tokens);
+
     /** Starts a bulk load and blocks until it completes. */
     public void bulkLoad(String directory);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 4dc2770..dc360d4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1118,9 +1118,9 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.describeRingJMX(keyspaceName);
     }
 
-    public void rebuild(String sourceDc)
+    public void rebuild(String sourceDc, String keyspace, String tokens)
     {
-        ssProxy.rebuild(sourceDc);
+        ssProxy.rebuild(sourceDc, keyspace, tokens);
     }
 
     public List<String> sampleKeyRange()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95d927d3/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
index 8a6dbf1..865f9fe 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools.nodetool;
 
 import io.airlift.command.Arguments;
 import io.airlift.command.Command;
+import io.airlift.command.Option;
 
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@@ -26,12 +27,29 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 @Command(name = "rebuild", description = "Rebuild data by streaming from other 
nodes (similarly to bootstrap)")
 public class Rebuild extends NodeToolCmd
 {
-    @Arguments(usage = "<src-dc-name>", description = "Name of DC from which 
to select sources for streaming. By default, pick any DC")
+    @Arguments(usage = "<src-dc-name>",
+               description = "Name of DC from which to select sources for 
streaming. By default, pick any DC")
     private String sourceDataCenterName = null;
 
+    @Option(title = "specific_keyspace",
+            name = {"-ks", "--keyspace"},
+            description = "Use -ks to rebuild specific keyspace.")
+    private String keyspace = null;
+
+    @Option(title = "specific_tokens",
+            name = {"-ts", "--tokens"},
+            description = "Use -ts to rebuild specific token ranges, in the 
format of 
\"(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]\".")
+    private String tokens = null;
+
     @Override
     public void execute(NodeProbe probe)
     {
-        probe.rebuild(sourceDataCenterName);
+        // check the arguments
+        if (keyspace == null && tokens != null)
+        {
+            throw new IllegalArgumentException("Cannot specify tokens without 
keyspace.");
+        }
+
+        probe.rebuild(sourceDataCenterName, keyspace, tokens);
     }
 }
\ No newline at end of file

Reply via email to