Repository: cassandra
Updated Branches:
  refs/heads/trunk 07c6a36cc -> a018bcb7d
Establish bootstrap stream sessions sequentially

patch by Paulo Motta; reviewed by yukim for CASSANDRA-6992

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a018bcb7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a018bcb7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a018bcb7

Branch: refs/heads/trunk
Commit: a018bcb7d7ae74f9fa3f33cce2f0cc0deed6a442
Parents: 07c6a36
Author: Paulo Motta <[email protected]>
Authored: Wed Nov 18 16:00:36 2015 -0800
Committer: Yuki Morishita <[email protected]>
Committed: Tue Dec 15 15:24:22 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/BootStrapper.java  |  3 +-
 .../org/apache/cassandra/dht/RangeStreamer.java |  5 +-
 .../cassandra/io/sstable/SSTableLoader.java     |  2 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  2 +-
 .../cassandra/repair/StreamingRepairTask.java   |  2 +-
 .../cassandra/service/StorageService.java       |  3 +-
 .../cassandra/streaming/StreamCoordinator.java  | 57 +++++++++++++++++++-
 .../apache/cassandra/streaming/StreamPlan.java  | 12 +++--
 .../cassandra/streaming/StreamResultFuture.java |  9 ++--
 .../apache/cassandra/dht/BootStrapperTest.java  |  2 +-
 11 files changed, 80 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 991d42a..fb6f7e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
  * Sort compactionhistory output by timestamp (CASSANDRA-10464)
  * More efficient BTree removal (CASSANDRA-9991)
  * Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 8d8f5c7..d10aa3b 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -74,7 +74,8 @@ public class BootStrapper extends ProgressEventNotifierSupport "Bootstrap", useStrictConsistency, DatabaseDescriptor.getEndpointSnitch(), - stateStore); + stateStore, + true); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 3da6bc8..47e0c15 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -117,13 +117,14 @@ public class RangeStreamer String description, boolean useStrictConsistency, IEndpointSnitch snitch, - StreamStateStore stateStore) + StreamStateStore stateStore, + boolean connectSequentially) { this.metadata = metadata; this.tokens = tokens; this.address = address; this.description = description; - this.streamPlan = new StreamPlan(description, true); + this.streamPlan = new StreamPlan(description, true, connectSequentially); this.useStrictConsistency = useStrictConsistency; this.snitch = snitch; this.stateStore = stateStore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 3286522..043f6fa 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false).connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index daace01..e1da497 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -73,7 +73,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler isIncremental = prs.isIncremental; } Tracing.traceRepair(message); - new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this) + new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 25ef06e..b6936b6 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -62,7 +62,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); isIncremental = prs.isIncremental; } - new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this) + new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 4e749dc..069af53 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1106,7 +1106,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE "Rebuild", !replacing && useStrictConsistency, DatabaseDescriptor.getEndpointSnitch(), - streamStateStore); + streamStateStore, + false); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); if (sourceDc != null) streamer.addSourceFilter(new 
RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 603366d..aac1671 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -41,19 +41,23 @@ public class StreamCoordinator // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads. private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", FBUtilities.getAvailableProcessors()); + private final boolean connectSequentially; private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>(); private final int connectionsPerHost; private StreamConnectionFactory factory; private final boolean keepSSTableLevel; private final boolean isIncremental; + private Iterator<StreamSession> sessionsToConnect = null; - public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory) + public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, + StreamConnectionFactory factory, boolean connectSequentially) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; this.isIncremental = isIncremental; + this.connectSequentially = connectSequentially; } public void setConnectionFactory(StreamConnectionFactory factory) @@ -89,12 +93,61 @@ public class StreamCoordinator return connectionsPerHost == 0; } - public void connectAllStreamSessions() + public void connect(StreamResultFuture 
future) + { + if (this.connectSequentially) + connectSequentially(future); + else + connectAllStreamSessions(); + } + + private void connectAllStreamSessions() { for (HostStreamingData data : peerSessions.values()) data.connectAllStreamSessions(); } + private void connectSequentially(StreamResultFuture future) + { + sessionsToConnect = getAllStreamSessions().iterator(); + future.addEventListener(new StreamEventHandler() + { + public void handleStreamEvent(StreamEvent event) + { + if (event.eventType == StreamEvent.Type.STREAM_PREPARED) + { + connectNext(); + } + } + + public void onSuccess(StreamState result) + { + + } + + public void onFailure(Throwable t) + { + + } + }); + connectNext(); + } + + private void connectNext() + { + if (sessionsToConnect == null) + return; + + if (sessionsToConnect.hasNext()) + { + StreamSession next = sessionsToConnect.next(); + logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress()); + streamExecutor.execute(new StreamSessionConnector(next)); + } + else + logger.debug("Finished connecting all sessions"); + } + public synchronized Set<InetAddress> getPeers() { return new HashSet<>(peerSessions.keySet()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 0d963ed..f0fdd55 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -47,19 +47,21 @@ public class StreamPlan */ public StreamPlan(String description) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false); + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false); } - public StreamPlan(String description, boolean keepSSTableLevels) + public 
StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false); + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially); } - public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental) + public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, + boolean isIncremental, boolean connectSequentially) { this.description = description; this.repairedAt = repairedAt; - this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory()); + this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), + connectSequentially); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 99adab0..2297c83 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -71,10 +71,12 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental) { - this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory())); + this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, + new DefaultConnectionFactory(), false)); } - static StreamResultFuture init(UUID planId, String description, 
Collection<StreamEventHandler> listeners, StreamCoordinator coordinator) + static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, + StreamCoordinator coordinator) { StreamResultFuture future = createAndRegister(planId, description, coordinator); if (listeners != null) @@ -90,7 +92,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> { session.init(future); } - coordinator.connectAllStreamSessions(); + + coordinator.connect(future); return future; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 9b1fa01..8454ec1 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -93,7 +93,7 @@ public class BootStrapperTest InetAddress myEndpoint = InetAddress.getByName("127.0.0.1"); assertEquals(numOldNodes, tmd.sortedTokens().size()); - RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore()); + RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false); IFailureDetector mockFailureDetector = new IFailureDetector() { public boolean isAlive(InetAddress ep)
