Repository: cassandra Updated Branches: refs/heads/trunk d3704d8a0 -> 4d67639d3
Parallelize streaming of different keyspaces for bootstrap and rebuild patch by Corentin Chary; reviewed by jasobrown for CASSANDRA-4663 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4d67639d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4d67639d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4d67639d Branch: refs/heads/trunk Commit: 4d67639d38b3e3a6fd0a3487a99b9755abda469d Parents: d3704d8 Author: Corentin Chary <[email protected]> Authored: Wed Dec 7 11:11:06 2016 +0100 Committer: Jason Brown <[email protected]> Committed: Fri Jan 20 11:47:18 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 6 ++++++ src/java/org/apache/cassandra/config/Config.java | 1 + src/java/org/apache/cassandra/config/DatabaseDescriptor.java | 5 +++++ src/java/org/apache/cassandra/dht/BootStrapper.java | 3 ++- src/java/org/apache/cassandra/dht/RangeStreamer.java | 7 +++++-- src/java/org/apache/cassandra/service/StorageService.java | 3 ++- 7 files changed, 22 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ebf161d..86ecbc4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Parallelize streaming of different keyspaces (CASSANDRA-4663) * Improved compactions metrics (CASSANDRA-13015) * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031) * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 4891706..e728796 100644 --- 
a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -817,6 +817,12 @@ cross_node_timeout: false # times out in 10 minutes by default # streaming_keep_alive_period_in_secs: 300 +# Limit number of connections per host for streaming +# Increase this when you notice that joins are CPU-bound rather than network +# bound (for example a few nodes with big files). +# streaming_connections_per_host: 1 + + # phi value that must be reached for a host to be marked down. # most users should never need to adjust this. # phi_convict_threshold: 8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index f5a8722..6fb999e 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -101,6 +101,7 @@ public class Config @Deprecated public int streaming_socket_timeout_in_ms = 86400000; //24 hours + public Integer streaming_connections_per_host = 1; public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes public boolean cross_node_timeout = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index c43672a..5aa7065 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2030,6 +2030,11 @@ public class DatabaseDescriptor return conf.streaming_keep_alive_period_in_secs; } + public static int getStreamingConnectionsPerHost() + { + return conf.streaming_connections_per_host; + } + public static String 
getLocalDataCenter() { return localDC; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 1e00f48..15e75fe 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -77,7 +77,8 @@ public class BootStrapper extends ProgressEventNotifierSupport useStrictConsistency, DatabaseDescriptor.getEndpointSnitch(), stateStore, - true); + true, + DatabaseDescriptor.getStreamingConnectionsPerHost()); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 504ef7e..46ca779 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -25,6 +25,7 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,13 +148,15 @@ public class RangeStreamer boolean useStrictConsistency, IEndpointSnitch snitch, StreamStateStore stateStore, - boolean connectSequentially) + boolean connectSequentially, + int connectionsPerHost) { this.metadata = metadata; this.tokens = tokens; this.address = address; 
this.description = description; - this.streamPlan = new StreamPlan(description, true, connectSequentially); + this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost, + true, false, connectSequentially); this.useStrictConsistency = useStrictConsistency; this.snitch = snitch; this.stateStore = stateStore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ad1d978..6cb67fc 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1136,7 +1136,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE useStrictConsistency && !replacing, DatabaseDescriptor.getEndpointSnitch(), streamStateStore, - false); + false, + DatabaseDescriptor.getStreamingConnectionsPerHost()); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); if (sourceDc != null) streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
