This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 41952a2 Improve machinery for testing bootstrap and range movements
41952a2 is described below
commit 41952a2f73ba5198250f64beba8f7ff1203204ab
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Jul 7 10:48:33 2020 +0200
Improve machinery for testing bootstrap and range movements
Patch by Alex Petrov; reviewed by Aleksey Yeschenko and David Capwell for
CASSANDRA-15935
---
.../org/apache/cassandra/dht/BootStrapper.java | 6 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 25 +-
.../org/apache/cassandra/gms/VersionedValue.java | 11 +
.../org/apache/cassandra/net/MessagingService.java | 6 +
.../apache/cassandra/schema/MigrationManager.java | 13 +-
.../apache/cassandra/service/StorageService.java | 236 ++++++-----
.../cassandra/distributed/UpgradeableCluster.java | 1 -
.../cassandra/distributed/action/GossipHelper.java | 457 +++++++++++++++++++++
.../distributed/action/InstanceAction.java | 27 ++
.../distributed/impl/AbstractCluster.java | 45 +-
.../distributed/impl/DistributedTestSnitch.java | 4 +-
.../cassandra/distributed/impl/Instance.java | 90 +---
.../distributed/impl/IsolatedExecutor.java | 2 +-
.../shared/VersionedApplicationState.java | 35 ++
.../cassandra/distributed/test/BootstrapTest.java | 102 -----
.../cassandra/distributed/test/TestBaseImpl.java | 3 +-
.../distributed/test/ring/BootstrapTest.java | 130 ++++++
.../ring/CommunicationDuringDecommissionTest.java | 76 ++++
.../distributed/test/ring/NodeNotInRingTest.java | 75 ++++
.../distributed/test/ring/PendingWritesTest.java | 109 +++++
test/unit/org/apache/cassandra/Util.java | 3 +-
21 files changed, 1154 insertions(+), 302 deletions(-)
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java
b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 94bf283..bc64325 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -152,7 +152,7 @@ public class BootStrapper extends
ProgressEventNotifierSupport
* otherwise, if allocationKeyspace is specified use the token allocation
algorithm to generate suitable tokens
* else choose num_tokens tokens at random
*/
- public static Collection<Token> getBootstrapTokens(final TokenMetadata
metadata, InetAddressAndPort address, int schemaWaitDelay) throws
ConfigurationException
+ public static Collection<Token> getBootstrapTokens(final TokenMetadata
metadata, InetAddressAndPort address, long schemaWaitDelay) throws
ConfigurationException
{
String allocationKeyspace =
DatabaseDescriptor.getAllocateTokensForKeyspace();
Integer allocationLocalRf =
DatabaseDescriptor.getAllocateTokensForLocalRf();
@@ -205,7 +205,7 @@ public class BootStrapper extends
ProgressEventNotifierSupport
InetAddressAndPort address,
String allocationKeyspace,
int numTokens,
- int schemaWaitDelay)
+ long schemaWaitDelay)
{
StorageService.instance.waitForSchema(schemaWaitDelay);
if
(!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
@@ -226,7 +226,7 @@ public class BootStrapper extends
ProgressEventNotifierSupport
InetAddressAndPort address,
int rf,
int numTokens,
- int schemaWaitDelay)
+ long schemaWaitDelay)
{
StorageService.instance.waitForSchema(schemaWaitDelay);
if
(!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 316a3cd..51e7e54 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -708,6 +708,16 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
assassinateEndpoint(address);
}
+ @VisibleForTesting
+ public void unsafeAnulEndpoint(InetAddressAndPort endpoint)
+ {
+ removeEndpoint(endpoint);
+ justRemovedEndpoints.remove(endpoint);
+ endpointStateMap.remove(endpoint);
+ expireTimeEndpointMap.remove(endpoint);
+ unreachableEndpoints.remove(endpoint);
+ }
+
/**
* Do not call this method unless you know what you are doing.
* It will try extremely hard to obliterate any endpoint from the ring,
@@ -1312,7 +1322,8 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
return pieces[0];
}
- void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap)
+ @VisibleForTesting
+ public void applyStateLocally(Map<InetAddressAndPort, EndpointState>
epStateMap)
{
checkProperThreadForStateMutation();
for (Entry<InetAddressAndPort, EndpointState> entry :
epStateMap.entrySet())
@@ -1785,9 +1796,9 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
private void addLocalApplicationStateInternal(ApplicationState state,
VersionedValue value)
{
assert taskLock.isHeldByCurrentThread();
- EndpointState epState =
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
InetAddressAndPort epAddr = FBUtilities.getBroadcastAddressAndPort();
- assert epState != null;
+ EndpointState epState = endpointStateMap.get(epAddr);
+ assert epState != null : "Can't find endpoint state for " + epAddr;
// Fire "before change" notifications:
doBeforeChangeNotifications(epAddr, epState, state, value);
// Notifications may have taken some time, so preventively raise the
version
@@ -1887,6 +1898,12 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
@VisibleForTesting
public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int
generationNbr)
{
+ initializeNodeUnsafe(addr, uuid, MessagingService.current_version,
generationNbr);
+ }
+
+ @VisibleForTesting
+ public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int
netVersion, int generationNbr)
+ {
HeartBeatState hbState = new HeartBeatState(generationNbr);
EndpointState newState = new EndpointState(hbState);
newState.markAlive();
@@ -1895,7 +1912,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
// always add the version state
Map<ApplicationState, VersionedValue> states = new
EnumMap<>(ApplicationState.class);
- states.put(ApplicationState.NET_VERSION,
StorageService.instance.valueFactory.networkVersion());
+ states.put(ApplicationState.NET_VERSION,
StorageService.instance.valueFactory.networkVersion(netVersion));
states.put(ApplicationState.HOST_ID,
StorageService.instance.valueFactory.hostId(uuid));
localState.addApplicationStates(states);
}
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index caf6ce9..3dc4c57 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -100,6 +100,11 @@ public class VersionedValue implements
Comparable<VersionedValue>
this(value, VersionGenerator.getNextVersion());
}
+ public static VersionedValue unsafeMakeVersionedValue(String value, int
version)
+ {
+ return new VersionedValue(value, version);
+ }
+
public int compareTo(VersionedValue value)
{
return this.version - value.version;
@@ -272,6 +277,12 @@ public class VersionedValue implements
Comparable<VersionedValue>
return new VersionedValue(version);
}
+ @VisibleForTesting
+ public VersionedValue networkVersion(int version)
+ {
+ return new VersionedValue(String.valueOf(version));
+ }
+
public VersionedValue networkVersion()
{
return new
VersionedValue(String.valueOf(MessagingService.current_version));
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java
b/src/java/org/apache/cassandra/net/MessagingService.java
index c0f57f8..4d712e8 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -431,6 +431,12 @@ public final class MessagingService extends
MessagingServiceMBeanImpl
public void shutdown(long timeout, TimeUnit units, boolean
shutdownGracefully, boolean shutdownExecutors)
{
+ if (isShuttingDown)
+ {
+ logger.info("Shutdown was already called");
+ return;
+ }
+
isShuttingDown = true;
logger.info("Waiting for messaging service to quiesce");
// We may need to schedule hints on the mutation stage, so it's
erroneous to shut down the mutation stage first
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 4f91d94..8f627cd 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -21,8 +21,9 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
+import java.util.function.LongSupplier;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,13 @@ public class MigrationManager
public static final MigrationManager instance = new MigrationManager();
- private static final RuntimeMXBean runtimeMXBean =
ManagementFactory.getRuntimeMXBean();
+ private static LongSupplier getUptimeFn = () ->
ManagementFactory.getRuntimeMXBean().getUptime();
+
+ @VisibleForTesting
+ public static void setUptimeFn(LongSupplier supplier)
+ {
+ getUptimeFn = supplier;
+ }
private static final int MIGRATION_DELAY_IN_MS = 60000;
@@ -100,7 +107,7 @@ public class MigrationManager
return;
}
- if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() <
MIGRATION_DELAY_IN_MS)
+ if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() <
MIGRATION_DELAY_IN_MS)
{
// If we think we may be bootstrapping or have recently started,
submit MigrationTask immediately
logger.debug("Immediately submitting migration task for {}, " +
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 746d599..4140680 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -135,6 +135,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
{
private static final Logger logger =
LoggerFactory.getLogger(StorageService.class);
+ public static final int INDEFINITE = -1;
public static final int RING_DELAY = getRingDelay(); // delay after which
we assume ring has stablized
private final JMXProgressSupport progressSupport = new
JMXProgressSupport(this);
@@ -148,7 +149,9 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return Integer.parseInt(newdelay);
}
else
+ {
return 30 * 1000;
+ }
}
/* This abstraction maintains the token/endpoint metadata information */
@@ -258,7 +261,9 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return joined;
}
- /** This method updates the local token on disk */
+ /**
+ * This method updates the local token on disk
+ */
public void setTokens(Collection<Token> tokens)
{
assert tokens != null && !tokens.isEmpty() : "Node needs at least one
token.";
@@ -850,10 +855,10 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
}
}
- public void waitForSchema(int delay)
+ public void waitForSchema(long delay)
{
// first sleep the delay to make sure we see all our peers
- for (int i = 0; i < delay; i += 1000)
+ for (long i = 0; i < delay; i += 1000)
{
// if we see schema, we can proceed to the next check directly
if (!Schema.instance.isEmpty())
@@ -873,7 +878,16 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
}
}
- private void joinTokenRing(int delay) throws ConfigurationException
+ private void joinTokenRing(long schemaTimeoutMillis) throws
ConfigurationException
+ {
+ joinTokenRing(!isSurveyMode, shouldBootstrap(), schemaTimeoutMillis,
INDEFINITE);
+ }
+
+ @VisibleForTesting
+ public void joinTokenRing(boolean finishJoiningRing,
+ boolean shouldBootstrap,
+ long schemaTimeoutMillis,
+ long bootstrapTimeoutMillis) throws
ConfigurationException
{
joined = true;
@@ -901,101 +915,18 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
}
boolean dataAvailable = true; // make this to false when bootstrap
streaming failed
- boolean bootstrap = shouldBootstrap();
- if (bootstrap)
- {
- if (SystemKeyspace.bootstrapInProgress())
- logger.warn("Detected previous bootstrap failure; retrying");
- else
-
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.IN_PROGRESS);
- setMode(Mode.JOINING, "waiting for ring information", true);
- waitForSchema(delay);
- setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
- setMode(Mode.JOINING, "waiting for pending range calculation",
true);
- PendingRangeCalculatorService.instance.blockUntilFinished();
- setMode(Mode.JOINING, "calculation complete, ready to bootstrap",
true);
-
- logger.debug("... got ring + schema info");
-
- if (useStrictConsistency && !allowSimultaneousMoves() &&
- (
- tokenMetadata.getBootstrapTokens().valueSet().size() >
0 ||
- tokenMetadata.getSizeOfLeavingEndpoints() > 0 ||
- tokenMetadata.getSizeOfMovingEndpoints() > 0
- ))
- {
- String bootstrapTokens =
StringUtils.join(tokenMetadata.getBootstrapTokens().valueSet(), ',');
- String leavingTokens =
StringUtils.join(tokenMetadata.getLeavingEndpoints(), ',');
- String movingTokens =
StringUtils.join(tokenMetadata.getMovingEndpoints().stream().map(e ->
e.right).toArray(), ',');
- throw new UnsupportedOperationException(String.format("Other
bootstrapping/leaving/moving nodes detected, cannot bootstrap while
cassandra.consistent.rangemovement is true. Nodes detected, bootstrapping: %s;
leaving: %s; moving: %s;", bootstrapTokens, leavingTokens, movingTokens));
- }
-
- // get bootstrap tokens
- if (!replacing)
- {
- if
(tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
- {
- String s = "This node is already a member of the token
ring; bootstrap aborted. (If replacing a dead node, remove the old one from the
ring first.)";
- throw new UnsupportedOperationException(s);
- }
- setMode(Mode.JOINING, "getting bootstrap token", true);
- bootstrapTokens =
BootStrapper.getBootstrapTokens(tokenMetadata,
FBUtilities.getBroadcastAddressAndPort(), delay);
- }
- else
- {
- if (!isReplacingSameAddress())
- {
- try
- {
- // Sleep additionally to make sure that the server
actually is not alive
- // and giving it more time to gossip if alive.
- Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
- // check for operator errors...
- for (Token token : bootstrapTokens)
- {
- InetAddressAndPort existing =
tokenMetadata.getEndpoint(token);
- if (existing != null)
- {
- long nanoDelay = delay * 1000000L;
- if
(Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() >
(System.nanoTime() - nanoDelay))
- throw new
UnsupportedOperationException("Cannot replace a live node... ");
- current.add(existing);
- }
- else
- {
- throw new UnsupportedOperationException("Cannot
replace token " + token + " which does not exist!");
- }
- }
- }
- else
- {
- try
- {
- Thread.sleep(RING_DELAY);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- setMode(Mode.JOINING, "Replacing a node with token(s): " +
bootstrapTokens, true);
- }
-
- dataAvailable = bootstrap(bootstrapTokens);
+ if (shouldBootstrap)
+ {
+ current.addAll(prepareForBootstrap(schemaTimeoutMillis));
+ dataAvailable = bootstrap(bootstrapTokens, bootstrapTimeoutMillis);
}
else
{
bootstrapTokens = SystemKeyspace.getSavedTokens();
if (bootstrapTokens.isEmpty())
{
- bootstrapTokens =
BootStrapper.getBootstrapTokens(tokenMetadata,
FBUtilities.getBroadcastAddressAndPort(), delay);
+ bootstrapTokens =
BootStrapper.getBootstrapTokens(tokenMetadata,
FBUtilities.getBroadcastAddressAndPort(), schemaTimeoutMillis);
}
else
{
@@ -1008,11 +939,11 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
setUpDistributedSystemKeyspaces();
- if (!isSurveyMode)
+ if (finishJoiningRing)
{
if (dataAvailable)
{
- finishJoiningRing(bootstrap, bootstrapTokens);
+ finishJoiningRing(shouldBootstrap, bootstrapTokens);
// remove the existing info about the replaced node.
if (!current.isEmpty())
{
@@ -1107,7 +1038,8 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
.forEach(cfs ->
cfs.indexManager.executePreJoinTasksBlocking(bootstrap));
}
- private void finishJoiningRing(boolean didBootstrap, Collection<Token>
tokens)
+ @VisibleForTesting
+ public void finishJoiningRing(boolean didBootstrap, Collection<Token>
tokens)
{
// start participating in the ring.
setMode(Mode.JOINING, "Finish joining ring", true);
@@ -1140,7 +1072,8 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return authSetupComplete;
}
- private void setUpDistributedSystemKeyspaces()
+ @VisibleForTesting
+ public void setUpDistributedSystemKeyspaces()
{
Collection<Mutation> changes = new ArrayList<>(3);
@@ -1529,6 +1462,96 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
logger.debug(logMsg);
}
+ @VisibleForTesting
+ public Collection<InetAddressAndPort> prepareForBootstrap(long schemaDelay)
+ {
+ Set<InetAddressAndPort> collisions = new HashSet<>();
+ if (SystemKeyspace.bootstrapInProgress())
+ logger.warn("Detected previous bootstrap failure; retrying");
+ else
+
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.IN_PROGRESS);
+ setMode(Mode.JOINING, "waiting for ring information", true);
+ waitForSchema(schemaDelay);
+ setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
+ setMode(Mode.JOINING, "waiting for pending range calculation", true);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+ setMode(Mode.JOINING, "calculation complete, ready to bootstrap",
true);
+
+ logger.debug("... got ring + schema info");
+
+ if (useStrictConsistency && !allowSimultaneousMoves() &&
+ (
+ tokenMetadata.getBootstrapTokens().valueSet().size() > 0 ||
+ tokenMetadata.getSizeOfLeavingEndpoints() > 0 ||
+ tokenMetadata.getSizeOfMovingEndpoints() > 0
+ ))
+ {
+ String bootstrapTokens =
StringUtils.join(tokenMetadata.getBootstrapTokens().valueSet(), ',');
+ String leavingTokens =
StringUtils.join(tokenMetadata.getLeavingEndpoints(), ',');
+ String movingTokens =
StringUtils.join(tokenMetadata.getMovingEndpoints().stream().map(e ->
e.right).toArray(), ',');
+ throw new UnsupportedOperationException(String.format("Other
bootstrapping/leaving/moving nodes detected, cannot bootstrap while
cassandra.consistent.rangemovement is true. Nodes detected, bootstrapping: %s;
leaving: %s; moving: %s;", bootstrapTokens, leavingTokens, movingTokens));
+ }
+
+ // get bootstrap tokens
+ if (!replacing)
+ {
+ if
(tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
+ {
+ String s = "This node is already a member of the token ring;
bootstrap aborted. (If replacing a dead node, remove the old one from the ring
first.)";
+ throw new UnsupportedOperationException(s);
+ }
+ setMode(Mode.JOINING, "getting bootstrap token", true);
+ bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata,
FBUtilities.getBroadcastAddressAndPort(), schemaDelay);
+ }
+ else
+ {
+ if (!isReplacingSameAddress())
+ {
+ try
+ {
+ // Sleep additionally to make sure that the server
actually is not alive
+ // and giving it more time to gossip if alive.
+ Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ // check for operator errors...
+ for (Token token : bootstrapTokens)
+ {
+ InetAddressAndPort existing =
tokenMetadata.getEndpoint(token);
+ if (existing != null)
+ {
+ long nanoDelay = schemaDelay * 1000000L;
+ if
(Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() >
(System.nanoTime() - nanoDelay))
+ throw new UnsupportedOperationException("Cannot
replace a live node... ");
+ collisions.add(existing);
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Cannot
replace token " + token + " which does not exist!");
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ Thread.sleep(RING_DELAY);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ }
+ setMode(Mode.JOINING, "Replacing a node with token(s): " +
bootstrapTokens, true);
+ }
+ return collisions;
+ }
+
/**
* Bootstrap node by fetching data from other nodes.
* If node is bootstrapping as a new node, then this also announces
bootstrapping to the cluster.
@@ -1538,7 +1561,8 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
* @param tokens bootstrapping tokens
* @return true if bootstrap succeeds.
*/
- private boolean bootstrap(final Collection<Token> tokens)
+ @VisibleForTesting
+ public boolean bootstrap(final Collection<Token> tokens, long
bootstrapTimeoutMillis)
{
isBootstrapMode = true;
SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes
us part of the ring locally which is incorrect until we are done bootstrapping
@@ -1576,13 +1600,13 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
// Force disk boundary invalidation now that local tokens are set
invalidateDiskBoundaries();
- setMode(Mode.JOINING, "Starting to bootstrap...", true);
- BootStrapper bootstrapper = new
BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
- bootstrapper.addProgressListener(progressSupport);
- ListenableFuture<StreamState> bootstrapStream =
bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing);
// handles token update
+ Future<StreamState> bootstrapStream = startBootstrap(tokens);
try
{
- bootstrapStream.get();
+ if (bootstrapTimeoutMillis > 0)
+ bootstrapStream.get(bootstrapTimeoutMillis, MILLISECONDS);
+ else
+ bootstrapStream.get();
bootstrapFinished();
logger.info("Bootstrap completed for tokens {}", tokens);
return true;
@@ -1594,6 +1618,14 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
}
}
+ public Future<StreamState> startBootstrap(Collection<Token> tokens)
+ {
+ setMode(Mode.JOINING, "Starting to bootstrap...", true);
+ BootStrapper bootstrapper = new
BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
+ bootstrapper.addProgressListener(progressSupport);
+ return bootstrapper.bootstrap(streamStateStore, useStrictConsistency
&& !replacing); // handles token update
+ }
+
private void invalidateDiskBoundaries()
{
for (Keyspace keyspace : Keyspace.all())
diff --git
a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 532c1b1..47fd15b 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -24,7 +24,6 @@ import java.util.function.Consumer;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.Versions;
/**
diff --git
a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
new file mode 100644
index 0000000..70ef503
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.action;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.shared.VersionedApplicationState;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static
org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
+
+public class GossipHelper
+{
+ public static InstanceAction statusToBootstrap(IInvokableInstance newNode)
+ {
+ return (instance) ->
+ {
+ changeGossipState(instance,
+ newNode,
+ Arrays.asList(tokens(newNode),
+ statusBootstrapping(newNode),
+
statusWithPortBootstrapping(newNode)));
+ };
+ }
+
+ public static InstanceAction statusToNormal(IInvokableInstance peer)
+ {
+ return (target) ->
+ {
+ changeGossipState(target,
+ peer,
+ Arrays.asList(tokens(peer),
+ statusNormal(peer),
+ releaseVersion(peer),
+ netVersion(peer),
+ statusWithPortNormal(peer)));
+ };
+ }
+
+ /**
+ * This method is unsafe and should be used _only_ when gossip is not used
or available: it creates versioned values on the
+ * target instance, which means Gossip versioning gets out of sync. Use a
safe couterpart at all times when performing _any_
+ * ring movement operations _or_ if Gossip is used.
+ */
+ public static void unsafeStatusToNormal(IInvokableInstance target,
IInstance peer)
+ {
+ int messagingVersion = peer.getMessagingVersion();
+ changeGossipState(target,
+ peer,
+ Arrays.asList(unsafeVersionedValue(target,
+
ApplicationState.TOKENS,
+ (partitioner,
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens),
+
peer.config().getString("partitioner"),
+
peer.config().getString("initial_token")),
+ unsafeVersionedValue(target,
+
ApplicationState.STATUS,
+ (partitioner,
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
+
peer.config().getString("partitioner"),
+
peer.config().getString("initial_token")),
+ unsafeVersionedValue(target,
+
ApplicationState.STATUS_WITH_PORT,
+ (partitioner,
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
+
peer.config().getString("partitioner"),
+
peer.config().getString("initial_token")),
+ unsafeVersionedValue(target,
+
ApplicationState.NET_VERSION,
+ (partitioner) ->
new
VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion),
+
peer.config().getString("partitioner")),
+ unsafeReleaseVersion(target,
+
peer.config().getString("partitioner"),
+
peer.getReleaseVersionString())));
+ }
+
+ public static InstanceAction statusToLeaving(IInvokableInstance newNode)
+ {
+ return (instance) -> {
+ changeGossipState(instance,
+ newNode,
+ Arrays.asList(tokens(newNode),
+ statusLeaving(newNode),
+ statusWithPortLeaving(newNode)));
+ };
+ }
+
+ public static InstanceAction bootstrap()
+ {
+ return new BootstrapAction();
+ }
+
+ public static InstanceAction bootstrap(boolean joinRing, Duration
waitForBootstrap, Duration waitForSchema)
+ {
+ return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema);
+ }
+
+ public static InstanceAction disseminateGossipState(IInvokableInstance
newNode)
+ {
+ return new DisseminateGossipState(newNode);
+ }
+
+ public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom)
+ {
+ return new PullSchemaFrom(pullFrom);
+ }
+
+ private static InstanceAction disableBinary()
+ {
+ return (instance) ->
instance.nodetoolResult("disablebinary").asserts().success();
+ }
+
+ private static class DisseminateGossipState implements InstanceAction
+ {
+ final Map<InetSocketAddress, byte[]> gossipState;
+
+ public DisseminateGossipState(IInvokableInstance... from)
+ {
+ gossipState = new HashMap<>();
+ for (IInvokableInstance node : from)
+ {
+ byte[] epBytes = node.callsOnInstance(() -> {
+ EndpointState epState =
Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
+ return toBytes(epState);
+ }).call();
+ gossipState.put(node.broadcastAddress(), epBytes);
+ }
+ }
+
+ public void accept(IInvokableInstance instance)
+ {
+
instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress,
byte[]>, Void>)
+ (map) -> {
+ Map<InetAddressAndPort,
EndpointState> newState = new HashMap<>();
+ for (Map.Entry<InetSocketAddress,
byte[]> e : map.entrySet())
+
newState.put(toCassandraInetAddressAndPort(e.getKey()),
fromBytes(e.getValue()));
+
+
Gossiper.runInGossipStageBlocking(() -> {
+
Gossiper.instance.applyStateLocally(newState);
+ });
+ return null;
+ }).apply(gossipState);
+ }
+ }
+
+ private static byte[] toBytes(EndpointState epState)
+ {
+ try (DataOutputBuffer out = new DataOutputBuffer(1024))
+ {
+ EndpointState.serializer.serialize(epState, out,
MessagingService.current_version);
+ return out.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static EndpointState fromBytes(byte[] bytes)
+ {
+ try (DataInputBuffer in = new DataInputBuffer(bytes))
+ {
+ return EndpointState.serializer.deserialize(in,
MessagingService.current_version);
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+ }
+
+ private static class PullSchemaFrom implements InstanceAction
+ {
+ final InetSocketAddress pullFrom;
+
+ public PullSchemaFrom(IInvokableInstance pullFrom)
+ {
+ this.pullFrom = pullFrom.broadcastAddress();;
+ }
+
+ public void accept(IInvokableInstance pullTo)
+ {
+ pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> {
+ InetAddressAndPort endpoint =
toCassandraInetAddressAndPort(pullFrom);
+ EndpointState state =
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ MigrationManager.scheduleSchemaPull(endpoint, state);
+ MigrationManager.waitUntilReadyForBootstrap();
+ }).accept(pullFrom);
+ }
+ }
+
+ private static class BootstrapAction implements InstanceAction,
Serializable
+ {
+ private final boolean joinRing;
+ private final Duration waitForBootstrap;
+ private final Duration waitForSchema;
+
+ public BootstrapAction()
+ {
+ this(true, Duration.ofMinutes(10), Duration.ofSeconds(10));
+ }
+
+ public BootstrapAction(boolean joinRing, Duration waitForBootstrap,
Duration waitForSchema)
+ {
+ this.joinRing = joinRing;
+ this.waitForBootstrap = waitForBootstrap;
+ this.waitForSchema = waitForSchema;
+ }
+
+ public void accept(IInvokableInstance instance)
+ {
+ instance.appliesOnInstance((String partitionerString, String
tokenString) -> {
+ IPartitioner partitioner =
FBUtilities.newPartitioner(partitionerString);
+ List<Token> tokens =
Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString));
+ try
+ {
+ Collection<InetAddressAndPort> collisions =
StorageService.instance.prepareForBootstrap(waitForSchema.toMillis());
+ assert collisions.size() == 0 : String.format("Didn't
expect any replacements but got %s", collisions);
+ boolean isBootstrapSuccessful =
StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis());
+ assert isBootstrapSuccessful : "Bootstrap did not complete
successfully";
+ StorageService.instance.setUpDistributedSystemKeyspaces();
+ if (joinRing)
+ StorageService.instance.finishJoiningRing(true,
tokens);
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+
+ return null;
+ }).apply(instance.config().getString("partitioner"),
instance.config().getString("initial_token"));
+ }
+ }
+
+ public static InstanceAction decomission()
+ {
+ return (target) ->
target.nodetoolResult("decommission").asserts().success();
+ }
+
+
+ public static VersionedApplicationState tokens(IInvokableInstance instance)
+ {
+ return versionedToken(instance, ApplicationState.TOKENS, (partitioner,
tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).tokens(tokens));
+ }
+
+ public static VersionedApplicationState netVersion(IInvokableInstance
instance)
+ {
+ return versionedToken(instance, ApplicationState.NET_VERSION,
(partitioner, tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).networkVersion());
+ }
+
+ private static VersionedApplicationState
unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String
releaseVersionStr)
+ {
+ return unsafeVersionedValue(instance,
ApplicationState.RELEASE_VERSION, (partitioner) -> new
VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr),
partitionerStr);
+ }
+
+ public static VersionedApplicationState releaseVersion(IInvokableInstance
instance)
+ {
+ return unsafeReleaseVersion(instance,
instance.config().getString("partitioner"), instance.getReleaseVersionString());
+ }
+
+ public static VersionedApplicationState statusNormal(IInvokableInstance
instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS, (partitioner,
tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
+ }
+
+ public static VersionedApplicationState
statusWithPortNormal(IInvokableInstance instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS_WITH_PORT,
(partitioner, tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
+ }
+
+ public static VersionedApplicationState
statusBootstrapping(IInvokableInstance instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS, (partitioner,
tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
+ }
+
+ public static VersionedApplicationState
statusWithPortBootstrapping(IInvokableInstance instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS_WITH_PORT,
(partitioner, tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
+ }
+
+ public static VersionedApplicationState statusLeaving(IInvokableInstance
instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS, (partitioner,
tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
+ }
+
+ public static VersionedApplicationState statusLeft(IInvokableInstance
instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS, (partitioner,
tokens) -> {
+ return new
VersionedValue.VersionedValueFactory(partitioner).left(tokens,
System.currentTimeMillis() + Gossiper.aVeryLongTime);
+ });
+ }
+
+ public static VersionedApplicationState
statusWithPortLeft(IInvokableInstance instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS_WITH_PORT,
(partitioner, tokens) -> {
+ return new
VersionedValue.VersionedValueFactory(partitioner).left(tokens,
System.currentTimeMillis() + Gossiper.aVeryLongTime);
+
+ });
+ }
+
+ public static VersionedApplicationState
statusWithPortLeaving(IInvokableInstance instance)
+ {
+ return versionedToken(instance, ApplicationState.STATUS_WITH_PORT,
(partitioner, tokens) -> new
VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
+ }
+
+ public static VersionedValue toVersionedValue(VersionedApplicationState vv)
+ {
+ return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version);
+ }
+
+ public static ApplicationState
toApplicationState(VersionedApplicationState vv)
+ {
+ return ApplicationState.values()[vv.applicationState];
+ }
+
+ private static VersionedApplicationState
unsafeVersionedValue(IInvokableInstance instance,
+
ApplicationState applicationState,
+
IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>,
VersionedValue> supplier,
+ String
partitionerStr, String initialTokenStr)
+ {
+ return instance.appliesOnInstance((String partitionerString, String
tokenString) -> {
+ IPartitioner partitioner =
FBUtilities.newPartitioner(partitionerString);
+ Token token =
partitioner.getTokenFactory().fromString(tokenString);
+
+ VersionedValue versionedValue = supplier.apply(partitioner,
Collections.singleton(token));
+ return new VersionedApplicationState(applicationState.ordinal(),
versionedValue.value, versionedValue.version);
+ }).apply(partitionerStr, initialTokenStr);
+ }
+
+ private static VersionedApplicationState
unsafeVersionedValue(IInvokableInstance instance,
+
ApplicationState applicationState,
+
IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier,
+ String
partitionerStr)
+ {
+ return instance.appliesOnInstance((String partitionerString) -> {
+ IPartitioner partitioner =
FBUtilities.newPartitioner(partitionerString);
+ VersionedValue versionedValue = supplier.apply(partitioner);
+ return new VersionedApplicationState(applicationState.ordinal(),
versionedValue.value, versionedValue.version);
+ }).apply(partitionerStr);
+ }
+
+ public static VersionedApplicationState versionedToken(IInvokableInstance
instance, ApplicationState applicationState,
IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>,
VersionedValue> supplier)
+ {
+ return unsafeVersionedValue(instance, applicationState, supplier,
instance.config().getString("partitioner"),
instance.config().getString("initial_token"));
+ }
+
+ public static InstanceAction removeFromRing(IInvokableInstance peer)
+ {
+ return (target) -> {
+ InetAddressAndPort endpoint =
toCassandraInetAddressAndPort(peer.broadcastAddress());
+ VersionedApplicationState newState = statusLeft(peer);
+
+ target.runOnInstance(() -> {
+ // state to 'left'
+ EndpointState currentState =
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ ApplicationState as = toApplicationState(newState);
+ VersionedValue vv = toVersionedValue(newState);
+ currentState.addApplicationState(as, vv);
+ StorageService.instance.onChange(endpoint, as, vv);
+
+ // remove from gossip
+ Gossiper.runInGossipStageBlocking(() ->
Gossiper.instance.unsafeAnulEndpoint(endpoint));
+ SystemKeyspace.removeEndpoint(endpoint);
+ PendingRangeCalculatorService.instance.update();
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+ });
+ };
+ }
+
+ /**
+ * Changes gossip state of the `peer` on `target`
+ */
+ public static void changeGossipState(IInvokableInstance target, IInstance
peer, List<VersionedApplicationState> newState)
+ {
+ InetSocketAddress addr = peer.broadcastAddress();
+ UUID hostId = peer.config().hostId();
+ int netVersion = peer.getMessagingVersion();
+ target.runOnInstance(() -> {
+ InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
+ StorageService storageService = StorageService.instance;
+
+ Gossiper.runInGossipStageBlocking(() -> {
+ EndpointState state =
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (state == null)
+ {
+ Gossiper.instance.initializeNodeUnsafe(endpoint, hostId,
netVersion, 1);
+ state =
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (state.isAlive() &&
!Gossiper.instance.isDeadState(state))
+ Gossiper.instance.realMarkAlive(endpoint, state);
+ }
+
+ for (VersionedApplicationState value : newState)
+ {
+ ApplicationState as = toApplicationState(value);
+ VersionedValue vv = toVersionedValue(value);
+ state.addApplicationState(as, vv);
+ storageService.onChange(endpoint, as, vv);
+ }
+ });
+ });
+ }
+
+ public static void withProperty(String prop, boolean value, Runnable r)
+ {
+ String before = System.getProperty(prop);
+ try
+ {
+ System.setProperty(prop, Boolean.toString(value));
+ r.run();
+ }
+ finally
+ {
+ System.setProperty(prop, before == null ? "true" : before);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/action/InstanceAction.java
b/test/distributed/org/apache/cassandra/distributed/action/InstanceAction.java
new file mode 100644
index 0000000..ce14dbc
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/action/InstanceAction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.action;
+
+import java.util.function.Consumer;
+
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+public interface InstanceAction extends Consumer<IInvokableInstance>
+{
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 361519d..44c9744 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -162,7 +164,6 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
}
}
-
protected class Wrapper extends DelegatingInvokableInstance implements
IUpgradeableInstance
{
private final int generation;
@@ -252,7 +253,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
if (!isShutdown && delegate != null)
return delegate().liveMemberCount();
- throw new IllegalStateException("Cannot get live member count on
shutdown instance");
+ throw new IllegalStateException("Cannot get live member count on
shutdown instance: " + config.num());
}
public NodeToolResult nodetoolResult(boolean withNotifications,
String... commandAndArgs)
@@ -387,13 +388,8 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
public I bootstrap(IInstanceConfig config)
{
- if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
- throw new IllegalStateException("New nodes can only be
bootstrapped when gossip and networking is enabled.");
-
I instance = newInstanceWrapperInternal(0, initialVersion, config);
-
instances.add(instance);
-
I prev = instanceMap.put(config.broadcastAddress(), instance);
if (null != prev)
@@ -448,6 +444,41 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
i.config().localRack().equals(rackName));
}
+ public void run(Consumer<? super I> action, Predicate<I> filter)
+ {
+ run(Collections.singletonList(action), filter);
+ }
+
+ public void run(Collection<Consumer<? super I>> actions, Predicate<I>
filter)
+ {
+ stream().forEach(instance -> {
+ for (Consumer<? super I> action : actions)
+ {
+ if (filter.test(instance))
+ action.accept(instance);
+ }
+
+ });
+ }
+
+ public void run(Consumer<? super I> action, int instanceId, int...
moreInstanceIds)
+ {
+ run(Collections.singletonList(action), instanceId, moreInstanceIds);
+ }
+
+ public void run(List<Consumer<? super I>> actions, int instanceId, int...
moreInstanceIds)
+ {
+ int[] instanceIds = new int[moreInstanceIds.length + 1];
+ instanceIds[0] = instanceId;
+ System.arraycopy(moreInstanceIds, 0, instanceIds, 1,
moreInstanceIds.length);
+
+ for (int idx : instanceIds)
+ {
+ for (Consumer<? super I> action : actions)
+ action.accept(this.get(idx));
+ }
+ }
+
public void forEach(IIsolatedExecutor.SerializableRunnable runnable)
{
forEach(i -> i.sync(runnable));
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
index fbfda3a..0dfaa7e 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
@@ -40,7 +40,7 @@ public class DistributedTestSnitch extends
AbstractNetworkTopologySnitch
private static final Map<InetAddressAndPort, InetSocketAddress> cache =
new ConcurrentHashMap<>();
private static final Map<InetSocketAddress, InetAddressAndPort>
cacheInverse = new ConcurrentHashMap<>();
- static InetAddressAndPort toCassandraInetAddressAndPort(InetSocketAddress
addressAndPort)
+ public static InetAddressAndPort
toCassandraInetAddressAndPort(InetSocketAddress addressAndPort)
{
InetAddressAndPort m = cacheInverse.get(addressAndPort);
if (m == null)
@@ -51,7 +51,7 @@ public class DistributedTestSnitch extends
AbstractNetworkTopologySnitch
return m;
}
- static InetSocketAddress
fromCassandraInetAddressAndPort(InetAddressAndPort addressAndPort)
+ public static InetSocketAddress
fromCassandraInetAddressAndPort(InetAddressAndPort addressAndPort)
{
InetSocketAddress m = cache.get(addressAndPort);
if (m == null)
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index edd525d..0448cb5 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -35,9 +35,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
-
import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationListener;
@@ -64,8 +64,8 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspaceMigrator40;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.action.GossipHelper;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
@@ -78,11 +78,8 @@ import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
-import org.apache.cassandra.distributed.shared.InstanceClassLoader;
import org.apache.cassandra.exceptions.StartupException;
-import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
@@ -96,6 +93,7 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.MigrationManager;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.ActiveRepairService;
@@ -110,8 +108,8 @@ import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.async.StreamingInboundHandler;
-import org.apache.cassandra.tools.Output;
import org.apache.cassandra.tools.NodeTool;
+import org.apache.cassandra.tools.Output;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -142,6 +140,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
}};
public final IInstanceConfig config;
+ private final long startedAt = System.nanoTime();
// should never be invoked directly, so that it is instantiated on other
class loader;
// only visible for inheritance
@@ -477,17 +476,23 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
if (config.has(GOSSIP))
{
+ MigrationManager.setUptimeFn(() ->
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt));
StorageService.instance.initServer();
StorageService.instance.removeShutdownHook();
Gossiper.waitToSettle();
}
else
{
- initializeRing(cluster);
+ cluster.stream().forEach(peer -> {
+ if (cluster instanceof Cluster)
+ GossipHelper.statusToNormal((IInvokableInstance)
peer).accept(this);
+ else
+ GossipHelper.unsafeStatusToNormal(this,
(IInstance) peer);
+ });
+
}
StorageService.instance.ensureTraceKeyspace();
-
SystemKeyspace.finishStartup();
CassandraDaemon.getInstanceForTesting().setupCompleted();
@@ -513,6 +518,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
}).run();
}
+
private void mkdirs()
{
new File(config.getString("saved_caches_directory")).mkdirs();
@@ -529,72 +535,6 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
return config;
}
- private void initializeRing(ICluster cluster)
- {
- // This should be done outside instance in order to avoid serializing
config
- String partitionerName = config.getString("partitioner");
- List<String> initialTokens = new ArrayList<>();
- List<InetSocketAddress> hosts = new ArrayList<>();
- List<UUID> hostIds = new ArrayList<>();
- List<String> versions = new ArrayList<>();
- for (int i = 1 ; i <= cluster.size() ; ++i)
- {
- IInstance instance = cluster.get(i);
- IInstanceConfig config = instance.config();
- initialTokens.add(config.getString("initial_token"));
- hosts.add(config.broadcastAddress());
- hostIds.add(config.hostId());
- versions.add(instance.getReleaseVersionString());
- }
-
- try
- {
- IPartitioner partitioner =
FBUtilities.newPartitioner(partitionerName);
- StorageService storageService = StorageService.instance;
- List<Token> tokens = new ArrayList<>();
- for (String token : initialTokens)
- tokens.add(partitioner.getTokenFactory().fromString(token));
-
- for (int i = 0; i < tokens.size(); i++)
- {
- InetSocketAddress ep = hosts.get(i);
- InetAddressAndPort addressAndPort =
toCassandraInetAddressAndPort(ep);
- UUID hostId = hostIds.get(i);
- Token token = tokens.get(i);
- String releaseVersion = versions.get(i);
- Gossiper.runInGossipStageBlocking(() -> {
- Gossiper.instance.initializeNodeUnsafe(addressAndPort,
hostId, 1);
- Gossiper.instance.injectApplicationState(addressAndPort,
-
ApplicationState.TOKENS,
- new
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
- Gossiper.instance.injectApplicationState(addressAndPort,
-
ApplicationState.RELEASE_VERSION,
- new
VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersion));
- storageService.onChange(addressAndPort,
- ApplicationState.STATUS_WITH_PORT,
- new
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
- storageService.onChange(addressAndPort,
- ApplicationState.STATUS,
- new
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
- Gossiper.instance.realMarkAlive(addressAndPort,
Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
- });
-
- int messagingVersion = cluster.get(ep).isShutdown()
- ? MessagingService.current_version
- :
Math.min(MessagingService.current_version,
cluster.get(ep).getMessagingVersion());
- MessagingService.instance().versions.set(addressAndPort,
messagingVersion);
- }
-
- // check that all nodes are in token metadata
- for (int i = 0; i < tokens.size(); ++i)
- assert
storageService.getTokenMetadata().isMember(toCassandraInetAddressAndPort(hosts.get(i)));
- }
- catch (Throwable e) // UnknownHostException
- {
- throw new RuntimeException(e);
- }
- }
-
public Future<Void> shutdown()
{
return shutdown(true);
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index 0d8f96f..c3b8019 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -150,7 +150,7 @@ public class IsolatedExecutor implements IIsolatedExecutor
}
catch (IllegalAccessException | InvocationTargetException e)
{
- throw new RuntimeException(e);
+ throw new RuntimeException("Error while transfering object to " +
classLoader, e);
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/VersionedApplicationState.java
b/test/distributed/org/apache/cassandra/distributed/shared/VersionedApplicationState.java
new file mode 100644
index 0000000..fd3e40a
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/shared/VersionedApplicationState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.Serializable;
+
+public class VersionedApplicationState implements Serializable
+{
+ public final int applicationState;
+ public final String value;
+ public final int version;
+
+ public VersionedApplicationState(int applicationState, String value, int
version)
+ {
+ this.applicationState = applicationState;
+ this.value = value;
+ this.version = version;
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
deleted file mode 100644
index 934ad65..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.distributed.api.TokenSupplier;
-import org.apache.cassandra.distributed.shared.NetworkTopology;
-
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-// TODO: this test should be removed after running in-jvm dtests is set up via
the shared API repository
-public class BootstrapTest extends TestBaseImpl
-{
-
- @Test
- public void bootstrapTest() throws Throwable
- {
- int originalNodeCount = 2;
- int expandedNodeCount = originalNodeCount + 1;
- Cluster.Builder builder = builder().withNodes(originalNodeCount)
-
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
-
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount,
"dc0", "rack0"))
- .withConfig(config ->
config.with(NETWORK, GOSSIP));
-
- Map<Integer, Long> withBootstrap = null;
- Map<Integer, Long> naturally = null;
- try (Cluster cluster = builder.withNodes(originalNodeCount).start())
- {
- populate(cluster);
-
- IInstanceConfig config = cluster.newInstanceConfig();
- config.set("auto_bootstrap", true);
-
- cluster.bootstrap(config).startup();
-
- cluster.stream().forEach(instance -> {
- instance.nodetool("cleanup", KEYSPACE, "tbl");
- });
-
- withBootstrap = count(cluster);
- }
-
- builder = builder.withNodes(expandedNodeCount)
-
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
- .withConfig(config -> config.with(NETWORK, GOSSIP));
-
- try (ICluster cluster = builder.start())
- {
- populate(cluster);
- naturally = count(cluster);
- }
-
- for (Map.Entry<Integer, Long> e : withBootstrap.entrySet())
- Assert.assertTrue(e.getValue() >= naturally.get(e.getKey()));
- }
-
- public void populate(ICluster cluster)
- {
- cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck
int, v int, PRIMARY KEY (pk, ck))");
-
- for (int i = 0; i < 1000; i++)
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl
(pk, ck, v) VALUES (?, ?, ?)",
- ConsistencyLevel.QUORUM,
- i, i, i);
- }
-
- public Map<Integer, Long> count(ICluster cluster)
- {
- return IntStream.rangeClosed(1, cluster.size())
- .boxed()
- .collect(Collectors.toMap(nodeId -> nodeId,
- nodeId -> (Long)
cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE +
".tbl")[0][0]));
- }
-}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 09fa1df..d53cbd4 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -46,7 +46,8 @@ public class TestBaseImpl extends DistributedTestBase
}
@BeforeClass
- public static void beforeClass() throws Throwable {
+ public static void beforeClass() throws Throwable
+ {
ICluster.setup();
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
new file mode 100644
index 0000000..e0c5a78
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static java.util.Arrays.asList;
+import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
+import static
org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
+import static
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
+import static
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class BootstrapTest extends TestBaseImpl
+{
+ @Test
+ public void bootstrapTest() throws Throwable
+ {
+ int originalNodeCount = 2;
+ int expandedNodeCount = originalNodeCount + 1;
+
+ try (Cluster cluster = builder().withNodes(originalNodeCount)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount,
"dc0", "rack0"))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP))
+ .start())
+ {
+ populate(cluster,0, 100);
+
+ IInstanceConfig config = cluster.newInstanceConfig();
+ IInvokableInstance newInstance = cluster.bootstrap(config);
+ withProperty("cassandra.join_ring", false,
+ () -> newInstance.startup(cluster));
+
+ cluster.forEach(statusToBootstrap(newInstance));
+
+ cluster.run(asList(pullSchemaFrom(cluster.get(1)),
+ bootstrap()),
+ newInstance.config().num());
+
+ for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
+ Assert.assertEquals("Node " + e.getKey() + " has incorrect row
state",
+ 100L,
+ e.getValue().longValue());
+ }
+ }
+
+ @Test
+ public void autoBootstrapTest() throws Throwable
+ {
+ int originalNodeCount = 2;
+ int expandedNodeCount = originalNodeCount + 1;
+
+ try (Cluster cluster = builder().withNodes(originalNodeCount)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount,
"dc0", "rack0"))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP))
+ .start())
+ {
+ populate(cluster,0, 100);
+
+ IInstanceConfig config = cluster.newInstanceConfig();
+ config.set("auto_bootstrap", true);
+ IInvokableInstance newInstance = cluster.bootstrap(config);
+ withProperty("cassandra.join_ring", false,
+ () -> newInstance.startup(cluster));
+
+ newInstance.nodetoolResult("join").asserts().success();
+
+ for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
+ Assert.assertEquals("Node " + e.getKey() + " has incorrect row
state", e.getValue().longValue(), 100L);
+ }
+ }
+
+ public static void populate(ICluster cluster, int from, int to)
+ {
+ populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
+ }
+
+ public static void populate(ICluster cluster, int from, int to, int coord,
int rf, ConsistencyLevel cl)
+ {
+ cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + "
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf +
"};");
+ cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ for (int i = from; i < to; i++)
+ {
+ cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE +
".tbl (pk, ck, v) VALUES (?, ?, ?)",
+ cl,
+ i, i, i);
+ }
+ }
+
+ public static Map<Integer, Long> count(ICluster cluster)
+ {
+ return IntStream.rangeClosed(1, cluster.size())
+ .boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ nodeId -> (Long)
cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE +
".tbl")[0][0]));
+ }
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
new file mode 100644
index 0000000..37b1802
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.apache.cassandra.distributed.action.GossipHelper.decomission;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class CommunicationDuringDecommissionTest extends TestBaseImpl
+{
+ @Test
+ public void internodeConnectionsDuringDecom() throws Throwable
+ {
+ try (Cluster cluster = builder().withNodes(4)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+ .start())
+ {
+ BootstrapTest.populate(cluster, 0, 100);
+
+ cluster.run(decomission(), 1);
+
+ cluster.filters().allVerbs().from(1).messagesMatching((i, i1,
iMessage) -> {
+ throw new AssertionError("Decomissioned node should not send
any messages");
+ }).drop();
+
+
+ Map<Integer, Long> connectionAttempts = new HashMap<>();
+ long deadline = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(10);
+
+ // Wait 10 seconds and check if there are any new connection
attempts to the decomissioned node
+ while (System.currentTimeMillis() <= deadline)
+ {
+ for (int i = 2; i <= cluster.size(); i++)
+ {
+ Object[][] res = cluster.get(i).executeInternal("SELECT
active_connections, connection_attempts FROM system_views.internode_outbound
WHERE address = '127.0.0.1' AND port = 7012");
+ Assert.assertEquals(1, res.length);
+ Assert.assertEquals(0L, ((Long) res[0][0]).longValue());
+ long attempts = ((Long) res[0][1]).longValue();
+ if (connectionAttempts.get(i) == null)
+ connectionAttempts.put(i, attempts);
+ else
+ Assert.assertEquals(connectionAttempts.get(i), (Long)
attempts);
+ }
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ring/NodeNotInRingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ring/NodeNotInRingTest.java
new file mode 100644
index 0000000..2333077
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ring/NodeNotInRingTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.action.GossipHelper;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.net.Verb;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class NodeNotInRingTest extends TestBaseImpl
+{
+ @Test
+ public void nodeNotInRingTest() throws Throwable
+ {
+ try (Cluster cluster = builder().withNodes(3)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP))
+ .start())
+ {
+ cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +
" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};");
+ cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE +
".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ cluster.filters().verbs(Verb.GOSSIP_DIGEST_ACK.id,
+ Verb.GOSSIP_DIGEST_SYN.id)
+ .from(3)
+ .outbound()
+ .drop()
+ .on();
+ cluster.run(GossipHelper.removeFromRing(cluster.get(3)), 1, 2);
+
+ populate(cluster, 0, 50, 1, ConsistencyLevel.ALL);
+ populate(cluster, 50, 100, 2, ConsistencyLevel.ALL);
+
+ Map<Integer, Long> counts = BootstrapTest.count(cluster);
+ Assert.assertEquals(counts.get(1).longValue(), 100L);
+ Assert.assertEquals(counts.get(2).longValue(), 100L);
+ Assert.assertEquals(counts.get(3).longValue(), 0L);
+ }
+ }
+
+ public static void populate(ICluster cluster, int from, int to, int coord,
ConsistencyLevel cl)
+ {
+ for (int i = from; i < to; i++)
+ {
+ cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE +
".tbl (pk, ck, v) VALUES (?, ?, ?)",
+ cl,
+ i, i, i);
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ring/PendingWritesTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ring/PendingWritesTest.java
new file mode 100644
index 0000000..8daa58a
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ring/PendingWritesTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.DistributedTestSnitch;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
+import static
org.apache.cassandra.distributed.action.GossipHelper.disseminateGossipState;
+import static
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
+import static
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class PendingWritesTest extends TestBaseImpl
+{
+ @Test
+ public void testPendingWrites() throws Throwable
+ {
+ int originalNodeCount = 2;
+ int expandedNodeCount = originalNodeCount + 1;
+
+ try (Cluster cluster = builder().withNodes(originalNodeCount)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount,
"dc0", "rack0"))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP))
+ .start())
+ {
+ BootstrapTest.populate(cluster, 0, 100);
+ IInstanceConfig config = cluster.newInstanceConfig();
+ IInvokableInstance newInstance = cluster.bootstrap(config);
+ withProperty("cassandra.join_ring", false,
+ () -> newInstance.startup(cluster));
+
+ cluster.forEach(statusToBootstrap(newInstance));
+ cluster.run(bootstrap(false, Duration.ofSeconds(60),
Duration.ofSeconds(60)), newInstance.config().num());
+
+ cluster.get(1).acceptsOnInstance((InetSocketAddress ip) -> {
+ Set<InetAddressAndPort> set = new HashSet<>();
+ for (Map.Entry<Range<Token>, EndpointsForRange.Builder> e :
StorageService.instance.getTokenMetadata().getPendingRanges(KEYSPACE))
+ {
+ set.addAll(e.getValue().build().endpoints());
+ }
+ Assert.assertEquals(set.size(), 1);
+ Assert.assertTrue(String.format("%s should contain %s", set,
ip),
+
set.contains(DistributedTestSnitch.toCassandraInetAddressAndPort(ip)));
+ }).accept(cluster.get(3).broadcastAddress());
+
+ BootstrapTest.populate(cluster, 100, 150);
+
+ newInstance.nodetoolResult("join").asserts().success();
+
+ cluster.run(disseminateGossipState(newInstance),1, 2);
+
+ cluster.run((instance) -> {
+ instance.runOnInstance(() -> {
+ PendingRangeCalculatorService.instance.update();
+
PendingRangeCalculatorService.instance.blockUntilFinished();
+ });
+ }, 1, 2);
+
+ cluster.get(1).acceptsOnInstance((InetSocketAddress ip) -> {
+ Set<InetAddressAndPort> set = new HashSet<>();
+ for (Map.Entry<Range<Token>, EndpointsForRange.Builder> e :
StorageService.instance.getTokenMetadata().getPendingRanges(KEYSPACE))
+ set.addAll(e.getValue().build().endpoints());
+ assert set.size() == 0 : set;
+ }).accept(cluster.get(3).broadcastAddress());
+
+ for (Map.Entry<Integer, Long> e :
BootstrapTest.count(cluster).entrySet())
+ Assert.assertEquals("Node " + e.getKey() + " has incorrect row
state", e.getValue().longValue(), 150L);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/Util.java
b/test/unit/org/apache/cassandra/Util.java
index 2c171e7..eba9a7c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
@@ -224,7 +225,7 @@ public class Util
for (int i=0; i<endpointTokens.size(); i++)
{
InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." +
String.valueOf(i + 1));
- Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1);
+ Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i),
MessagingService.current_version, 1);
Gossiper.instance.injectApplicationState(ep,
ApplicationState.TOKENS, new
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i))));
ss.onChange(ep,
ApplicationState.STATUS_WITH_PORT,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]