This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new ad3f0e8f41 Ensure that gossip state for LEFT nodes is expired eventually
ad3f0e8f41 is described below
commit ad3f0e8f412707f3ace2ae3c41b43aadb853cb82
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Wed Dec 17 13:52:41 2025 +0000
Ensure that gossip state for LEFT nodes is expired eventually
By default the expiry time is calculated on each peer independently. It can be
made to converge by disabling gossip quarantine, either with the configuration
setting gossip_quarantine_disabled or at runtime via GossiperMBean
(setQuarantineDisabled).
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-21035
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 25 +++
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../org/apache/cassandra/gms/EndpointState.java | 2 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 184 +++++++++++++++++----
.../org/apache/cassandra/gms/GossiperMBean.java | 4 +
.../org/apache/cassandra/gms/VersionedValue.java | 5 +
.../apache/cassandra/service/StorageService.java | 13 +-
.../tcm/listeners/LegacyStateListener.java | 3 +-
.../tcm/listeners/UpgradeMigrationListener.java | 2 +
.../apache/cassandra/tcm/membership/Directory.java | 5 +-
.../tcm/sequences/SingleNodeSequences.java | 10 +-
.../cassandra/tcm/transformations/Assassinate.java | 10 +-
.../cassandra/distributed/shared/ClusterUtils.java | 3 +-
.../apache/cassandra/distributed/test/CASTest.java | 20 ++-
.../cassandra/distributed/test/CASTestBase.java | 5 +-
.../cassandra/distributed/test/GossipTest.java | 2 +-
.../gossip/GossipExpiryAfterAssassinateTest.java | 48 ++++++
.../gossip/GossipExpiryAfterDecommissionTest.java | 24 +--
.../gossip/GossipExpiryAfterRemoveNodeTest.java | 47 ++++++
.../test/gossip/GossipExpiryTestBase.java | 171 +++++++++++++++++++
.../test/log/ClusterMetadataTestHelper.java | 27 ++-
.../apache/cassandra/gms/GossipExpiryHelper.java | 60 +++++++
.../org/apache/cassandra/gms/GossiperTest.java | 8 +
.../cassandra/tcm/membership/MembershipUtils.java | 11 ++
.../tcm/transformations/PrepareJoinTest.java | 10 +-
26 files changed, 640 insertions(+), 70 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 272cb69c55..557256717a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Ensure peers with LEFT status are expired from gossip state
(CASSANDRA-21035)
* Optimize UTF8Validator.validate for ASCII prefixed Strings (CASSANDRA-21075)
* Switch LatencyMetrics to use ThreadLocalTimer/ThreadLocalCounter
(CASSANDRA-21080)
* Accord: write rejections would be returned to users as server errors rather
than INVALID and TxnReferenceOperation didn't handle all collections prperly
(CASSANDRA-21061)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index d314cb0a8d..a8860be63a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -1521,4 +1521,29 @@ public class Config
public boolean enforce_native_deadline_for_hints = false;
public boolean paxos_repair_race_wait = true;
+
+ /**
+ * If true, gossip state updates for nodes which have left the cluster
will continue to be processed while the
+ * node is still present in ClusterMetadata. This enables the gossip
expiry time for those nodes (the deadline
+ * after which their state is fully purged from gossip) to converge across
the remaining nodes in the cluster.
+ * This is a change from previous behaviour as historically once a node
has advertised a LEFT status further
+ * updates to gossip state for it are ignored for a period of time to
prevent flapping if older/stale states
+ * are encountered.
+ * Following CEP-21, most significant state changes are handled by the
cluster metadata log, so resurrection
+ * of left nodes is not a problem for gossip to solve and so quarantine is
not really necessary. However,
+ * FailureDetector does still use gossip messages to assess node health
and some external systems still use gossip
+ * state to inform decisions about topology/node health/etc. For those
reasons, quarantine currently remains enabled by default (this option
+ * defaults to false), and it can be toggled at runtime.
+ *
+ * With quarantine still in effect, expiry from gossip of LEFT nodes will
occur at different times on each peer.
+ * Also, when there are LEFT nodes in gossip, the state will never fully
converge across the cluster as each node
+ * will have its own expiry time for a LEFT peer.
+ *
+ * With quarantine disabled the STATUS_WITH_PORT values for the left node
which include the expiry time will
+ * converge and peers will all evict it from gossip after the same
deadline.
+ *
+ * Eventually, this configuration option should be removed and quarantine
disabled entirely for clusters running
+ * 6.0 and later.
+ */
+ public volatile boolean gossip_quarantine_disabled = false;
}
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 09100da8d3..9e28199f7c 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -6119,4 +6119,14 @@ public class DatabaseDescriptor
{
partitioner = FBUtilities.newPartitioner(name);
}
+
+ public static boolean getGossipQuarantineDisabled()
+ {
+ return conf.gossip_quarantine_disabled;
+ }
+
+ public static void setGossipQuarantineDisabled(boolean disabled)
+ {
+ conf.gossip_quarantine_disabled = disabled;
+ }
}
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 26fe33ec69..41a79caf7f 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -324,7 +324,7 @@ public class EndpointState
public String toString()
{
View view = ref.get();
- return "EndpointState: HeartBeatState = " + view.hbState + ",
AppStateMap = " + view.applicationState;
+ return "EndpointState: HeartBeatState = " + view.hbState + ",
AppStateMap = " + view.applicationState + ", isAlive = " + isAlive;
}
public boolean isSupersededBy(EndpointState that)
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 83bf5169af..6d0195093f 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -87,6 +87,7 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.utils.FBUtilities;
@@ -107,6 +108,10 @@ import static
org.apache.cassandra.gms.Gossiper.GossipedWith.CMS;
import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED;
import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS;
import static org.apache.cassandra.gms.VersionedValue.HIBERNATE;
+import static org.apache.cassandra.gms.VersionedValue.REMOVED_TOKEN;
+import static org.apache.cassandra.gms.VersionedValue.REMOVING_TOKEN;
+import static org.apache.cassandra.gms.VersionedValue.SHUTDOWN;
+import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT;
import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.ECHO_REQ;
@@ -137,8 +142,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
private static final ScheduledExecutorPlus executor =
executorFactory().scheduled("GossipTasks");
static final ApplicationState[] STATES = ApplicationState.values();
- static final List<String> DEAD_STATES =
Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
-
VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
+ static final List<String> DEAD_STATES = Arrays.asList(REMOVING_TOKEN,
REMOVED_TOKEN, STATUS_LEFT, HIBERNATE);
static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
static
{
@@ -185,7 +189,15 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
/* map where key is endpoint and value is timestamp when this endpoint was
removed from
* gossip. We will ignore any gossip regarding these endpoints for
QUARANTINE_DELAY time
* after removal to prevent nodes from falsely reincarnating during the
time when removal
- * gossip gets propagated to all nodes */
+ * gossip gets propagated to all nodes.
+ * Note: in future, this need only be used when ClusterMetadataService is
in the GOSSIP state,
+ * i.e. during the major upgrade to the version with CEP-21, but before
the CMS is initialized.
+ * In this state, gossip is still used to propagate changes to broadcast
address and release
+ * version. Once the CMS initialization is complete, this is no longer
necessary.
+ * Currently in order to support a controlled rollout of that change to
behaviour, quarantine
+ * is still used by default, but can be disabled via config
(gossip_quarantine_disabled) or
+ * JMX (GossiperMBean::setQuarantineDisabled)
+ */
private final Map<InetAddressAndPort, Long> justRemovedEndpoints = new
ConcurrentHashMap<>();
private final Map<InetAddressAndPort, Long> expireTimeEndpointMap = new
ConcurrentHashMap<>();
@@ -450,14 +462,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
public static boolean isShutdown(VersionedValue vv)
{
- if (vv == null)
- return false;
-
- String value = vv.value;
- String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
- assert (pieces.length > 0);
- String state = pieces[0];
- return state.equals(VersionedValue.SHUTDOWN);
+ return matchesStatusString(vv, SHUTDOWN);
}
public static boolean isHibernate(EndpointState epState)
@@ -469,15 +474,39 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
}
public static boolean isHibernate(VersionedValue vv)
+ {
+ return matchesStatusString(vv, HIBERNATE);
+ }
+
+ public static boolean isLeft(VersionedValue vv)
+ {
+ return matchesStatusString(vv, STATUS_LEFT);
+ }
+
+ private static boolean matchesStatusString(VersionedValue vv, String
toMatch)
{
if (vv == null)
return false;
- String value = vv.value;
- String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+ String[] pieces = vv.splitValue();
assert (pieces.length > 0);
String state = pieces[0];
- return state.equals(VersionedValue.HIBERNATE);
+ return state.equals(toMatch);
+ }
+
+ public static long extractExpireTime(String[] pieces)
+ {
+ if (pieces.length < 3)
+ return 0L;
+ try
+ {
+ return Long.parseLong(pieces[2]);
+ }
+ catch (NumberFormatException e)
+ {
+ logger.debug("Invalid value found for expire time ({}), ignoring",
pieces[2]);
+ return 0L;
+ }
}
public static void runInGossipStageBlocking(Runnable runnable)
@@ -696,10 +725,21 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
{
if (disableEndpointRemoval)
return;
+
+ // Quarantine is only necessary while upgrading from gossip-driven
management of cluster metadata
+ if (getQuarantineDisabled() &&
ClusterMetadata.current().epoch.isAfter(Epoch.UPGRADE_GOSSIP))
+ return;
+
justRemovedEndpoints.put(endpoint, quarantineExpiration);
GossiperDiagnostics.quarantinedEndpoint(this, endpoint,
quarantineExpiration);
}
+ public void clearQuarantinedEndpoints()
+ {
+ logger.info("Clearing quarantined endpoints");
+ justRemovedEndpoints.clear();
+ }
+
/**
* The gossip digest is built based on randomization
* rather than just looping through the collection of live endpoints.
@@ -948,16 +988,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
}
// check for dead state removal
- long expireTime = getExpireTimeForEndpoint(endpoint);
- if (!epState.isAlive() && (now > expireTime)
- && (!metadata.directory.allAddresses().contains(endpoint)))
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("time is expiring for endpoint : {}
({})", endpoint, expireTime);
- }
- runInGossipStageBlocking(() ->
evictFromMembership(endpoint));
- }
+ evictIfExpired(endpoint, epState, metadata.directory, now);
}
}
@@ -975,7 +1006,21 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
}
}
- protected long getExpireTimeForEndpoint(InetAddressAndPort endpoint)
+ @VisibleForTesting
+ void evictIfExpired(InetAddressAndPort endpoint, EndpointState epState,
Directory directory, long now)
+ {
+ if (!epState.isAlive() &&
(!directory.allJoinedEndpoints().contains(endpoint)))
+ {
+ long expireTime = getExpireTimeForEndpoint(endpoint);
+ if (now > expireTime)
+ {
+ logger.info("Reached gossip expiry time for endpoint : {}
({})", endpoint, expireTime);
+ runInGossipStageBlocking(() -> evictFromMembership(endpoint));
+ }
+ }
+ }
+
+ long getExpireTimeForEndpoint(InetAddressAndPort endpoint)
{
/* default expireTime is aVeryLongTime */
Long storedTime = expireTimeEndpointMap.get(endpoint);
@@ -1897,11 +1942,15 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long
expireTime)
{
- if (logger.isDebugEnabled())
+ if (expireTime == 0L)
+ {
+ logger.debug("Supplied expire time for {} was 0, not recording",
endpoint);
+ }
+ else
{
logger.debug("adding expire time for endpoint : {} ({})",
endpoint, expireTime);
+ expireTimeEndpointMap.put(endpoint, expireTime);
}
- expireTimeEndpointMap.put(endpoint, expireTime);
}
public static long computeExpireTime()
@@ -2104,6 +2153,50 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
MessagingService.instance().send(message, ep);
}
+ public void unsafeBroadcastLeftStatus(InetAddressAndPort left,
+ Collection<Token> tokens,
+ Iterable<InetAddressAndPort> sendTo)
+ {
+ runInGossipStageBlocking(() -> {
+ EndpointState epState = endpointStateMap.get(left);
+ if (epState == null)
+ {
+ logger.info("No gossip state for node {}", left);
+ return;
+ }
+
+ NodeState state =
ClusterMetadata.current().directory.peerState(left);
+ if (state != NodeState.LEFT)
+ {
+ logger.info("Node Status for {} is not LEFT ({})", left,
state);
+ return;
+ }
+
+ EndpointState toSend = new EndpointState(epState);
+ toSend.forceNewerGenerationUnsafe();
+ toSend.markDead();
+ VersionedValue value =
StorageService.instance.valueFactory.left(tokens, computeExpireTime());
+
+ if (left.equals(getBroadcastAddressAndPort()))
+ {
+ // Adding local state bumps the value's version. To keep this
consistent across
+ // the cluster, re-fetch it before broadcasting.
+
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT,
value);
+ value =
Gossiper.instance.endpointStateMap.get(getBroadcastAddressAndPort())
+
.getApplicationState(ApplicationState.STATUS_WITH_PORT);
+ }
+
+ toSend.addApplicationState(ApplicationState.STATUS_WITH_PORT,
value);
+ GossipDigestAck2 payload = new
GossipDigestAck2(Collections.singletonMap(left, toSend));
+ logger.info("Sending app state with status {} to {}", value.value,
sendTo);
+ for (InetAddressAndPort ep : sendTo)
+ {
+ Message<GossipDigestAck2> message =
Message.out(Verb.GOSSIP_DIGEST_ACK2, payload);
+ MessagingService.instance().send(message, ep);
+ }
+ });
+ }
+
private void unsafeUpdateEpStates(InetAddressAndPort endpoint,
EndpointState epstate)
{
checkProperThreadForStateMutation();
@@ -2221,9 +2314,31 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
newValue = valueFactory.hibernate(true);
break;
}
+
if (isLocal &&
!StorageService.instance.shouldJoinRing())
break;
- newValue = GossipHelper.nodeStateToStatus(nodeId,
metadata, tokens, valueFactory, oldValue);
+
+ // If quarantine has been disabled and we have
already seen a LEFT status for a remote peer
+ // which originated from the peer itself or the
node which coordinated its removal (and so
+ // has a version > 0), keep it as this is how we
ensure the gossip expiry time encoded in
+ // the status string converges across peers.
+ // Should a node leave and then rejoin after
resetting its local state (i.e. wipe and
+ // rejoin), it is automatically unregistered which
removes all gossip state for it so there
+ // will be no oldValue in that case.
+ //
+ // Note: don't reorder these conditions as isLeft
includes a null check
+ if (getQuarantineDisabled() && !isLocal &&
Gossiper.isLeft(oldValue) && oldValue.version > 0)
+ {
+ logger.debug("Already seen a LEFT status for
{} with a non-zero version, " +
+ "dropping derived value {}",
endpoint, newValue);
+ newValue = oldValue;
+ }
+ else
+ {
+ newValue =
GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory,
oldValue);
+ if (Gossiper.isLeft(newValue))
+
Gossiper.instance.addExpireTimeForEndpoint(endpoint,
Gossiper.extractExpireTime(newValue.splitValue()));
+ }
break;
default:
newValue = oldValue;
@@ -2269,4 +2384,17 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
sendGossip(message, cms);
}
}
+
+ @Override
+ public boolean getQuarantineDisabled()
+ {
+ return DatabaseDescriptor.getGossipQuarantineDisabled();
+ }
+
+ @Override
+ public void setQuarantineDisabled(boolean disabled) // true: removed endpoints are not quarantined once the epoch is past Epoch.UPGRADE_GOSSIP
+ {
+ logger.info("Setting gossip_quarantine_disabled: {}", disabled);
+ DatabaseDescriptor.setGossipQuarantineDisabled(disabled);
+ }
}
diff --git a/src/java/org/apache/cassandra/gms/GossiperMBean.java
b/src/java/org/apache/cassandra/gms/GossiperMBean.java
index 0552883a60..3d46887b0e 100644
--- a/src/java/org/apache/cassandra/gms/GossiperMBean.java
+++ b/src/java/org/apache/cassandra/gms/GossiperMBean.java
@@ -43,4 +43,8 @@ public interface GossiperMBean
public boolean getLooseEmptyEnabled();
public void setLooseEmptyEnabled(boolean enabled);
+
+ public boolean getQuarantineDisabled();
+
+ public void setQuarantineDisabled(boolean disabled);
}
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index b03211b05c..126aecf703 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -159,6 +159,11 @@ public class VersionedValue implements
Comparable<VersionedValue>
return value.getBytes(ISO_8859_1);
}
+ public String[] splitValue()
+ {
+ return value.split(DELIMITER_STR, -1);
+ }
+
private static String versionString(String... args)
{
return StringUtils.join(args, VersionedValue.DELIMITER);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 3dc5adf5a5..e6b051e570 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2158,6 +2158,12 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
Gossiper.instance.markDead(endpoint, epState);
});
}
+ else if (Gossiper.isLeft(value))
+ {
+ long expireTime =
Gossiper.extractExpireTime(value.splitValue());
+ logger.info("Node state LEFT detected, setting or updating
expire time {}", expireTime);
+ Gossiper.instance.addExpireTimeForEndpoint(endpoint,
expireTime);
+ }
}
if (epState == null || Gossiper.instance.isDeadState(epState))
@@ -2207,7 +2213,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
updateNetVersion(endpoint, value);
break;
case STATUS_WITH_PORT:
- String[] pieces = splitValue(value);
+ String[] pieces = value.splitValue();
String moveName = pieces[0];
if (moveName.equals(VersionedValue.SHUTDOWN))
logger.info("Node {} state jump to shutdown",
endpoint);
@@ -2226,11 +2232,6 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
}
}
- private static String[] splitValue(VersionedValue value)
- {
- return value.value.split(VersionedValue.DELIMITER_STR, -1);
- }
-
public static void updateIndexStatus(InetAddressAndPort endpoint,
VersionedValue versionedValue)
{
IndexStatusManager.instance.receivePeerIndexStatus(endpoint,
versionedValue);
diff --git
a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
index fa1fe9d25b..3f77db174f 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
@@ -65,8 +65,7 @@ public class LegacyStateListener implements
ChangeListener.Async
next.tokenMap.lastModified().equals(prev.tokenMap.lastModified()))
return;
- Set<InetAddressAndPort> removedAddr = Sets.difference(new
HashSet<>(prev.directory.allAddresses()),
- new
HashSet<>(next.directory.allAddresses()));
+ Set<InetAddressAndPort> removedAddr =
Sets.difference(prev.directory.allAddresses(), next.directory.allAddresses());
Set<NodeId> changed = new HashSet<>();
for (NodeId node : next.directory.peerIds())
diff --git
a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java
b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java
index ff24e17b9d..67931af177 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java
@@ -35,5 +35,7 @@ public class UpgradeMigrationListener implements
ChangeListener.Async
logger.info("Detected upgrade from gossip mode, updating my host id in
gossip to {}", next.myNodeId());
Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next);
+ if (Gossiper.instance.getQuarantineDisabled())
+ Gossiper.instance.clearQuarantinedEndpoints();
}
}
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java
b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 8e73f7f341..ec5f2c4a9d 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -385,9 +384,9 @@ public class Directory implements MetadataValue<Directory>
* those cases use allJoinedEndpoints.
* @return
*/
- public ImmutableList<InetAddressAndPort> allAddresses()
+ public Set<InetAddressAndPort> allAddresses()
{
- return ImmutableList.copyOf(peers.values());
+ return peers.values();
}
public NavigableSet<NodeId> peerIds()
diff --git
a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
index 58c5f024f9..9da95b8aff 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.tcm.sequences;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -38,6 +39,7 @@ import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.PrepareMove;
+import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.service.StorageService.Mode.LEAVING;
import static org.apache.cassandra.service.StorageService.Mode.MOVE_FAILED;
@@ -73,7 +75,7 @@ public interface SingleNodeSequences
logger.debug("DECOMMISSIONING");
NodeId self = metadata.myNodeId();
-
+ Collection<Token> tokens = metadata.tokenMap.tokens(self);
ReconfigureCMS.maybeReconfigureCMS(metadata,
getBroadcastAddressAndPort());
MultiStepOperation<?> inProgress =
metadata.inProgressSequences.get(self);
@@ -95,6 +97,9 @@ public interface SingleNodeSequences
}
InProgressSequences.finishInProgressSequences(self);
+
Gossiper.instance.unsafeBroadcastLeftStatus(FBUtilities.getBroadcastAddressAndPort(),
+ tokens,
+
metadata.directory.allJoinedEndpoints());
if (shutdownNetworking)
StorageService.instance.shutdownNetworking();
}
@@ -134,12 +139,13 @@ public interface SingleNodeSequences
ReconfigureCMS.maybeReconfigureCMS(metadata, endpoint);
logger.info("starting removenode with {} {}", metadata.epoch,
toRemove);
-
+ Collection<Token> tokens = metadata.tokenMap.tokens(toRemove);
ClusterMetadataService.instance().commit(new PrepareLeave(toRemove,
force,
ClusterMetadataService.instance().placementProvider(),
LeaveStreams.Kind.REMOVENODE));
InProgressSequences.finishInProgressSequences(toRemove);
+ Gossiper.instance.unsafeBroadcastLeftStatus(endpoint, tokens,
metadata.directory.allJoinedEndpoints());
}
static void abortRemoveNode(String nodeId)
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java
b/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java
index 76bca253ee..f1bb6d50d3 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java
@@ -18,8 +18,12 @@
package org.apache.cassandra.tcm.transformations;
+import java.util.Collection;
+
import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
@@ -64,8 +68,10 @@ public class Assassinate extends PrepareLeave
ReconfigureCMS.maybeReconfigureCMS(metadata, endpoint);
NodeId nodeId = metadata.directory.peerId(endpoint);
- ClusterMetadataService.instance().commit(new Assassinate(nodeId,
-
ClusterMetadataService.instance().placementProvider()));
+ Collection<Token> tokens = metadata.tokenMap.tokens(nodeId);
+ ClusterMetadataService.instance()
+ .commit(new Assassinate(nodeId,
ClusterMetadataService.instance().placementProvider()));
+ Gossiper.instance.unsafeBroadcastLeftStatus(endpoint, tokens,
metadata.directory.allJoinedEndpoints());
}
@Override
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index d6479e1813..fedc1a7c55 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -46,7 +46,6 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import accord.topology.EpochReady;
@@ -805,7 +804,7 @@ public class ClusterUtils
public static Map<String, Epoch> getPeerEpochs(IInvokableInstance
requester)
{
Map<String, Long> map = requester.callOnInstance(() -> {
- ImmutableList<InetAddressAndPort> peers =
ClusterMetadata.current().directory.allAddresses();
+ Set<InetAddressAndPort> peers =
ClusterMetadata.current().directory.allAddresses();
CountDownLatch latch =
CountDownLatch.newCountDownLatch(peers.size());
Map<String, Long> epochs = new ConcurrentHashMap<>(peers.size());
peers.forEach(peer -> {
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index a08ba58a06..15c532422a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -105,13 +105,17 @@ public class CASTest extends CASCommonTestCases
ParameterizedClass seeds = new
ParameterizedClass(SimpleSeedProvider.class.getName(),
Collections.singletonMap("seeds", "127.0.0.2"));
+ // TODO: This currently requires the accord service to be disabled as
some tests remove a node from the
+ // cluster and re-join it using the same broadcast address. See
CASSANDRA-21026
Consumer<IInstanceConfig> conf = config -> config
.set("paxos_variant", "v2")
.set("write_request_timeout", REQUEST_TIMEOUT)
.set("cas_contention_timeout", CONTENTION_TIMEOUT)
.set("request_timeout",
REQUEST_TIMEOUT)
.set("seed_provider", seeds)
- .set("auto_bootstrap",
config.num() == 2);
+ .set("auto_bootstrap",
config.num() == 2)
+ .set("accord.enabled",
false);
+
// TODO: fails with vnode enabled
THREE_NODES =
init(Cluster.build(3).withConfig(conf).withoutVNodes().start());
FOUR_NODES =
init(Cluster.build(4).withConfig(conf).withoutVNodes().start(), 3);
@@ -830,18 +834,28 @@ public class CASTest extends CASCommonTestCases
{
IInstanceConfig config = cluster.get(node).config();
InetAddressAndPort address =
InetAddressAndPort.getByAddress(config.broadcastAddress());
+ String dc = config.localDatacenter();
+ String rack = config.localRack();
IPartitioner partitioner =
FBUtilities.newPartitioner(config.getString("partitioner"));
Token token =
partitioner.getTokenFactory().fromString(config.getString("initial_token"));
- cluster.get(node).runOnInstance(() ->
ClusterMetadataTestHelper.join(address, token));
+ cluster.get(node).runOnInstance(() -> {
+ ClusterMetadataTestHelper.register(address, dc, rack);
+ ClusterMetadataTestHelper.join(address, token);
+ });
}
private void joinPartially(Cluster cluster, int node)
{
IInstanceConfig config = cluster.get(node).config();
InetAddressAndPort address =
InetAddressAndPort.getByAddress(config.broadcastAddress());
+ String dc = config.localDatacenter();
+ String rack = config.localRack();
IPartitioner partitioner =
FBUtilities.newPartitioner(config.getString("partitioner"));
Token token =
partitioner.getTokenFactory().fromString(config.getString("initial_token"));
- cluster.get(node).runOnInstance(() ->
ClusterMetadataTestHelper.joinPartially(address, token));
+ cluster.get(node).runOnInstance(() -> {
+ ClusterMetadataTestHelper.register(address, dc, rack);
+ ClusterMetadataTestHelper.joinPartially(address, token);
+ });
}
private void finishJoin(Cluster cluster, int node)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
index 1c441ded3e..715a091dca 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
@@ -193,7 +193,10 @@ public abstract class CASTestBase extends TestBaseImpl
public static void removeFromRing(IInvokableInstance peer)
{
- peer.runOnInstance(() ->
ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort()));
+ peer.runOnInstance(() -> {
+
ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort());
+
ClusterMetadataTestHelper.unregister(FBUtilities.getBroadcastAddressAndPort());
+ });
}
public static void assertNotVisibleInRing(IInstance peer)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 9c52ac92a9..cb5cb1f470 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -251,7 +251,7 @@ public class GossipTest extends TestBaseImpl
}
@Test
- public void testQuarantine() throws IOException
+ public void testReplacedNodeRemovedFromGossip() throws IOException
{
TokenSupplier even = TokenSupplier.evenlyDistributedTokens(4, 1);
try (Cluster cluster = Cluster.build(4)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java
new file mode 100644
index 0000000000..4bcae67a2a
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.gossip;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+/**
+ * Runs the {@link GossipExpiryTestBase} scenarios, removing the peer by shutting it down and assassinating it.
+ */
+public class GossipExpiryAfterAssassinateTest extends GossipExpiryTestBase
+{
+    @Override
+    void doRemoval(Cluster cluster, IInvokableInstance toRemove)
+    {
+        // Shut down one peer, then have another assassinate it. The coordinating node will gossip a final LEFT status,
+        // including the expiry time it calculated to the remaining members.
+        IInvokableInstance coordinator = cluster.get(1);
+        if (coordinator.equals(toRemove))
+            throw new IllegalArgumentException("Node cannot assassinate itself");
+
+        try
+        {
+            InetSocketAddress toAssassinate = toRemove.broadcastAddress();
+            toRemove.shutdown().get();
+            coordinator.nodetoolResult("assassinate", toAssassinate.getHostString()).asserts().success();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git
a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java
similarity index 52%
copy from
src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java
copy to
test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java
index ff24e17b9d..5b3f4e09fd 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.tcm.listeners;
+package org.apache.cassandra.distributed.test.gossip;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.Epoch;
-
-public class UpgradeMigrationListener implements ChangeListener.Async
+/**
+ * Runs the {@link GossipExpiryTestBase} scenarios, removing the peer by decommissioning it.
+ */
+public class GossipExpiryAfterDecommissionTest extends GossipExpiryTestBase
 {
-    private static final Logger logger = LoggerFactory.getLogger(UpgradeMigrationListener.class);
-    public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot)
+    @Override
+    void doRemoval(Cluster cluster, IInvokableInstance toRemove)
     {
-        if (!prev.epoch.equals(Epoch.UPGRADE_GOSSIP))
-            return;
-
-        logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId());
-        Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next);
+        // Decommission one peer. Before shutting down messaging, the leaving node will gossip its final LEFT
+        // status, including the expiry time it calculated to the remaining members.
+        toRemove.nodetoolResult("decommission").asserts().success();
     }
 }
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java
new file mode 100644
index 0000000000..c9def985dd
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.gossip;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+/**
+ * Runs the {@link GossipExpiryTestBase} scenarios, removing the peer by shutting it down and running removenode.
+ */
+public class GossipExpiryAfterRemoveNodeTest extends GossipExpiryTestBase
+{
+    @Override
+    void doRemoval(Cluster cluster, IInvokableInstance toRemove)
+    {
+        // Shut down one peer, then have another remove it. The coordinating node will gossip a final LEFT status,
+        // including the expiry time it calculated to the remaining members.
+        IInvokableInstance coordinator = cluster.get(1);
+        if (coordinator.equals(toRemove))
+            throw new IllegalArgumentException("Node to be removed cannot act as removal coordinator");
+
+        try
+        {
+            String nodeId = toRemove.callOnInstance(() -> ClusterMetadata.current().myNodeId().toUUID().toString());
+            toRemove.shutdown().get();
+            coordinator.nodetoolResult("removenode", nodeId).asserts().success();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java
new file mode 100644
index 0000000000..51d340a70c
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.gossip;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.WithProperties;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.gms.GossipExpiryHelper;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.listeners.LegacyStateListener;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.config.CassandraRelevantProperties.VERY_LONG_TIME_MS;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.gms.ApplicationState.STATUS_WITH_PORT;
+import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT;
+
+public abstract class GossipExpiryTestBase extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(GossipExpiryTestBase.class);
+
+    /**
+     * Removes {@code toRemove} from the cluster using the operation under test
+     * (decommission, removenode or assassinate, depending on the subclass).
+     */
+    abstract void doRemoval(Cluster cluster, IInvokableInstance toRemove);
+
+    @Test
+    public void testExpiryOfLeftStateWithoutQuarantine() throws IOException
+    {
+        doTest(true);
+    }
+
+    @Test
+    public void testExpiryOfLeftStateWithQuarantine() throws IOException
+    {
+        doTest(false);
+    }
+
+    private void doTest(boolean withQuarantineDisabled) throws IOException
+    {
+        // This test verifies that when a node leaves the cluster, the expiry time for its state in gossip is
+        // recorded on each node and then expunged when that deadline is reached. By default, the expiry time for
+        // a left peer is calculated on each node independently, but if the gossip_quarantine_disabled config
+        // option is set to true it will converge and become consistent across the remaining members.
+        //
+        // * First we set the property that controls expiry time to 11s. This interval is added to the current wall
+        //   clock time to calculate the expiry deadline.
+        // * Use bytebuddy to inject some jitter into the local expiry time calculation. Each node will individually
+        //   calculate an expiry based on when it processes the completion of the operation that removes the node. If
+        //   the config option is set the cluster should converge on the expiry time calculated by the node coordinating
+        //   the operation. For decommission, this will be the leaving node itself and for removenode/assassinate it
+        //   will be the coordinator. The jitter is to make sure that in the test, the nodes start off with differing
+        //   expiry times. Only node 2 is instrumented (see BB.install), which is enough for them to differ.
+        // * Remove one peer via decommission, removenode or assassinate.
+        // * After maybe verifying the convergence, check that the state for the left node does in fact get removed.
+        try (WithProperties ignored = new WithProperties().set(VERY_LONG_TIME_MS, 11000);
+             Cluster cluster = builder().withNodes(5)
+                                        .withInstanceInitializer(GossipExpiryTestBase.BB::install)
+                                        .withConfig(config -> config.with(NETWORK, GOSSIP)
+                                                                    .set("gossip_quarantine_disabled", withQuarantineDisabled))
+                                        .start())
+        {
+            cluster.forEach(i -> i.runOnInstance(() -> BB.injectDelay.set(true)));
+
+            IInvokableInstance toRemove = cluster.get(5);
+            String gossipStateKey = toRemove.config().broadcastAddress().getAddress().toString();
+            doRemoval(cluster, toRemove);
+            if (withQuarantineDisabled)
+            {
+                // STATUS_WITH_PORT for the left node should converge to share the same expiry time across all nodes
+                Awaitility.waitAtMost(10, TimeUnit.SECONDS)
+                          .pollInterval(1, TimeUnit.SECONDS)
+                          .until(() -> {
+                              Set<String> endpointStates = new HashSet<>();
+                              AtomicBoolean allReported = new AtomicBoolean(true);
+                              cluster.forEach(i -> {
+                                  if (!i.equals(toRemove))
+                                  {
+                                      // Every remaining member must report a status for the left node, otherwise a
+                                      // single reporting member would trivially satisfy the "one distinct value" check.
+                                      Map<String, String> instanceState = ClusterUtils.gossipInfo(i).get(gossipStateKey);
+                                      if (instanceState != null && instanceState.containsKey(STATUS_WITH_PORT.name()))
+                                          endpointStates.add(instanceState.get(STATUS_WITH_PORT.name()));
+                                      else
+                                          allReported.set(false);
+                                  }
+                              });
+                              logger.info("Collected STATUS_WITH_PORT values: {}", endpointStates);
+                              return allReported.get()
+                                     && endpointStates.size() == 1
+                                     && endpointStates.iterator().next().contains(STATUS_LEFT);
+                          });
+            }
+
+            // Once the expiry time is reached, gossip state for the left node is purged
+            Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+                      .pollInterval(1, TimeUnit.SECONDS)
+                      .until(() -> {
+                          AtomicBoolean purged = new AtomicBoolean(true);
+                          cluster.forEach(i -> {
+                              if (!i.equals(toRemove))
+                              {
+                                  // Expiry happens during periodic gossip tasks. Sometimes these may not run in a
+                                  // timely fashion, so for tests we can trigger the status check artificially.
+                                  i.runOnInstance(GossipExpiryHelper.evictExpiredFromGossip(toRemove));
+                                  if (ClusterUtils.gossipInfo(i).containsKey(gossipStateKey))
+                                      purged.set(false);
+                              }
+                          });
+                          return purged.get();
+                      });
+        }
+    }
+
+    /**
+     * ByteBuddy instrumentation installed on node 2 only: while {@link #injectDelay} is set, each call to
+     * {@code LegacyStateListener.processChangesToRemotePeers} first sleeps for a random 0-999ms, so node 2
+     * typically processes the removal, and so derives its local expiry time, later than its peers.
+     */
+    public static class BB
+    {
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber != 2)
+                return;
+            new ByteBuddy().rebase(LegacyStateListener.class)
+                           .method(named("processChangesToRemotePeers"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        static final AtomicBoolean injectDelay = new AtomicBoolean(false);
+        static final Random random = new Random(System.nanoTime());
+
+        public static void processChangesToRemotePeers(ClusterMetadata prev,
+                                                       ClusterMetadata next,
+                                                       Set<NodeId> changed,
+                                                       @SuperCall Callable<Void> zuper) throws Exception
+        {
+            if (injectDelay.get())
+                TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
+            zuper.call();
+        }
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 43c147c255..cbdcc22394 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.test.log;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
@@ -83,9 +84,9 @@ import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.sequences.LeaveStreams;
import org.apache.cassandra.tcm.sequences.LockedRanges;
import org.apache.cassandra.tcm.sequences.Move;
-import org.apache.cassandra.tcm.sequences.LeaveStreams;
import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
import org.apache.cassandra.tcm.transformations.AlterSchema;
@@ -94,6 +95,7 @@ import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.PrepareMove;
import org.apache.cassandra.tcm.transformations.PrepareReplace;
import org.apache.cassandra.tcm.transformations.Register;
+import org.apache.cassandra.tcm.transformations.Unregister;
import org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration;
import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -101,6 +103,10 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.schema.SchemaTestUtil.submit;
+import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING;
+import static org.apache.cassandra.tcm.membership.NodeState.BOOT_REPLACING;
+import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
+import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED;
import static org.junit.Assert.assertEquals;
public class ClusterMetadataTestHelper
@@ -427,6 +433,34 @@ public class ClusterMetadataTestHelper
         }
     }
 
+    /**
+     * Unregisters the node whose id is resolved from {@code endpoint} by {@code nodeId(endpoint)}.
+     */
+    public static void unregister(InetAddressAndPort endpoint)
+    {
+        unregister(nodeId(endpoint));
+    }
+
+    /**
+     * Commits an {@link Unregister} transformation for {@code nodeId}, passing REGISTERED, BOOTSTRAPPING,
+     * BOOT_REPLACING and LEFT as the accepted node states (presumably the states it may be unregistered from).
+     *
+     * @throws RuntimeException wrapping any {@link Throwable} raised while committing the transformation
+     */
+    public static void unregister(NodeId nodeId)
+    {
+        try
+        {
+            commit(new Unregister(nodeId,
+                                  EnumSet.of(REGISTERED, BOOTSTRAPPING, BOOT_REPLACING, LEFT),
+                                  ClusterMetadataService.instance().placementProvider()));
+        }
+        catch (Throwable e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static JoinProcess lazyJoin(int nodeIdx, long token)
     {
         return lazyJoin(addr(nodeIdx), Collections.singleton(new Murmur3Partitioner.LongToken(token)));
diff --git a/test/distributed/org/apache/cassandra/gms/GossipExpiryHelper.java
b/test/distributed/org/apache/cassandra/gms/GossipExpiryHelper.java
new file mode 100644
index 0000000000..ed808a6bcb
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/gms/GossipExpiryHelper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Directory;
+
+import static org.apache.cassandra.distributed.impl.TestEndpointCache.toCassandraInetAddressAndPort;
+
+/**
+ * Test-only helpers for driving {@link Gossiper} state expiry from in-jvm dtests. Placed in the
+ * {@code gms} package, presumably so it can reach package-level members such as {@code endpointStateMap}.
+ */
+public class GossipExpiryHelper
+{
+    /**
+     * Builds a task which, when run on a cluster member, forces that member's failure detector to convict
+     * {@code instance} and then calls {@link Gossiper#evictIfExpired} for it using the current wall-clock time.
+     * If the member holds no gossip state for {@code instance}, the task only logs that fact.
+     *
+     * @param instance the node whose gossip state should be checked, typically one that has already left
+     * @return a serializable task suitable for {@code IInvokableInstance#runOnInstance}
+     */
+    public static IIsolatedExecutor.SerializableRunnable evictExpiredFromGossip(IInvokableInstance instance)
+    {
+        // Resolve the address here so the returned runnable captures only the address, not the instance.
+        InetSocketAddress address = instance.config().broadcastAddress();
+        return () -> {
+            Logger logger = LoggerFactory.getLogger(Gossiper.class);
+            Directory directory = ClusterMetadata.current().directory;
+            long now = System.currentTimeMillis();
+            InetAddressAndPort endpoint = toCassandraInetAddressAndPort(address);
+            EndpointState epState = Gossiper.instance.endpointStateMap.get(endpoint);
+            if (epState == null)
+            {
+                logger.info("Test helper found no gossip state for endpoint {}", endpoint);
+                return;
+            }
+            logger.info("Test helper triggering expiry check at {} for {} (joined: {}, alive: {})",
+                        now,
+                        endpoint,
+                        directory.allJoinedEndpoints().contains(endpoint),
+                        epState.isAlive());
+            FailureDetector.instance.forceConviction(endpoint);
+            Gossiper.instance.evictIfExpired(endpoint, epState, directory, now);
+        };
+    }
+}
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java
b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index d254850a36..1dfe194b22 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.tcm.ClusterMetadataService;
@@ -122,6 +123,8 @@ public class GossiperTest
public void testLargeGenerationJump() throws UnknownHostException,
InterruptedException
{
Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2);
+ for (InetAddressAndPort host : hosts)
+ ClusterMetadataTestHelper.register(host);
try
{
InetAddressAndPort remoteHostAddress = hosts.get(1);
@@ -169,6 +172,8 @@ public class GossiperTest
SimpleStateChangeListener stateChangeListener = null;
Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2);
+ for (InetAddressAndPort host : hosts)
+ ClusterMetadataTestHelper.register(host);
try
{
InetAddressAndPort remoteHostAddress = hosts.get(1);
@@ -329,6 +334,9 @@ public class GossiperTest
new
VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2);
+ for (InetAddressAndPort host : hosts)
+ ClusterMetadataTestHelper.register(host);
+
SimpleStateChangeListener stateChangeListener = null;
try
{
diff --git a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java
b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java
index aebe443529..91e1f1e1c3 100644
--- a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java
+++ b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.tcm.membership;
import java.net.UnknownHostException;
import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -36,6 +38,23 @@ public class MembershipUtils
         return endpoint(random.nextInt(254) + 1);
     }
 
+    /**
+     * Returns {@code count} distinct endpoints, built via {@link #endpoint(int)} from distinct random values in [1, 254].
+     *
+     * @throws IllegalArgumentException if {@code count} is negative or greater than 254, the number of distinct
+     *                                  values available (a larger count would otherwise never terminate)
+     */
+    public static Set<InetAddressAndPort> uniqueEndpoints(Random random, int count)
+    {
+        if (count < 0 || count > 254)
+            throw new IllegalArgumentException("count must be between 0 and 254, was " + count);
+        return random.ints(1, 255)
+                     .distinct()
+                     .limit(count)
+                     .mapToObj(MembershipUtils::endpoint)
+                     .collect(Collectors.toSet());
+    }
+
     public static InetAddressAndPort endpoint(int i)
     {
         return endpoint((byte)i);
diff --git
a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java
b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java
index 33ccb137ae..9cd8ee7746 100644
--- a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java
+++ b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.transformations;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Random;
import java.util.Set;
@@ -36,16 +37,18 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.ownership.OwnershipUtils;
-import static
org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses;
+import static
org.apache.cassandra.tcm.membership.MembershipUtils.uniqueEndpoints;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -116,9 +119,10 @@ public class PrepareJoinTest
other = new NodeId(1);
joining = new NodeId(2);
Location location = new Location("dc", "rack");
- Directory directory = new Directory().unsafeWithNodeForTesting(other,
nodeAddresses(random), location, NodeVersion.CURRENT)
+ Iterator<InetAddressAndPort> endpoints = uniqueEndpoints(random,
2).iterator();
+ Directory directory = new Directory().unsafeWithNodeForTesting(other,
new NodeAddresses(endpoints.next()), location, NodeVersion.CURRENT)
.withNodeState(other,
NodeState.JOINED)
-
.unsafeWithNodeForTesting(joining, nodeAddresses(random), location,
NodeVersion.CURRENT)
+
.unsafeWithNodeForTesting(joining, new NodeAddresses(endpoints.next()),
location, NodeVersion.CURRENT)
.withNodeState(joining,
NodeState.REGISTERED);
Set<Token> ownedTokens = OwnershipUtils.randomTokens(16, partitioner,
random);
return ClusterMetadataTestHelper.minimalForTesting(partitioner)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]