Repository: cassandra
Updated Branches:
  refs/heads/trunk b29736c27 -> fa4255f06


Make decommission operation resumable

patch by Kaide Mu; reviewed by Paulo Motta for CASSANDRA-12008


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa4255f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa4255f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa4255f0

Branch: refs/heads/trunk
Commit: fa4255f06b0cc37051a69be20fd20e8e57798702
Parents: b29736c
Author: Kaide Mu <[email protected]>
Authored: Wed Jul 20 15:23:39 2016 +0200
Committer: Yuki Morishita <[email protected]>
Committed: Thu Aug 11 17:02:52 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  46 ++++++++
 .../apache/cassandra/dht/StreamStateStore.java  |   5 +
 .../cassandra/service/StorageService.java       | 114 +++++++++++--------
 .../apache/cassandra/streaming/StreamEvent.java |   9 ++
 .../cassandra/streaming/StreamSession.java      |   9 ++
 6 files changed, 139 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bba64c6..e215ad9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Make decommission operation resumable (CASSANDRA-12008)
  * Add support to one-way targeted repair (CASSANDRA-9876)
  * Remove clientutil jar (CASSANDRA-11635)
  * Fix compaction throughput throttle (CASSANDRA-12366)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2bfa88d..2083d54 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -30,6 +30,7 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.SetMultimap;
 import com.google.common.io.ByteStreams;
@@ -96,6 +97,7 @@ public final class SystemKeyspace
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
     public static final String AVAILABLE_RANGES = "available_ranges";
+    public static final String TRANSFERRED_RANGES = "transferred_ranges";
     public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
     public static final String BUILT_VIEWS = "built_views";
     public static final String PREPARED_STATEMENTS = "prepared_statements";
@@ -248,6 +250,16 @@ public final class SystemKeyspace
                 + "ranges set<blob>,"
                 + "PRIMARY KEY ((keyspace_name)))");
 
+    private static final CFMetaData TransferredRanges =
+        compile(TRANSFERRED_RANGES,
+                "record of transferred ranges for streaming operation",
+                "CREATE TABLE %s ("
+                + "operation text,"
+                + "peer inet,"
+                + "keyspace_name text,"
+                + "ranges set<blob>,"
+                + "PRIMARY KEY ((operation, keyspace_name), peer))");
+
     private static final CFMetaData ViewsBuildsInProgress =
         compile(VIEWS_BUILDS_IN_PROGRESS,
                 "views builds current progress",
@@ -442,6 +454,7 @@ public final class SystemKeyspace
                          SSTableActivity,
                          SizeEstimates,
                          AvailableRanges,
+                         TransferredRanges,
                          ViewsBuildsInProgress,
                          BuiltViews,
                          LegacyHints,
@@ -1297,6 +1310,47 @@ public final class SystemKeyspace
         availableRanges.truncateBlocking();
     }
 
