Repository: cassandra Updated Branches: refs/heads/trunk b29736c27 -> fa4255f06
Make decommission operation resumable patch by Kaide Mu; reviewed by Paulo Motta for CASSANDRA-12008 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa4255f0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa4255f0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa4255f0 Branch: refs/heads/trunk Commit: fa4255f06b0cc37051a69be20fd20e8e57798702 Parents: b29736c Author: Kaide Mu <[email protected]> Authored: Wed Jul 20 15:23:39 2016 +0200 Committer: Yuki Morishita <[email protected]> Committed: Thu Aug 11 17:02:52 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 46 ++++++++ .../apache/cassandra/dht/StreamStateStore.java | 5 + .../cassandra/service/StorageService.java | 114 +++++++++++-------- .../apache/cassandra/streaming/StreamEvent.java | 9 ++ .../cassandra/streaming/StreamSession.java | 9 ++ 6 files changed, 139 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bba64c6..e215ad9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Make decommission operation resumable (CASSANDRA-12008) * Add support to one-way targeted repair (CASSANDRA-9876) * Remove clientutil jar (CASSANDRA-11635) * Fix compaction throughput throttle (CASSANDRA-12366) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 2bfa88d..2083d54 100644 --- 
a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -30,6 +30,7 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.SetMultimap; import com.google.common.io.ByteStreams; @@ -96,6 +97,7 @@ public final class SystemKeyspace public static final String SSTABLE_ACTIVITY = "sstable_activity"; public static final String SIZE_ESTIMATES = "size_estimates"; public static final String AVAILABLE_RANGES = "available_ranges"; + public static final String TRANSFERRED_RANGES = "transferred_ranges"; public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress"; public static final String BUILT_VIEWS = "built_views"; public static final String PREPARED_STATEMENTS = "prepared_statements"; @@ -248,6 +250,16 @@ public final class SystemKeyspace + "ranges set<blob>," + "PRIMARY KEY ((keyspace_name)))"); + private static final CFMetaData TransferredRanges = + compile(TRANSFERRED_RANGES, + "record of transferred ranges for streaming operation", + "CREATE TABLE %s (" + + "operation text," + + "peer inet," + + "keyspace_name text," + + "ranges set<blob>," + + "PRIMARY KEY ((operation, keyspace_name), peer))"); + private static final CFMetaData ViewsBuildsInProgress = compile(VIEWS_BUILDS_IN_PROGRESS, "views builds current progress", @@ -442,6 +454,7 @@ public final class SystemKeyspace SSTableActivity, SizeEstimates, AvailableRanges, + TransferredRanges, ViewsBuildsInProgress, BuiltViews, LegacyHints, @@ -1297,6 +1310,39 @@ public final class SystemKeyspace availableRanges.truncateBlocking(); } + public static synchronized void updateTransferredRanges(String description, + InetAddress peer, + String keyspace, + Collection<Range<Token>> streamedRanges) + { + String cql = "UPDATE 
system.%s SET ranges = ranges + ? WHERE operation = ? AND peer = ? AND keyspace_name = ?"; + Set<ByteBuffer> rangesToUpdate = new HashSet<>(streamedRanges.size()); + for (Range<Token> range : streamedRanges) + { + rangesToUpdate.add(rangeToBytes(range)); + } + executeInternal(String.format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace); + } + + public static synchronized Map<InetAddress, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner) + { + Map<InetAddress, Set<Range<Token>>> result = new HashMap<>(); + String query = "SELECT * FROM system.%s WHERE operation = ? AND keyspace_name = ?"; + UntypedResultSet rs = executeInternal(String.format(query, TRANSFERRED_RANGES), description, keyspace); + for (UntypedResultSet.Row row : rs) + { + InetAddress peer = row.getInetAddress("peer"); + Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance); + Set<Range<Token>> ranges = new HashSet<>(); + for (ByteBuffer rawRange : rawRanges) + { + ranges.add(byteBufferToRange(rawRange, partitioner)); + } + result.put(peer, ranges); + } + return ImmutableMap.copyOf(result); + } + /** * Compare the release version in the system.local table with the one included in the distro. * If they don't match, snapshot all tables in the system keyspace.
This is intended to be http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/dht/StreamStateStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java index f6046aa..47b3072 100644 --- a/src/java/org/apache/cassandra/dht/StreamStateStore.java +++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java @@ -66,6 +66,11 @@ public class StreamStateStore implements StreamEventHandler StreamEvent.SessionCompleteEvent se = (StreamEvent.SessionCompleteEvent) event; if (se.success) { + Set<String> keyspaces = se.transferredRangesPerKeyspace.keySet(); + for (String keyspace : keyspaces) + { + SystemKeyspace.updateTransferredRanges(se.description, se.peer, keyspace, se.transferredRangesPerKeyspace.get(keyspace)); + } for (StreamRequest request : se.requests) { SystemKeyspace.updateAvailableRanges(request.keyspace, request.ranges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2810e2f..3d2037a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -171,6 +171,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private boolean isSurveyMode = Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false")); /* true if node is rebuilding and receiving data */ private final AtomicBoolean isRebuilding = new AtomicBoolean(); + private final AtomicBoolean isDecommissioning = new AtomicBoolean(); private volatile boolean initialized = false; private volatile boolean joined = false; @@ -3665,41 
+3666,63 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new UnsupportedOperationException("local node is not a member of the token ring yet"); if (tokenMetadata.cloneAfterAllLeft().sortedTokens().size() < 2) throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless"); - if (operationMode != Mode.NORMAL) + if (operationMode != Mode.LEAVING && operationMode != Mode.NORMAL) throw new UnsupportedOperationException("Node in " + operationMode + " state; wait for status to become normal or restart"); - - PendingRangeCalculatorService.instance.blockUntilFinished(); - for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) - { - if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0) - throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); - } + if (!isDecommissioning.compareAndSet(false, true)) + throw new IllegalStateException("Node is still decommissioning.
Check nodetool netstats."); if (logger.isDebugEnabled()) logger.debug("DECOMMISSIONING"); - startLeaving(); - long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout()); - setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true); - Thread.sleep(timeout); - Runnable finishLeaving = new Runnable() + try { - public void run() + PendingRangeCalculatorService.instance.blockUntilFinished(); + for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { - shutdownClientServers(); - Gossiper.instance.stop(); - try { - MessagingService.instance().shutdown(); - } catch (IOError ioe) { - logger.info("failed to shutdown message service: {}", ioe); - } - StageManager.shutdownNow(); - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED); - setMode(Mode.DECOMMISSIONED, true); - // let op be responsible for killing the process + if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0) + throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); } - }; - unbootstrap(finishLeaving); + + startLeaving(); + long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout()); + setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true); + Thread.sleep(timeout); + + Runnable finishLeaving = new Runnable() + { + public void run() + { + shutdownClientServers(); + Gossiper.instance.stop(); + try + { + MessagingService.instance().shutdown(); + } + catch (IOError ioe) + { + logger.info("failed to shutdown message service: {}", ioe); + } + StageManager.shutdownNow(); + SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED); + setMode(Mode.DECOMMISSIONED, true); + // let op be responsible for killing the process + } + }; + unbootstrap(finishLeaving); + } + catch (InterruptedException e) + { + throw new 
RuntimeException("Node interrupted while decommissioning"); + } + catch (ExecutionException e) + { + logger.error("Error while decommissioning node ", e.getCause()); + throw new RuntimeException("Error while decommissioning node: " + e.getCause().getMessage()); + } + finally + { + isDecommissioning.set(false); + } } private void leaveRing() @@ -3714,7 +3737,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); } - private void unbootstrap(Runnable onFinish) + private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException { Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>(); @@ -3736,14 +3759,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // Wait for batch log to complete before streaming hints. logger.debug("waiting for batch log processing."); - try - { - batchlogReplay.get(); - } - catch (ExecutionException | InterruptedException e) - { - throw new RuntimeException(e); - } + batchlogReplay.get(); setMode(Mode.LEAVING, "streaming hints to other nodes", true); @@ -3751,15 +3767,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // wait for the transfer runnables to signal the latch. 
logger.debug("waiting for stream acks."); - try - { - streamSuccess.get(); - hintsSuccess.get(); - } - catch (ExecutionException | InterruptedException e) - { - throw new RuntimeException(e); - } + streamSuccess.get(); + hintsSuccess.get(); logger.debug("stream acks all received."); leaveRing(); onFinish.run(); @@ -4541,6 +4550,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { // First, we build a list of ranges to stream to each host, per table Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>(); + for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByKeyspace.entrySet()) { String keyspace = entry.getKey(); @@ -4549,12 +4559,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (rangesWithEndpoints.isEmpty()) continue; + Map<InetAddress, Set<Range<Token>>> transferredRangePerKeyspace = SystemKeyspace.getTransferredRanges("Unbootstrap", + keyspace, + StorageService.instance.getTokenMetadata().partitioner); Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<>(); for (Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries()) { Range<Token> range = endPointEntry.getKey(); InetAddress endpoint = endPointEntry.getValue(); + Set<Range<Token>> transferredRanges = transferredRangePerKeyspace.get(endpoint); + if (transferredRanges != null && transferredRanges.contains(range)) + { + logger.debug("Skipping transferred range {} of keyspace {}, endpoint {}", range, keyspace, endpoint); + continue; + } + List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint); if (curRanges == null) { @@ -4568,6 +4588,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } StreamPlan streamPlan = new StreamPlan("Unbootstrap"); + + // Register StreamStateStore as a listener on this plan so ranges sent by each successful StreamSession are persisted to system.transferred_ranges, letting a resumed decommission skip them + streamPlan.listeners(streamStateStore);
+ for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet()) { String keyspaceName = entry.getKey(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/streaming/StreamEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java index de3db9c..49172fb 100644 --- a/src/java/org/apache/cassandra/streaming/StreamEvent.java +++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java @@ -18,11 +18,16 @@ package org.apache.cassandra.streaming; import java.net.InetAddress; +import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.UUID; import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + public abstract class StreamEvent { public static enum Type @@ -47,6 +52,8 @@ public abstract class StreamEvent public final boolean success; public final int sessionIndex; public final Set<StreamRequest> requests; + public final String description; + public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace; public SessionCompleteEvent(StreamSession session) { @@ -55,6 +62,8 @@ public abstract class StreamEvent this.success = session.isSuccess(); this.sessionIndex = session.sessionIndex(); this.requests = ImmutableSet.copyOf(session.requests); + this.description = session.description(); + this.transferredRangesPerKeyspace = Collections.unmodifiableMap(session.transferredRangesPerKeyspace); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4255f0/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 
11b0847..3af31f8 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -142,6 +142,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber /* can be null when session is created in remote */ private final StreamConnectionFactory factory; + public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>(); + public final ConnectionHandler handler; private AtomicBoolean isAborted = new AtomicBoolean(false); @@ -289,6 +291,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber try { addTransferFiles(sections); + Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace); + if (toBeUpdated == null) + { + toBeUpdated = new HashSet<>(); + } + toBeUpdated.addAll(ranges); + transferredRangesPerKeyspace.put(keyspace, toBeUpdated); } finally {
