This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 5373bd911bbf6128c5c985b52f432536fd7dcb0f Merge: 9660499 8ffa79f Author: David Capwell <[email protected]> AuthorDate: Tue Nov 10 10:48:53 2020 -0800 Merge branch 'cassandra-3.11' into trunk .../cassandra/config/YamlConfigurationLoader.java | 10 +++++-- .../apache/cassandra/distributed/Constants.java | 34 ++++++++++++++++++++++ .../cassandra/distributed/UpgradeableCluster.java | 1 + .../distributed/impl/AbstractCluster.java | 4 +-- .../cassandra/distributed/impl/Instance.java | 10 ++++--- 5 files changed, 51 insertions(+), 8 deletions(-) diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 36bcb44,f104e00..228385e --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@@ -36,10 -35,8 +36,9 @@@ import java.util.concurrent.atomic.Atom import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.function.Consumer; - import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import com.google.common.collect.Sets; @@@ -354,13 -324,18 +354,13 @@@ public abstract class AbstractCluster< private InstanceConfig createInstanceConfig(int nodeNum) { - String ipPrefix = "127.0." + subnet + "."; - String seedIp = ipPrefix + "1"; - String ipAddress = ipPrefix + nodeNum; + INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet); long token = tokenSupplier.token(nodeNum); - - NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology); - - InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount); + NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology); + InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token), datadirCount); - config.set("dtest.api.cluster_id", clusterId.toString()); + config.set(Constants.KEY_DTEST_API_CLUSTER_ID, clusterId.toString()); if (configUpdater != null) configUpdater.accept(config); - return config; } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 2fc7044,8a7b5a2..f4063a6 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -18,15 -18,17 +18,14 @@@ package org.apache.cassandra.distributed.impl; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; import java.io.ByteArrayOutputStream; import java.io.Closeable; +import java.io.File; +import java.io.IOException; import java.io.PrintStream; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; --import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@@ -35,9 -37,9 +34,8 @@@ import java.util.concurrent.Completable import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; --import java.util.function.Function; import javax.management.ListenerNotFoundException; import javax.management.Notification; import javax.management.NotificationListener; @@@ -60,13 -59,14 +58,14 @@@ import org.apache.cassandra.cql3.QueryP import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.SystemKeyspaceMigrator40; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.monitoring.ApproximateTime; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.action.GossipHelper; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.distributed.api.IInstance; @@@ -142,11 -133,10 +141,11 @@@ public class Instance extends IsolatedE { super("node" + config.num(), classLoader); this.config = config; - Object clusterId = Objects.requireNonNull(config.get("dtest.api.cluster_id"), "cluster_id is not defined"); + Object clusterId = Objects.requireNonNull(config.get(Constants.KEY_DTEST_API_CLUSTER_ID), "cluster_id is not defined"); ClusterIDDefiner.setId("cluster-" + clusterId); InstanceIDDefiner.setInstanceId(config.num()); - FBUtilities.setBroadcastInetAddress(config.broadcastAddress().getAddress()); + FBUtilities.setBroadcastInetAddressAndPort(InetAddressAndPort.getByAddressOverrideDefaults(config.broadcastAddress().getAddress(), + config.broadcastAddress().getPort())); // Set the config at instance creation, possibly before startup() has run on all other instances. // setMessagingVersions below will call runOnInstance which will instantiate @@@ -527,12 -593,72 +526,15 @@@ new File(dir).mkdirs(); } - private static Config loadConfig(IInstanceConfig overrides) + private Config loadConfig(IInstanceConfig overrides) { Map<String,Object> params = ((InstanceConfig) overrides).getParams(); - return YamlConfigurationLoader.fromMap(params, Config.class); + boolean check = true; + if (overrides.get(Constants.KEY_DTEST_API_CONFIG_CHECK) != null) + check = (boolean) overrides.get(Constants.KEY_DTEST_API_CONFIG_CHECK); + return YamlConfigurationLoader.fromMap(params, check, Config.class); } - private void initializeRing(ICluster cluster) - { - // This should be done outside instance in order to avoid serializing config - String partitionerName = config.getString("partitioner"); - List<String> initialTokens = new ArrayList<>(); - List<InetSocketAddress> hosts = new ArrayList<>(); - List<UUID> hostIds = new ArrayList<>(); - for (int i = 1 ; i <= cluster.size() ; ++i) - { - IInstanceConfig config = cluster.get(i).config(); - initialTokens.add(config.getString("initial_token")); - hosts.add(config.broadcastAddress()); - hostIds.add(config.hostId()); - } - - try - { - IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName); - StorageService storageService = StorageService.instance; - List<Token> tokens = new ArrayList<>(); - for (String token : initialTokens) - tokens.add(partitioner.getTokenFactory().fromString(token)); - - for (int i = 0; i < tokens.size(); i++) - { - InetSocketAddress ep = hosts.get(i); - UUID hostId = hostIds.get(i); - Token token = tokens.get(i); - Gossiper.runInGossipStageBlocking(() -> { - Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1); - Gossiper.instance.injectApplicationState(ep.getAddress(), - ApplicationState.TOKENS, - new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token))); - storageService.onChange(ep.getAddress(), - ApplicationState.STATUS, - new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token))); - Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress())); - }); - - int messagingVersion = cluster.get(ep).isShutdown() - ? MessagingService.current_version - : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion()); - MessagingService.instance().setVersion(ep.getAddress(), messagingVersion); - } - - // check that all nodes are in token metadata - for (int i = 0; i < tokens.size(); ++i) - assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress()); - - storageService.setNormalModeUnsafe(); - } - catch (Throwable e) // UnknownHostException - { - throw new RuntimeException(e); - } - } - public Future<Void> shutdown() { return shutdown(true); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