+    /**
+     * Adds {@code streamedRanges} to the set of ranges stored in {@code system.transferred_ranges}
+     * for the row identified by operation {@code description}, {@code peer} and {@code keyspace}.
+     */
+    public static synchronized void updateTransferredRanges(String description,
+                                                         InetAddress peer,
+                                                         String keyspace,
+                                                         Collection<Range<Token>> streamedRanges)
+    {
+        String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE operation = ? AND peer = ? AND keyspace_name = ?";
+        Set<ByteBuffer> rangesToUpdate = new HashSet<>(streamedRanges.size());
+        for (Range<Token> range : streamedRanges)
+        {
+            rangesToUpdate.add(rangeToBytes(range));
+        }
+        executeInternal(String.format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace);
+    }
+
+    /**
+     * Returns the ranges of {@code keyspace} recorded for operation {@code description}, grouped by peer.
+     * The returned map is an immutable copy; an empty map means nothing has been recorded yet.
+     */
+    public static synchronized Map<InetAddress, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner)
+    {
+        Map<InetAddress, Set<Range<Token>>> result = new HashMap<>();
+        String query = "SELECT * FROM system.%s WHERE operation = ? AND keyspace_name = ?";
+        UntypedResultSet rs = executeInternal(String.format(query, TRANSFERRED_RANGES), description, keyspace);
+        for (UntypedResultSet.Row row : rs)
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance);
+            Set<Range<Token>> ranges = new HashSet<>();
+            for (ByteBuffer rawRange : rawRanges)
+            {
+                ranges.add(byteBufferToRange(rawRange, partitioner));
+            }
+            result.put(peer, ranges);
+        }
+        return ImmutableMap.copyOf(result);
+    }
+
     /**
      * Compare the release version in the system.local table with the one included in the distro.
      * If they don't match, snapshot all tables in the system keyspace. This is intended to be

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
index f6046aa..47b3072 100644
--- a/src/java/org/apache/cassandra/dht/StreamStateStore.java
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -66,6 +66,8 @@ public class StreamStateStore implements StreamEventHandler
             StreamEvent.SessionCompleteEvent se = (StreamEvent.SessionCompleteEvent) event;
             if (se.success)
             {
+                // Persist what this session transferred to se.peer (per keyspace) so a retried operation can skip it
+                se.transferredRangesPerKeyspace.forEach((keyspace, ranges) -> SystemKeyspace.updateTransferredRanges(se.description, se.peer, keyspace, ranges));
                 for (StreamRequest request : se.requests)
                 {
                     SystemKeyspace.updateAvailableRanges(request.keyspace, request.ranges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2810e2f..3d2037a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -171,6 +171,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private boolean isSurveyMode = Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false"));
     /* true if node is rebuilding and receiving data */
     private final AtomicBoolean isRebuilding = new AtomicBoolean();
+    /* true while a decommission is running on this node */
+    private final AtomicBoolean isDecommissioning = new AtomicBoolean();
 
     private volatile boolean initialized = false;
     private volatile boolean joined = false;
@@ -3665,41 +3667,65 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             throw new UnsupportedOperationException("local node is not a member of the token ring yet");
         if (tokenMetadata.cloneAfterAllLeft().sortedTokens().size() < 2)
             throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
-        if (operationMode != Mode.NORMAL)
+        if (operationMode != Mode.LEAVING && operationMode != Mode.NORMAL)
             throw new UnsupportedOperationException("Node in " + operationMode + " state; wait for status to become normal or restart");
-
-        PendingRangeCalculatorService.instance.blockUntilFinished();
-        for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
-        {
-            if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0)
-                throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
-        }
+        // Atomically claim the decommission (false -> true); a second call while one is running is rejected. Released in the finally below.
+        if (!isDecommissioning.compareAndSet(false, true))
+            throw new IllegalStateException("Node is still decommissioning. Check nodetool netstats.");
 
         if (logger.isDebugEnabled())
             logger.debug("DECOMMISSIONING");
-        startLeaving();
-        long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout());
-        setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true);
-        Thread.sleep(timeout);
 
-        Runnable finishLeaving = new Runnable()
+        try
         {
-            public void run()
+            PendingRangeCalculatorService.instance.blockUntilFinished();
+            for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
             {
-                shutdownClientServers();
-                Gossiper.instance.stop();
-                try {
-                    MessagingService.instance().shutdown();
-                } catch (IOError ioe) {
-                    logger.info("failed to shutdown message service: {}", ioe);
-                }
-                StageManager.shutdownNow();
-                SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
-                setMode(Mode.DECOMMISSIONED, true);
-                // let op be responsible for killing the process
+                if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0)
+                    throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
             }
-        };
-        unbootstrap(finishLeaving);
+
+            startLeaving();
+            long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout());
+            setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true);
+            Thread.sleep(timeout);
+
+            Runnable finishLeaving = new Runnable()
+            {
+                public void run()
+                {
+                    shutdownClientServers();
+                    Gossiper.instance.stop();
+                    try
+                    {
+                        MessagingService.instance().shutdown();
+                    }
+                    catch (IOError ioe)
+                    {
+                        logger.info("failed to shutdown message service: {}", ioe);
+                    }
+                    StageManager.shutdownNow();
+                    SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
+                    setMode(Mode.DECOMMISSIONED, true);
+                    // let op be responsible for killing the process
+                }
+            };
+            unbootstrap(finishLeaving);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Node interrupted while decommissioning", e);
+        }
+        catch (ExecutionException e)
+        {
+            logger.error("Error while decommissioning node ", e.getCause());
+            throw new RuntimeException("Error while decommissioning node: " + e.getCause().getMessage(), e.getCause());
+        }
+        finally
+        {
+            isDecommissioning.set(false);
+        }
     }
 
     private void leaveRing()
