This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cassandra-2.2 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push: new 9705d82 Add an ability to run bootstrap / streaming tests with in-JVM dtest framework. 9705d82 is described below commit 9705d823cddfe24356ba4f3f083b9371cdbdeb4d Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Tue Jan 14 15:56:59 2020 +0100 Add an ability to run bootstrap / streaming tests with in-JVM dtest framework. Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-15497. --- .../org/apache/cassandra/service/GCInspector.java | 3 +- .../cassandra/distributed/api/IInstanceConfig.java | 4 + .../distributed/impl/AbstractCluster.java | 170 +++++++++++++++------ .../distributed/impl/DistributedTestSnitch.java | 60 +++++++- .../cassandra/distributed/impl/InstanceConfig.java | 10 +- .../distributed/impl/NetworkTopology.java | 60 +++++++- .../cassandra/distributed/test/BootstrapTest.java | 104 +++++++++++++ .../distributed/test/DistributedTestBase.java | 1 + .../distributed/test/NetworkTopologyTest.java | 5 +- 9 files changed, 357 insertions(+), 60 deletions(-) diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java index 31de151..4f93097 100644 --- a/src/java/org/apache/cassandra/service/GCInspector.java +++ b/src/java/org/apache/cassandra/service/GCInspector.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.sstable.SSTableDeletingTask; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.StatusLogger; public class GCInspector implements NotificationListener, GCInspectorMXBean @@ -147,7 +148,7 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc))); } - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); + MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME)); } catch (Exception e) { diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java index dd21b96..d2804c2 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java @@ -47,6 +47,10 @@ public interface IInstanceConfig */ void propagate(Object writeToConfig); + /** + * Validates whether the config properties are within range of accepted values. + */ + void validate(); Object get(String fieldName); String getString(String fieldName); int getInt(String fieldName); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 1ee0c14..474ade8 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -59,7 +59,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.SimpleCondition; /** @@ -101,6 +100,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, private final List<I> instances; private final Map<InetAddressAndPort, I> instanceMap; + private final Versions.Version initialVersion; + // mutated by user-facing API private final MessageFilters filters; @@ -131,7 +132,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, private IInvokableInstance newInstance(int generation) { ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader); - return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader) + return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>) Instance::new, classLoader) .apply(config, classLoader); } @@ -210,18 +211,19 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, } } - protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs, + protected AbstractCluster(File root, Versions.Version initialVersion, List<InstanceConfig> configs, ClassLoader sharedClassLoader) { this.root = root; this.sharedClassLoader = sharedClassLoader; this.instances = new ArrayList<>(); this.instanceMap = new HashMap<>(); + this.initialVersion = initialVersion; int generation = AbstractCluster.generation.incrementAndGet(); for (InstanceConfig config : configs) { - I instance = newInstanceWrapper(generation, version, config); + I instance = newInstanceWrapperInternal(generation, initialVersion, config); instances.add(instance); // we use the config().broadcastAddressAndPort() here because we have not initialised the Instance I prev = instanceMap.put(instance.broadcastAddressAndPort(), instance); @@ -233,6 +235,32 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, protected abstract I newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config); + protected I newInstanceWrapperInternal(int generation, Versions.Version version, InstanceConfig config) + { + config.validate(); + return newInstanceWrapper(generation, version, config); + } + + public I bootstrap(InstanceConfig config) + { + if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK)) + throw new IllegalStateException("New nodes can only be bootstrapped when gossip and networking is enabled."); + + I instance = newInstanceWrapperInternal(0, initialVersion, config); + + instances.add(instance); + I prev = instanceMap.put(config.broadcastAddressAndPort(), instance); + + if (null != prev) + { + throw new IllegalStateException(String.format("This cluster already contains a node (%d) with with same address and port: %s", + config.num, + instance)); + } + + return instance; + } + /** * WARNING: we index from 1 here, for consistency with inet address! */ @@ -240,18 +268,29 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, { return instances.get(node - 1).coordinator(); } + /** * WARNING: we index from 1 here, for consistency with inet address! */ - public I get(int node) { return instances.get(node - 1); } - public I get(InetAddressAndPort addr) { return instanceMap.get(addr); } + public I get(int node) + { + return instances.get(node - 1); + } + + public I get(InetAddressAndPort addr) + { + return instanceMap.get(addr); + } public int size() { return instances.size(); } - public Stream<I> stream() { return instances.stream(); } + public Stream<I> stream() + { + return instances.stream(); + } public Stream<I> stream(String dcName) { @@ -264,13 +303,24 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, i.config().localRack().equals(rackName)); } - public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); } - public void forEach(Consumer<? super I> consumer) { forEach(instances, consumer); } - public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) { instancesForOp.forEach(consumer); } + public void forEach(IIsolatedExecutor.SerializableRunnable runnable) + { + forEach(i -> i.sync(runnable)); + } + + public void forEach(Consumer<? super I> consumer) + { + forEach(instances, consumer); + } + + public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) + { + instancesForOp.forEach(consumer); + } public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit) { - parallelForEach(instances, consumer, timeout, unit); + parallelForEach(instances, consumer, timeout, unit); } public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit) @@ -316,12 +366,12 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, private void updateMessagingVersions() { - for (IInstance reportTo: instances) + for (IInstance reportTo : instances) { if (reportTo.isShutdown()) continue; - for (IInstance reportFrom: instances) + for (IInstance reportFrom : instances) { if (reportFrom == reportTo || reportFrom.isShutdown()) continue; @@ -391,15 +441,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, } - /** * Will wait for a schema change AND agreement that occurs after it is created * (and precedes the invocation to waitForAgreement) - * + * <p> * Works by simply checking if all UUIDs agree after any schema version change event, * so long as the waitForAgreement method has been entered (indicating the change has * taken place on the coordinator) - * + * <p> * This could perhaps be made a little more robust, but this should more than suffice. */ public class SchemaChangeMonitor extends ChangeMonitor @@ -462,9 +511,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, // and then start any instances with it disabled in parallel. List<I> startSequentially = new ArrayList<>(); List<I> startParallel = new ArrayList<>(); - for (I instance : instances) + for (int i = 0; i < instances.size(); i++) { - if ((boolean) instance.config().get("auto_bootstrap")) + I instance = instances.get(i); + + if (i == 0 || (boolean) instance.config().get("auto_bootstrap")) startSequentially.add(instance); else startParallel.add(instance); @@ -486,9 +537,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, private final Factory<I, C> factory; private int nodeCount; private int subnet; - private Map<Integer, Pair<String,String>> nodeIdTopology; + private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology; + private TokenSupplier tokenSupplier; private File root; - private Versions.Version version; + private Versions.Version version = Versions.CURRENT; private Consumer<InstanceConfig> configUpdater; public Builder(Factory<I, C> factory) @@ -496,13 +548,20 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, this.factory = factory; } + public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) + { + this.tokenSupplier = tokenSupplier; + return this; + } + public Builder<I, C> withSubnet(int subnet) { this.subnet = subnet; return this; } - public Builder<I, C> withNodes(int nodeCount) { + public Builder<I, C> withNodes(int nodeCount) + { this.nodeCount = nodeCount; return this; } @@ -534,7 +593,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, for (int rack = 1; rack <= racksPerDC; rack++) { for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++) - nodeIdTopology.put(nodeId++, Pair.create(dcName(dc), rackName(rack))); + nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack))); } } // adjust the node count to match the allocatation @@ -564,14 +623,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, nodeIdTopology = new HashMap<>(); } for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++) - nodeIdTopology.put(nodeId, Pair.create(dcName, rackName)); + nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName)); nodeCount += nodesInRack; return this; } // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount - public Builder<I, C> withNodeIdTopology(Map<Integer,Pair<String,String>> nodeIdTopology) + public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) { if (nodeIdTopology.isEmpty()) throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId."); @@ -585,7 +644,6 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, { nodeCount = nodeIdTopology.size(); logger.info("Adjusting node count to {} for supplied network topology", nodeCount); - } this.nodeIdTopology = new HashMap<>(nodeIdTopology); @@ -613,22 +671,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, public C createWithoutStarting() throws IOException { - File root = this.root; - Versions.Version version = this.version; - if (root == null) root = Files.createTempDirectory("dtests").toFile(); - if (version == null) - version = Versions.CURRENT; - if (nodeCount <= 0) throw new IllegalStateException("Cluster must have at least one node"); if (nodeIdTopology == null) + { nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed() .collect(Collectors.toMap(nodeId -> nodeId, - nodeId -> Pair.create(dcName(0), rackName(0)))); + nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0)))); + } root.mkdirs(); setupLogging(root); @@ -636,27 +690,40 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); List<InstanceConfig> configs = new ArrayList<>(); - long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount); - - String ipPrefix = "127.0." + subnet + "."; - String seedIp = ipPrefix + "1"; - NetworkTopology networkTopology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology); + if (tokenSupplier == null) + tokenSupplier = evenlyDistributedTokens(nodeCount); - for (int i = 0 ; i < nodeCount ; ++i) + for (int i = 0; i < nodeCount; ++i) { int nodeNum = i + 1; - String ipAddress = ipPrefix + nodeNum; - InstanceConfig config = InstanceConfig.generate(i + 1, ipAddress, networkTopology, root, String.valueOf(token), seedIp); - if (configUpdater != null) - configUpdater.accept(config); - configs.add(config); - token += increment; + configs.add(createInstanceConfig(nodeNum)); } return factory.newCluster(root, version, configs, sharedClassLoader); } + public InstanceConfig newInstanceConfig(C cluster) + { + return createInstanceConfig(cluster.size() + 1); + } + + private InstanceConfig createInstanceConfig(int nodeNum) + { + String ipPrefix = "127.0." + subnet + "."; + String seedIp = ipPrefix + "1"; + String ipAddress = ipPrefix + nodeNum; + long token = tokenSupplier.token(nodeNum); + + NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology); + + InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp); + if (configUpdater != null) + configUpdater.accept(config); + + return config; + } + public C start() throws IOException { C cluster = createWithoutStarting(); @@ -665,6 +732,21 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, } } + public static TokenSupplier evenlyDistributedTokens(int numNodes) + { + long increment = (Long.MAX_VALUE / numNodes) * 2; + return (int nodeId) -> { + assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy", + nodeId, numNodes); + return Long.MIN_VALUE + 1 + nodeId * increment; + }; + } + + public static interface TokenSupplier + { + public long token(int nodeId); + } + static String dcName(int index) { return "datacenter" + index; diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java index 35e2903..f8f157a 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java @@ -19,43 +19,91 @@ package org.apache.cassandra.distributed.impl; import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; import org.apache.cassandra.config.Config; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; public class DistributedTestSnitch extends AbstractNetworkTopologySnitch { private static NetworkTopology mapping = null; + private Map<InetAddressAndPort, Map<String, String>> savedEndpoints; + private static final String DEFAULT_DC = "UNKNOWN_DC"; + private static final String DEFAULT_RACK = "UNKNOWN_RACK"; + public String getRack(InetAddress endpoint) { - assert mapping != null : "network topology must be assigned before using snitch"; int storage_port = Config.getOverrideLoadConfig().get().storage_port; - return mapping.localRack(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port)); + return getRack(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port)); } public String getRack(InetAddressAndPort endpoint) { assert mapping != null : "network topology must be assigned before using snitch"; - return mapping.localRack(endpoint); + return maybeGetFromEndpointState(mapping.localRack(endpoint), endpoint, ApplicationState.RACK, DEFAULT_RACK); } public String getDatacenter(InetAddress endpoint) { - assert mapping != null : "network topology must be assigned before using snitch"; int storage_port = Config.getOverrideLoadConfig().get().storage_port; - return mapping.localDC(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port)); + return getDatacenter(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port)); } public String getDatacenter(InetAddressAndPort endpoint) { assert mapping != null : "network topology must be assigned before using snitch"; - return mapping.localDC(endpoint); + return maybeGetFromEndpointState(mapping.localDC(endpoint), endpoint, ApplicationState.DC, DEFAULT_DC); + } + + // Here, the logic is slightly different from what we have in GossipingPropertyFileSnitch since we have a different + // goal. Passed argument (topology that was set on the node) overrides anything that is passed elsewhere. + private String maybeGetFromEndpointState(String current, InetAddressAndPort endpoint, ApplicationState state, String defaultValue) + { + if (current != null) + return current; + + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint.address); + if (epState == null || epState.getApplicationState(state) == null) + { + if (savedEndpoints == null) + { + savedEndpoints = new HashMap<>(); + int storage_port = Config.getOverrideLoadConfig().get().storage_port; + for (Map.Entry<InetAddress, Map<String, String>> entry : SystemKeyspace.loadDcRackInfo().entrySet()) + { + savedEndpoints.put(InetAddressAndPort.getByAddressOverrideDefaults(endpoint.address, storage_port), + entry.getValue()); + } + } + + if (savedEndpoints.containsKey(endpoint)) + return savedEndpoints.get(endpoint).get("data_center"); + + return defaultValue; + } + + return epState.getApplicationState(state).value; } static void assign(NetworkTopology newMapping) { mapping = new NetworkTopology(newMapping); } + + public void gossiperStarting() + { + super.gossiperStarting(); + + Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, + StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress())); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index c53012b..6d668e6 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -22,6 +22,7 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SimpleSeedProvider; @@ -92,7 +93,8 @@ public class InstanceConfig implements IInstanceConfig this.num = num; this.networkTopology = networkTopology; this.hostId = java.util.UUID.randomUUID(); - this .set("broadcast_address", broadcast_address) + this .set("num_tokens", 1) + .set("broadcast_address", broadcast_address) .set("listen_address", listen_address) .set("broadcast_rpc_address", broadcast_rpc_address) .set("rpc_address", rpc_address) @@ -187,6 +189,12 @@ public class InstanceConfig implements IInstanceConfig propagate(writeToConfig, e.getKey(), e.getValue(), true); } + public void validate() + { + if (((int) get("num_tokens")) > 1) + throw new IllegalArgumentException("In-JVM dtests do not support vnodes as of now."); + } + private void propagate(Object writeToConfig, String fieldName, Object value, boolean ignoreMissing) { if (value == NULL) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java index 1176b32..f7c31ff 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java @@ -22,6 +22,9 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.InetAddressAndPort; @@ -29,7 +32,24 @@ import org.apache.cassandra.utils.Pair; public class NetworkTopology { - private final Map<InetAddressAndPort, Pair<String, String>> map; + private final Map<InetAddressAndPort, DcAndRack> map; + + public static class DcAndRack + { + private final String dc; + private final String rack; + + private DcAndRack(String dc, String rack) + { + this.dc = dc; + this.rack = rack; + } + } + + public static DcAndRack dcAndRack(String dc, String rack) + { + return new DcAndRack(dc, rack); + } private NetworkTopology() { map = new HashMap<>(); @@ -41,7 +61,7 @@ public class NetworkTopology map = new HashMap<>(networkTopology.map); } - public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, Pair<String, String>> nodeIdTopology) + public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology) { final NetworkTopology topology = new NetworkTopology(); @@ -51,7 +71,7 @@ public class NetworkTopology try { - Pair<String,String> dcAndRack = nodeIdTopology.get(nodeId); + DcAndRack dcAndRack = nodeIdTopology.get(nodeId); if (dcAndRack == null) throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap"); @@ -67,23 +87,51 @@ public class NetworkTopology return topology; } - public Pair<String, String> put(InetAddressAndPort key, Pair<String, String> value) + public DcAndRack put(InetAddressAndPort key, DcAndRack value) { return map.put(key, value); } public String localRack(InetAddressAndPort key) { - return map.get(key).right; + DcAndRack p = map.get(key); + if (p == null) + return null; + return p.rack; } public String localDC(InetAddressAndPort key) { - return map.get(key).left; + DcAndRack p = map.get(key); + if (p == null) + return null; + return p.dc; } public boolean contains(InetAddressAndPort key) { return map.containsKey(key); } + + public String toString() + { + return "NetworkTopology{" + map + '}'; + } + + + public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount, + String dc, + String rack) + { + return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack)); + } + + public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount, + IntFunction<DcAndRack> dcAndRackSupplier) + { + + return IntStream.rangeClosed(1, nodeCount).boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + dcAndRackSupplier::apply)); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java new file mode 100644 index 0000000..0af26ea --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.impl.IInvokableInstance; +import org.apache.cassandra.distributed.impl.InstanceConfig; +import org.apache.cassandra.distributed.impl.NetworkTopology; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class BootstrapTest extends DistributedTestBase +{ + + @Test + public void bootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + Cluster.Builder<IInvokableInstance, Cluster> builder = Cluster.build(originalNodeCount) + .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)); + + Map<Integer, Long> withBootstrap = null; + Map<Integer, Long> naturally = null; + + try (Cluster cluster = builder.start()) + { + populate(cluster); + + InstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .newInstanceConfig(cluster); + config.set("auto_bootstrap", true); + + IInstance newInstance = cluster.bootstrap(config); + newInstance.startup(); + + cluster.stream().forEach(instance -> { + instance.nodetool("cleanup", KEYSPACE, "tbl"); + }); + + withBootstrap = count(cluster); + } + + builder = Cluster.build(expandedNodeCount) + .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount)) + .withConfig(config -> config.with(NETWORK, GOSSIP)); + + try (Cluster cluster = builder.start()) + { + populate(cluster); + naturally = count(cluster); + } + + Assert.assertEquals(withBootstrap, naturally); + } + + public void populate(Cluster cluster) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + for (int i = 0; i < 1000; i++) + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", + ConsistencyLevel.QUORUM, + i, i, i); + } + + public Map<Integer, Long> count(Cluster cluster) + { + return IntStream.rangeClosed(1, cluster.size()) + .boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0])); + } + +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java index 745e1ab..7a3d52d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java @@ -67,6 +67,7 @@ public class DistributedTestBase @BeforeClass public static void setup() { + System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000)); System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); nativeLibraryWorkaround(); processReaperWorkaround(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java index 2c4f9c8..a9c2cee 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.impl.NetworkTopology; import org.apache.cassandra.utils.Pair; public class NetworkTopologyTest extends DistributedTestBase @@ -36,7 +37,7 @@ public class NetworkTopologyTest extends DistributedTestBase public void namedDcTest() throws Throwable { try (Cluster cluster = Cluster.build() - .withNodeIdTopology(Collections.singletonMap(1, Pair.create("somewhere", "rack0"))) + .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0"))) .withRack("elsewhere", "firstrack", 1) .withRack("elsewhere", "secondrack", 2) .withDC("nearthere", 4) @@ -94,6 +95,6 @@ public class NetworkTopologyTest extends DistributedTestBase @Test(expected = IllegalStateException.class) public void noHolesInNodeIdTopologyTest() { - Cluster.build().withNodeIdTopology(Collections.singletonMap(2, Pair.create("doomed", "rack"))); + Cluster.build().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack"))); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org