Stream undelivered hints on decommission. Patch by Jason Brown; reviewed by jbellis for CASSANDRA-5128.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc66c73b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc66c73b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc66c73b Branch: refs/heads/trunk Commit: cc66c73ba60ec3f062e3fd699b76e352f8b10f6d Parents: 360d1a2 Author: Jonathan Ellis <[email protected]> Authored: Thu Jan 24 13:42:37 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Thu Jan 24 13:42:54 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/locator/TokenMetadata.java | 17 ++- .../apache/cassandra/service/StorageService.java | 86 ++++++++++++--- .../org/apache/cassandra/streaming/StreamOut.java | 10 ++- 4 files changed, 94 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 812abdd..3a53a34 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.1 + * stream undelivered hints on decommission (CASSANDRA-5128) * GossipingPropertyFileSnitch loads saved dc/rack info if needed (CASSANDRA-5133) * drain should flush system CFs too (CASSANDRA-4446) * add inter_dc_tcp_nodelay setting (CASSANDRA-5148) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 3b5f86d..925a811 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -105,14 +105,16 @@ public class TokenMetadata public 
TokenMetadata() { - this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), new Topology()); + this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), + HashBiMap.<InetAddress, UUID>create(), + new Topology()); } - public TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, Topology topology) + private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology) { this.tokenToEndpointMap = tokenToEndpointMap; this.topology = topology; - endpointToHostIdMap = HashBiMap.create(); + endpointToHostIdMap = endpointsMap; sortedTokens = sortTokens(); } @@ -556,7 +558,9 @@ public class TokenMetadata lock.readLock().lock(); try { - return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), new Topology(topology)); + return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), + HashBiMap.create(endpointToHostIdMap), + new Topology(topology)); } finally { @@ -719,6 +723,11 @@ public class TokenMetadata } } + public Set<InetAddress> getAllEndpoints() + { + return endpointToHostIdMap.keySet(); + } + /** caller should not modify leavingEndpoints */ public Set<InetAddress> getLeavingEndpoints() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2f178b6..21f7c69 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2688,12 +2688,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE setMode(Mode.LEAVING, "streaming data to other nodes", true); CountDownLatch latch = 
streamRanges(rangesToStream); + CountDownLatch hintsLatch = streamHints(); // wait for the transfer runnables to signal the latch. logger.debug("waiting for stream aks."); try { latch.await(); + hintsLatch.await(); } catch (InterruptedException e) { @@ -2704,6 +2706,47 @@ public class StorageService extends NotificationBroadcasterSupport implements IE onFinish.run(); } + private CountDownLatch streamHints() + { + if (HintedHandOffManager.instance.listEndpointsPendingHints().size() == 0) + return new CountDownLatch(0); + + // gather all live nodes in the cluster that aren't also leaving + List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); + candidates.remove(FBUtilities.getBroadcastAddress()); + for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); ) + { + InetAddress address = iter.next(); + if (!FailureDetector.instance.isAlive(address)) + iter.remove(); + } + + if (candidates.isEmpty()) + { + logger.warn("Unable to stream hints since no live endpoints seen"); + return new CountDownLatch(0); + } + else + { + // stream to the closest peer as chosen by the snitch + DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates); + InetAddress hintsDestinationHost = candidates.get(0); + + // stream all hints -- range list will be a singleton of "the entire ring" + Token token = StorageService.getPartitioner().getMinimumToken(); + List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token)); + + CountDownLatch latch = new CountDownLatch(1); + StreamOut.transferRanges(hintsDestinationHost, + Table.open(Table.SYSTEM_KS), + Collections.singletonList(Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF)), + ranges, + new CountingDownStreamCallback(latch, hintsDestinationHost), + OperationType.UNBOOTSTRAP); + return latch; + } + } + public void move(String newToken) throws IOException { 
try @@ -3474,27 +3517,40 @@ public class StorageService extends NotificationBroadcasterSupport implements IE final List<Range<Token>> ranges = rangesEntry.getValue(); final InetAddress newEndpoint = rangesEntry.getKey(); - final IStreamCallback callback = new IStreamCallback() - { - public void onSuccess() - { - latch.countDown(); - } - - public void onFailure() - { - logger.warn("Streaming to " + newEndpoint + " failed"); - onSuccess(); // calling onSuccess for latch countdown - } - }; - // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - StreamOut.transferRanges(newEndpoint, Table.open(table), ranges, callback, OperationType.UNBOOTSTRAP); + StreamOut.transferRanges(newEndpoint, + Table.open(table), + ranges, + new CountingDownStreamCallback(latch, newEndpoint), + OperationType.UNBOOTSTRAP); } } return latch; } + class CountingDownStreamCallback implements IStreamCallback + { + private final CountDownLatch latch; + private final InetAddress targetAddr; + + CountingDownStreamCallback(CountDownLatch latch, InetAddress targetAddr) + { + this.latch = latch; + this.targetAddr = targetAddr; + } + + public void onSuccess() + { + latch.countDown(); + } + + public void onFailure() + { + logger.warn("Streaming to " + targetAddr + " failed"); + onSuccess(); // calling onSuccess for latch countdown + } + }; + /** * Used to request ranges from endpoints in the ring (will block until all data is fetched and ready) * @param ranges ranges to fetch as map of the preferred address and range collection http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/streaming/StreamOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java index 7043be4..7855d6b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOut.java +++ 
b/src/java/org/apache/cassandra/streaming/StreamOut.java @@ -80,8 +80,16 @@ public class StreamOut */ public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type) { + transferRanges(target, table, table.getColumnFamilyStores(), ranges, callback, type); + } + + /** + * Stream the given ranges to the target endpoint for provided CFs in the given keyspace. + */ + public static void transferRanges(InetAddress target, Table table, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type) + { StreamOutSession session = StreamOutSession.create(table.name, target, callback); - transferRanges(session, table.getColumnFamilyStores(), ranges, type); + transferRanges(session, cfses, ranges, type); } /**