@@ -3714,7 +3740,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
     }
 
-    private void unbootstrap(Runnable onFinish)
+    private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException
     {
         Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>();
 
@@ -3736,14 +3762,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         // Wait for batch log to complete before streaming hints.
         logger.debug("waiting for batch log processing.");
-        try
-        {
-            batchlogReplay.get();
-        }
-        catch (ExecutionException | InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
+        batchlogReplay.get();
 
         setMode(Mode.LEAVING, "streaming hints to other nodes", true);
 
@@ -3751,15 +3770,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         // wait for the transfer runnables to signal the latch.
         logger.debug("waiting for stream acks.");
-        try
-        {
-            streamSuccess.get();
-            hintsSuccess.get();
-        }
-        catch (ExecutionException | InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
+        streamSuccess.get();
+        hintsSuccess.get();
         logger.debug("stream acks all received.");
         leaveRing();
         onFinish.run();
@@ -4541,6 +4553,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         // First, we build a list of ranges to stream to each host, per table
         Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>();
+
         for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByKeyspace.entrySet())
         {
             String keyspace = entry.getKey();
@@ -4549,12 +4562,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             if (rangesWithEndpoints.isEmpty())
                 continue;
 
+            Map<InetAddress, Set<Range<Token>>> transferredRangePerKeyspace =
+                SystemKeyspace.getTransferredRanges("Unbootstrap", keyspace,
+                                                    StorageService.instance.getTokenMetadata().partitioner);
             Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<>();
             for (Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
             {
                 Range<Token> range = endPointEntry.getKey();
                 InetAddress endpoint = endPointEntry.getValue();
 
+                Set<Range<Token>> transferredRanges = transferredRangePerKeyspace.get(endpoint);
+                if (transferredRanges != null && transferredRanges.contains(range))
+                {
+                    logger.debug("Skipping transferred range {} of keyspace {}, endpoint {}", range, keyspace, endpoint);
+                    continue;
+                }
+
                 List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
                 if (curRanges == null)
                 {
@@ -4568,6 +4591,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
 
         StreamPlan streamPlan = new StreamPlan("Unbootstrap");
+
+        // Register StreamStateStore as a listener on this StreamPlan so transferred ranges are recorded per StreamSession
+        streamPlan.listeners(streamStateStore);
+
         for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet())
         {
             String keyspaceName = entry.getKey();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index de3db9c..49172fb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -18,11 +18,16 @@
 package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
 public abstract class StreamEvent
 {
     public static enum Type
@@ -47,6 +52,8 @@ public abstract class StreamEvent
         public final boolean success;
         public final int sessionIndex;
         public final Set<StreamRequest> requests;
+        public final String description;
+        public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace;
 
         public SessionCompleteEvent(StreamSession session)
         {
@@ -55,6 +62,8 @@ public abstract class StreamEvent
             this.success = session.isSuccess();
             this.sessionIndex = session.sessionIndex();
             this.requests = ImmutableSet.copyOf(session.requests);
+            this.description = session.description();
+            this.transferredRangesPerKeyspace = ImmutableMap.copyOf(session.transferredRangesPerKeyspace);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 11b0847..3af31f8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -142,6 +142,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     /* can be null when session is created in remote */
     private final StreamConnectionFactory factory;
 
+    /* ranges queued for transfer in this session, grouped by keyspace */
+    public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
+
     public final ConnectionHandler handler;
 
     private AtomicBoolean isAborted = new AtomicBoolean(false);
@@ -289,6 +292,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         try
         {
             addTransferFiles(sections);
+            transferredRangesPerKeyspace.computeIfAbsent(keyspace, k -> new HashSet<>()).addAll(ranges);
         }
         finally
         {

Reply via email to