Repository: cassandra
Updated Branches:
  refs/heads/trunk 85c518cb3 -> ba926ff6d
(stress) Add datacenter option to -node options patch by Christopher Batey; reviewed by Aleksey Yeschenko for CASSANDRA-11591 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ba926ff6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ba926ff6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ba926ff6 Branch: refs/heads/trunk Commit: ba926ff6d8c09834d5c45f4eae8d75d9051b1058 Parents: 85c518c Author: Christopher Batey <[email protected]> Authored: Sun Apr 17 21:15:52 2016 +0100 Committer: Aleksey Yeschenko <[email protected]> Committed: Mon Apr 18 17:15:15 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/stress/settings/SettingsNode.java | 8 +++++++- .../cassandra/stress/util/JavaDriverClient.java | 19 ++++++++++++++----- 3 files changed, 22 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba926ff6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fd8514c..7f284fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * (stress) Add datacenter option to -node options (CASSANDRA-11591) * Fix handling of empty slices (CASSANDRA-11513) * Make number of cores used by cqlsh COPY visible to testing code (CASSANDRA-11437) * Allow filtering on clustering columns for queries without secondary indexes (CASSANDRA-11310) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba926ff6/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java index ba1fcb5..89b7871 100644 --- 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java @@ -33,6 +33,7 @@ public class SettingsNode implements Serializable { public final List<String> nodes; public final boolean isWhiteList; + public final String datacenter; public SettingsNode(Options options) { @@ -64,8 +65,12 @@ public class SettingsNode implements Serializable } else + { nodes = Arrays.asList(options.list.value().split(",")); + } + isWhiteList = options.whitelist.setByUser(); + datacenter = options.datacenter.value(); } public Set<String> resolveAllPermitted(StressSettings settings) @@ -135,6 +140,7 @@ public class SettingsNode implements Serializable public static final class Options extends GroupedOptions { + final OptionSimple datacenter = new OptionSimple("datacenter=", ".*", null, "Datacenter used for DCAwareRoundRobinLoadPolicy", false); final OptionSimple whitelist = new OptionSimple("whitelist", "", null, "Limit communications to the provided nodes", false); final OptionSimple file = new OptionSimple("file=", ".*", null, "Node file (one per line)", false); final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of nodes", false); @@ -142,7 +148,7 @@ public class SettingsNode implements Serializable @Override public List<? 
extends Option> options() { - return Arrays.asList(whitelist, file, list); + return Arrays.asList(datacenter, whitelist, file, list); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba926ff6/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java index 4f173b4..53d8786 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java @@ -24,6 +24,7 @@ import javax.net.ssl.SSLContext; import com.datastax.driver.core.*; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.WhiteListPolicy; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Slf4JLoggerFactory; @@ -51,7 +52,7 @@ public class JavaDriverClient private final EncryptionOptions.ClientEncryptionOptions encryptionOptions; private Cluster cluster; private Session session; - private final WhiteListPolicy whitelist; + private final LoadBalancingPolicy loadBalancingPolicy; private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>(); @@ -69,10 +70,18 @@ public class JavaDriverClient this.password = settings.mode.password; this.authProvider = settings.mode.authProvider; this.encryptionOptions = encryptionOptions; + + DCAwareRoundRobinPolicy.Builder policyBuilder = DCAwareRoundRobinPolicy.builder(); + if (settings.node.datacenter != null) + policyBuilder.withLocalDc(settings.node.datacenter); + if (settings.node.isWhiteList) - whitelist = new WhiteListPolicy(DCAwareRoundRobinPolicy.builder().build(), settings.node.resolveAll(settings.port.nativePort)); + loadBalancingPolicy 
= new WhiteListPolicy(policyBuilder.build(), settings.node.resolveAll(settings.port.nativePort)); + else if (settings.node.datacenter != null) + loadBalancingPolicy = policyBuilder.build(); else - whitelist = null; + loadBalancingPolicy = null; + connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost; int maxThreadCount = 0; @@ -119,8 +128,8 @@ public class JavaDriverClient .withoutJMXReporting() .withProtocolVersion(protocolVersion) .withoutMetrics(); // The driver uses metrics 3 with conflict with our version - if (whitelist != null) - clusterBuilder.withLoadBalancingPolicy(whitelist); + if (loadBalancingPolicy != null) + clusterBuilder.withLoadBalancingPolicy(loadBalancingPolicy); clusterBuilder.withCompression(compression); if (encryptionOptions.enabled) {
