This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f05b27502f Improve CMS initialization
f05b27502f is described below
commit f05b27502f665cb1ae32169a42e4c221ec01d02b
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Nov 20 15:52:34 2025 +0000
Improve CMS initialization
* Better handling of DOWN unupgraded nodes
* Test that aborted CMS initialization cleans up state properly
* Clean up orphaned PreInitialize entries in the log on bounce
* Unconditionally reset initiator during abort
* Ensure that metadata log entries aren't exchanged before CMS
  initialization is complete
* Precalculate common serialization version, excluding non-upgraded and
  LEFT nodes
* Decide if metadata-impacting upgrade is in progress using min common
  version
* Add metadata identifier to nodetool cms output
Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Sam
Tunnicliffe and Marcus Eriksson for CASSANDRA-21036
Co-authored-by: Marcus Eriksson <[email protected]>
Co-authored-by: Sam Tunnicliffe <[email protected]>
---
CHANGES.txt | 1 +
.../cassandra/db/AbstractMutationVerbHandler.java | 2 +-
.../apache/cassandra/gms/GossipVerbHandler.java | 4 +-
.../apache/cassandra/net/ResponseVerbHandler.java | 4 +
.../org/apache/cassandra/tcm/CMSOperations.java | 2 +
.../org/apache/cassandra/tcm/ClusterMetadata.java | 2 +-
src/java/org/apache/cassandra/tcm/Startup.java | 5 +-
.../cassandra/tcm/log/SystemKeyspaceStorage.java | 19 +-
.../apache/cassandra/tcm/membership/Directory.java | 20 ++
.../cassandra/tcm/membership/NodeVersion.java | 17 +-
.../apache/cassandra/tcm/migration/Election.java | 23 +--
.../tcm/serialization/MessageSerializers.java | 26 +--
.../cassandra/tcm/serialization/Version.java | 24 ++-
.../tcm/transformations/cms/Initialize.java | 6 +
.../apache/cassandra/tools/nodetool/CMSAdmin.java | 2 +
...erMetadataUpgradeCleanupPreInitializeTest.java} | 44 ++++-
...lusterMetadataUpgradeDelayedInitializeTest.java | 219 +++++++++++++++++++++
.../ClusterMetadataUpgradeIgnoreHostsTest.java | 107 ++++++++++
...lusterMetadataUpgradeUnexpectedFailureTest.java | 110 +++++++++++
.../cassandra/tcm/membership/DirectoryTest.java | 77 ++++++++
.../cassandra/tcm/membership/NodeVersionTest.java | 49 +++++
21 files changed, 716 insertions(+), 47 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 06b33e9846..b0b278a751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Improve CMS initialization (CASSANDRA-21036)
* Introducing comments and security labels for schema elements
(CASSANDRA-20943)
* Extend nodetool tablestats for dictionary memory usage (CASSANDRA-20940)
* Introduce separate GCInspector thresholds for concurrent GC events
(CASSANDRA-20980)
diff --git a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
index 05512e5a8e..5d2e4e3cdc 100644
--- a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
@@ -56,7 +56,7 @@ public abstract class AbstractMutationVerbHandler<T extends
IMutation> implement
protected void processMessage(Message<T> message, InetAddressAndPort
respondTo)
{
- if (message.epoch().isAfter(Epoch.EMPTY))
+ if (message.epoch().isAfter(Epoch.FIRST))
{
ClusterMetadata metadata = ClusterMetadata.current();
metadata = checkTokenOwnership(metadata, message, respondTo);
diff --git a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
index bac0854d07..bf51b83a01 100644
--- a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
@@ -21,12 +21,14 @@ package org.apache.cassandra.gms;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
public class GossipVerbHandler<T> implements IVerbHandler<T>
{
public void doVerb(Message<T> message)
{
Gossiper.instance.setLastProcessedMessageAt(message.creationTimeMillis());
-
ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(),
message.epoch());
+ if (message.epoch().isAfter(Epoch.FIRST))
+
ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(),
message.epoch());
}
}
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 517a10fd2a..2550ca2929 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
@@ -92,6 +93,9 @@ class ResponseVerbHandler implements IVerbHandler
private void maybeFetchLogs(Message<?> message)
{
ClusterMetadata metadata = ClusterMetadata.current();
+ if (!message.epoch().isAfter(Epoch.FIRST))
+ return;
+
if (!message.epoch().isAfter(metadata.epoch))
return;
diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java
b/src/java/org/apache/cassandra/tcm/CMSOperations.java
index a0917584d9..e439b1b9f0 100644
--- a/src/java/org/apache/cassandra/tcm/CMSOperations.java
+++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java
@@ -63,6 +63,7 @@ public class CMSOperations implements CMSOperationsMBean
public static final String LOCAL_PENDING = "LOCAL_PENDING";
public static final String COMMITS_PAUSED = "COMMITS_PAUSED";
public static final String REPLICATION_FACTOR = "REPLICATION_FACTOR";
+ public static final String CMS_ID = "CMS_ID";
private static final Logger logger =
LoggerFactory.getLogger(ClusterMetadataService.class);
public static CMSOperations instance = new
CMSOperations(ClusterMetadataService.instance());
@@ -161,6 +162,7 @@ public class CMSOperations implements CMSOperationsMBean
info.put(LOCAL_PENDING,
Integer.toString(cms.log().pendingBufferSize()));
info.put(COMMITS_PAUSED, Boolean.toString(cms.commitsPaused()));
info.put(REPLICATION_FACTOR,
ReplicationParams.meta(metadata).toString());
+ info.put(CMS_ID, Integer.toString(metadata.metadataIdentifier));
return info;
}
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 6f4fac9f13..65cb869faf 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -1004,7 +1004,7 @@ public class ClusterMetadata
public boolean metadataSerializationUpgradeInProgress()
{
- return
!directory.clusterMaxVersion.serializationVersion().equals(directory.clusterMinVersion.serializationVersion());
+ return
!directory.clusterMaxVersion.serializationVersion().equals(directory.commonSerializationVersion);
}
public static class Serializer implements
MetadataSerializer<ClusterMetadata>
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java
b/src/java/org/apache/cassandra/tcm/Startup.java
index dab2d8387c..446c52a02e 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -58,8 +58,8 @@ import org.apache.cassandra.tcm.log.LogStorage;
import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
-import org.apache.cassandra.tcm.migration.Election;
import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
+import org.apache.cassandra.tcm.migration.Election;
import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
@@ -77,7 +77,7 @@ import static
org.apache.cassandra.tcm.membership.NodeState.JOINED;
import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
- /**
+/**
* Initialize
*/
public class Startup
@@ -141,7 +141,6 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
ClusterMetadataService.instance().log().bootstrap(addr, datacenter);
ClusterMetadata metadata = ClusterMetadata.current();
assert ClusterMetadataService.state() == LOCAL : String.format("Can't
initialize as node hasn't transitioned to CMS state. State: %s.\n%s",
ClusterMetadataService.state(), metadata);
-
Initialize initialize = new
Initialize(metadata.initializeClusterIdentifier(addr.hashCode()));
ClusterMetadataService.instance().commit(initialize);
}
diff --git a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
index a0f6263e1c..92f4b78903 100644
--- a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
+++ b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
@@ -87,12 +87,25 @@ public class SystemKeyspaceStorage implements LogStorage
public synchronized static boolean hasAnyEpoch()
{
- String query = String.format("SELECT epoch FROM %s.%s LIMIT 1",
SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME);
+ String query = String.format("SELECT epoch, kind FROM %s.%s LIMIT 2",
SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME);
+ int count = 0;
+ long preInitializeEpoch = -1;
for (UntypedResultSet.Row row : executeInternal(query))
- return true;
+ {
+ count++;
+ if (Transformation.Kind.fromId(row.getInt("kind")) ==
Transformation.Kind.PRE_INITIALIZE_CMS)
+ preInitializeEpoch = row.getLong("epoch");
+ }
+ if (count == 1 && preInitializeEpoch != -1)
+ {
+ logger.warn("Cleaning up orphaned PreInitialize at epoch {} -
restarting in gossip mode", preInitializeEpoch);
+ String cleanupQuery = String.format("DELETE from %s.%s where epoch
= %d", SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME, preInitializeEpoch);
+ executeInternal(cleanupQuery);
+ return false;
+ }
- return false;
+ return count > 0;
}
@Override
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java
b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 0f6cf6eb14..b7d8684976 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -73,6 +73,7 @@ public class Directory implements MetadataValue<Directory>
private final BTreeMap<String, Multimap<String, InetAddressAndPort>>
racksByDC;
public final NodeVersion clusterMinVersion;
public final NodeVersion clusterMaxVersion;
+ public final Version commonSerializationVersion;
public Directory()
{
@@ -115,6 +116,7 @@ public class Directory implements MetadataValue<Directory>
Pair<NodeVersion, NodeVersion> minMaxVer = minMaxVersions(states,
versions);
clusterMinVersion = minMaxVer.left;
clusterMaxVersion = minMaxVer.right;
+ commonSerializationVersion = minCommonSerializationVersion(states,
versions);
}
@Override
@@ -131,6 +133,9 @@ public class Directory implements MetadataValue<Directory>
", hostIds=" + hostIds +
", endpointsByDC=" + endpointsByDC +
", racksByDC=" + racksByDC +
+ ", clusterMinVersion=" + clusterMinVersion +
+ ", clusterMaxVersion=" + clusterMaxVersion +
+ ", commonSerializationVersion=" + commonSerializationVersion +
'}';
}
@@ -778,6 +783,21 @@ public class Directory implements MetadataValue<Directory>
return Pair.create(minVersion, maxVersion);
}
+ public static Version minCommonSerializationVersion(BTreeMap<NodeId,
NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
+ {
+ int commonVersion = Integer.MAX_VALUE;
+ for (Map.Entry<NodeId, NodeState> entry : states.entrySet())
+ {
+ if (entry.getValue() != NodeState.LEFT)
+ {
+ NodeVersion ver = versions.get(entry.getKey());
+ if (ver.serializationVersion > Version.OLD.asInt() &&
ver.serializationVersion < commonVersion)
+ commonVersion = ver.serializationVersion;
+ }
+ }
+ return commonVersion == Integer.MAX_VALUE ?
NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion);
+ }
+
@Override
public int hashCode()
{
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
index 9a15951ed0..33d60da6fe 100644
--- a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.membership;
import java.io.IOException;
import java.util.Objects;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
@@ -39,6 +40,8 @@ public class NodeVersion implements Comparable<NodeVersion>
private static final CassandraVersion SINCE_VERSION =
CassandraVersion.CASSANDRA_5_1;
public final CassandraVersion cassandraVersion;
+
+ // this must be kept as an int, otherwise we can't deserialize future
NodeVersions
public final int serializationVersion;
public NodeVersion(CassandraVersion cassandraVersion, Version
serializationVersion)
@@ -113,10 +116,9 @@ public class NodeVersion implements Comparable<NodeVersion>
@Override
public void serialize(NodeVersion t, DataOutputPlus out, Version
version) throws IOException
{
- out.writeUTF(t.cassandraVersion.toString());
if (t.serializationVersion == Version.UNKNOWN.asInt())
throw new IllegalStateException("Should not serialize UNKNOWN
version");
- out.writeUnsignedVInt32(t.serializationVersion);
+ serializeHelper(out, t.cassandraVersion.toString(),
t.serializationVersion);
}
@Override
@@ -133,5 +135,16 @@ public class NodeVersion implements Comparable<NodeVersion>
return sizeof(t.cassandraVersion.toString()) +
sizeofUnsignedVInt(t.serializationVersion);
}
+ /**
+ * Used to be able to generate a NodeVersion with a future
serialization version, for test!
+ */
+ @VisibleForTesting
+ static void serializeHelper(DataOutputPlus out, String
cassandraVersion, int serializationVersion) throws IOException
+ {
+ out.writeUTF(cassandraVersion);
+ out.writeUnsignedVInt32(serializationVersion);
+ }
}
+
+
}
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java
b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 6ada116323..ee8bb36af5 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -89,25 +89,24 @@ public class Election
Set<InetAddressAndPort> sendTo = new HashSet<>(candidates);
sendTo.removeAll(ignoredEndpoints);
sendTo.remove(FBUtilities.getBroadcastAddressAndPort());
-
+ CMSInitializationRequest initializationRequest = new
CMSInitializationRequest(FBUtilities.getBroadcastAddressAndPort(),
UUID.randomUUID(), metadata);
+ if (!updateInitiator(null, initializationRequest.initiator))
+ throw new IllegalStateException("Migration already initiated by "
+ initiator.get());
try
{
- initiate(sendTo, metadata, verifyAllPeersMetadata);
+ initiate(initializationRequest, sendTo, metadata,
verifyAllPeersMetadata);
finish(sendTo);
}
- catch (Exception e)
+ catch (Throwable e)
{
- abort(sendTo);
+ logger.error("Got error nominating self", e);
+ abort(initializationRequest.initiator, sendTo);
throw e;
}
}
- private void initiate(Set<InetAddressAndPort> sendTo, ClusterMetadata
metadata, boolean verifyAllPeersMetadata)
+ private void initiate(CMSInitializationRequest initializationRequest,
Set<InetAddressAndPort> sendTo, ClusterMetadata metadata, boolean
verifyAllPeersMetadata)
{
- CMSInitializationRequest initializationRequest = new
CMSInitializationRequest(FBUtilities.getBroadcastAddressAndPort(),
UUID.randomUUID(), metadata);
- if (!updateInitiator(null, initializationRequest.initiator))
- throw new IllegalStateException("Migration already initiated by "
+ initiator.get());
-
logger.info("No previous migration detected, initiating");
Collection<Pair<InetAddressAndPort, CMSInitializationResponse>>
metadatas = MessageDelivery.fanoutAndWait(messaging, sendTo,
Verb.TCM_INIT_MIG_REQ, initializationRequest);
if (metadatas.size() != sendTo.size())
@@ -149,9 +148,11 @@ public class Election
}
}
- private void abort(Set<InetAddressAndPort> sendTo)
+ private void abort(CMSInitializationRequest.Initiator init,
Set<InetAddressAndPort> sendTo)
{
- CMSInitializationRequest.Initiator init = initiator.getAndSet(null);
+ logger.info("Aborting migration");
+ CMSInitializationRequest.Initiator previous =
initiator.getAndSet(null);
+ logger.info("Reset local initiator state (was {}), sending abort
message to peers", previous);
for (InetAddressAndPort ep : sendTo)
messaging.send(Message.out(Verb.TCM_ABORT_MIG, init), ep);
}
diff --git
a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
index deef0a4b1b..63fcfc8180 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
@@ -35,47 +35,47 @@ import
org.apache.cassandra.tcm.migration.CMSInitializationRequest;
* NOTE: Serialization version here is used for convenience of serializing the
message
* on the outgoing path. Since receiving node may have a different view of
* min serialization version, we _always_ have to either use a {@link
VerboseMetadataSerializer}
- * (like {@link LogState}/ {@link Replication} or explicitly serialize the
version (like {@link Commit}).
+ * (like {@link LogState}) or explicitly serialize the version (like {@link
Commit}).
*/
public class MessageSerializers
{
public static IVersionedSerializer<LogState> logStateSerializer()
{
ClusterMetadata metadata = ClusterMetadata.currentNullable();
- if (metadata == null ||
metadata.directory.clusterMinVersion.serializationVersion ==
NodeVersion.CURRENT.serializationVersion)
+ if (metadata == null ||
metadata.directory.commonSerializationVersion.asInt() ==
NodeVersion.CURRENT.serializationVersion)
return LogState.defaultMessageSerializer;
- assert
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
- return
LogState.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+ assert
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+ return
LogState.messageSerializer(metadata.directory.commonSerializationVersion);
}
public static IVersionedSerializer<Commit.Result> commitResultSerializer()
{
ClusterMetadata metadata = ClusterMetadata.currentNullable();
- if (metadata == null ||
metadata.directory.clusterMinVersion.serializationVersion ==
NodeVersion.CURRENT.serializationVersion)
+ if (metadata == null ||
metadata.directory.commonSerializationVersion.asInt() ==
NodeVersion.CURRENT.serializationVersion)
return Commit.Result.defaultMessageSerializer;
- assert
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
- return
Commit.Result.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+ assert
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+ return
Commit.Result.messageSerializer(metadata.directory.commonSerializationVersion);
}
public static IVersionedSerializer<Commit> commitSerializer()
{
ClusterMetadata metadata = ClusterMetadata.currentNullable();
- if (metadata == null ||
metadata.directory.clusterMinVersion.serializationVersion ==
NodeVersion.CURRENT.serializationVersion)
+ if (metadata == null ||
metadata.directory.commonSerializationVersion.asInt() ==
NodeVersion.CURRENT.serializationVersion)
return Commit.defaultMessageSerializer;
- assert
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
- return
Commit.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+ assert
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+ return
Commit.messageSerializer(metadata.directory.commonSerializationVersion);
}
public static IVersionedSerializer<CMSInitializationRequest>
initRequestSerializer()
{
ClusterMetadata metadata = ClusterMetadata.currentNullable();
- if (metadata == null ||
metadata.directory.clusterMinVersion.serializationVersion ==
NodeVersion.CURRENT.serializationVersion)
+ if (metadata == null ||
metadata.directory.commonSerializationVersion.asInt() ==
NodeVersion.CURRENT.serializationVersion)
return CMSInitializationRequest.defaultMessageSerializer;
- assert
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
- return
CMSInitializationRequest.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+ assert
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+ return
CMSInitializationRequest.messageSerializer(metadata.directory.commonSerializationVersion);
}
}
diff --git a/src/java/org/apache/cassandra/tcm/serialization/Version.java
b/src/java/org/apache/cassandra/tcm/serialization/Version.java
index c4fb39cd51..9cbcb8587a 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/Version.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/Version.java
@@ -29,7 +29,13 @@ import org.apache.cassandra.tcm.membership.NodeVersion;
public enum Version
{
+ /**
+ * Used for nodes on older, pre-CEP-21 release versions
+ */
OLD(-1),
+ /**
+ * Initial version
+ */
V0(0),
/**
* - Moved Partitioner in ClusterMetadata serializer to be the first field
@@ -81,6 +87,7 @@ public enum Version
public static final Version MIN_ACCORD_VERSION = V7;
private static Map<Integer, Version> values = new HashMap<>();
+
static
{
for (Version v : values())
@@ -99,10 +106,9 @@ public enum Version
public static Version minCommonSerializationVersion()
{
ClusterMetadata metadata = ClusterMetadata.currentNullable();
- if (metadata != null)
- return metadata.directory.clusterMinVersion.serializationVersion();
- return NodeVersion.CURRENT.serializationVersion();
-
+ return metadata != null
+ ? metadata.directory.commonSerializationVersion
+ : NodeVersion.CURRENT.serializationVersion();
}
public int asInt()
@@ -120,11 +126,21 @@ public enum Version
return version >= other.version;
}
+ public boolean isEqualOrBefore(Version version)
+ {
+ return this.version <= version.version;
+ }
+
public boolean isBefore(Version other)
{
return version < other.version;
}
+ public boolean isAfter(Version other)
+ {
+ return version > other.version;
+ }
+
public static Version fromInt(int i)
{
Version v = values.get(i);
diff --git
a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
index 11c2ed4d31..9564372c6f 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
@@ -61,6 +61,12 @@ public class Initialize extends ForceSnapshot
super(baseState);
}
+ @Override
+ public boolean allowDuringUpgrades()
+ {
+ return true;
+ }
+
public Kind kind()
{
return Kind.INITIALIZE_CMS;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
index 8c5dc0b0dc..40fb38464c 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
@@ -34,6 +34,7 @@ import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;
+import static org.apache.cassandra.tcm.CMSOperations.CMS_ID;
import static org.apache.cassandra.tcm.CMSOperations.COMMITS_PAUSED;
import static org.apache.cassandra.tcm.CMSOperations.EPOCH;
import static org.apache.cassandra.tcm.CMSOperations.IS_MEMBER;
@@ -73,6 +74,7 @@ public class CMSAdmin extends AbstractCommand
{
Map<String, String> info =
probe.getCMSOperationsProxy().describeCMS();
output.out.printf("Cluster Metadata Service:%n");
+ output.out.printf("CMS Identifier: %s%n", info.get(CMS_ID));
output.out.printf("Members: %s%n", info.get(MEMBERS));
output.out.printf("Needs reconfiguration: %s%n",
info.get(NEEDS_RECONFIGURATION));
output.out.printf("Is Member: %s%n", info.get(IS_MEMBER));
diff --git
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostTest.java
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeCleanupPreInitializeTest.java
similarity index 52%
rename from
test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostTest.java
rename to
test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeCleanupPreInitializeTest.java
index 51267db100..3117abaed9 100644
---
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeCleanupPreInitializeTest.java
@@ -22,11 +22,16 @@ import org.junit.Test;
import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.tcm.Transformation;
-public class ClusterMetadataUpgradeIgnoreHostTest extends UpgradeTestBase
+import static org.junit.Assert.assertTrue;
+
+
+public class ClusterMetadataUpgradeCleanupPreInitializeTest extends
UpgradeTestBase
{
+
@Test
- public void upgradeIgnoreHostsTest() throws Throwable
+ public void cleanupPreInitializeTest() throws Throwable
{
new TestCase()
.nodes(3)
@@ -39,12 +44,35 @@ public class ClusterMetadataUpgradeIgnoreHostTest extends
UpgradeTestBase
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int,
ck int, v int, PRIMARY KEY (pk, ck))");
})
.runAfterClusterUpgrade((cluster) -> {
- // todo; isolate node 3 - actually shutting it down makes us throw
exceptions when test finishes
- cluster.filters().allVerbs().to(3).drop();
- cluster.filters().allVerbs().from(3).drop();
- cluster.get(1).nodetoolResult("cms",
"initialize").asserts().failure(); // node3 unreachable
- cluster.get(1).nodetoolResult("cms", "initialize", "--ignore",
"127.0.0.1").asserts().failure(); // can't ignore localhost
- cluster.get(1).nodetoolResult("cms", "initialize", "--ignore",
"127.0.0.3").asserts().success();
+ cluster.get(1).executeInternal("INSERT INTO
system.local_metadata_log (epoch, kind) VALUES (1, 0)");
+ cluster.get(1).flush("system");
+ cluster.get(1).shutdown().get();
+ cluster.get(1).startup();
+ cluster.get(1).logs().watchFor("Cleaning up orphaned PreInitialize
at epoch 1");
+ cluster.get(1).nodetoolResult("cms",
"initialize").asserts().success();
+ cluster.get(1).shutdown().get();
+ cluster.get(1).startup();
+ boolean seenPreInit = false;
+ boolean seenInit = false;
+ boolean seenSnapshot = false;
+ for (Object [] row : cluster.get(1).executeInternal("SELECT epoch,
kind FROM system.local_metadata_log"))
+ {
+ switch (Transformation.Kind.fromId((Integer)row[1]))
+ {
+ case PRE_INITIALIZE_CMS:
+ seenPreInit = true;
+ break;
+ case INITIALIZE_CMS:
+ seenInit = true;
+ break;
+ case TRIGGER_SNAPSHOT:
+ seenSnapshot = true;
+ break;
+ }
+ }
+ assertTrue(seenPreInit);
+ assertTrue(seenInit);
+ assertTrue(seenSnapshot);
}).run();
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDelayedInitializeTest.java
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDelayedInitializeTest.java
new file mode 100644
index 0000000000..dc16786f0b
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDelayedInitializeTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Shared;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertEquals;
+
+public class ClusterMetadataUpgradeDelayedInitializeTest extends
UpgradeTestBase
+{
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterMetadataUpgradeDelayedInitializeTest.class);
+
+ @Test
+ public void delayedInitializationTest() throws Throwable
+ {
+ // In this test we force the initiator to pause between committing the
PreInitialize and Initialize, then verify
+ // that read/write/gossip messages exchanged whilst in this state do
not cause the other nodes to fetch the
+ // PreInitialize log entry. We inject that pause using ByteBuddy to
add a wait on a CountDownLatch controlled
+ // from the main test code.
+ final int nodeCount = 3;
+ Versions dtestVersions = Versions.find();
+ Versions.Version earliestVersion = dtestVersions.getLatest(v41);
+ Consumer<IInstanceConfig> configUpdater = config ->
config.with(Feature.NETWORK, Feature.GOSSIP);
+ Consumer<UpgradeableCluster.Builder> builderUpdater = builder ->
builder.withInstanceInitializer(ClusterMetadataUpgradeDelayedInitializeTest.BBInstaller::installUpgradeVersionBB);
+ try (UpgradeableCluster cluster = UpgradeableCluster.create(nodeCount,
earliestVersion, configUpdater, builderUpdater))
+ {
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor':3}"));
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int
PRIMARY KEY)");
+
+ // Upgrade all instances
+ Versions.Version upgradeVersion = dtestVersions.getLatest(CURRENT);
+ for (IUpgradeableInstance i : cluster)
+ upgradeInstance(i, upgradeVersion);
+ logger.info(String.format("Upgrade complete. Current version: %s",
upgradeVersion));
+ ((IInvokableInstance)cluster.get(2)).runOnInstance(() -> {
+ KeyspaceMetadata ksm =
ClusterMetadata.current().schema.getKeyspaceMetadata("distributed_test_keyspace");
+ TableMetadata tm = ksm.tables.getNullable("tbl");
+ System.out.println(tm.epoch);
+ });
+ // Start the CMS initialization, which should pause after
committing the PreInitialize transform
+ assertEquals(1, BBState.latch.getCount());
+ ExecutorService initExecutor = Executors.newSingleThreadExecutor();
+ IUpgradeableInstance initiator = cluster.get(1);
+ Future<NodeToolResult> result = initExecutor.submit(() ->
initiator.nodetoolResult("cms", "initialize"));
+ logger.info("Waiting for initiator to pause");
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS).until(() -> {
+ // wait until the initiator has committed the PreInitialize
entry
+ NodeToolResult res = initiator.nodetoolResult("cms");
+ return res.getStdout().contains("Epoch: 1");
+ });
+ logger.info("Initiator paused");
+
+ // internode messages for reads/writes should not trigger catchup
between node1 and its peers
+ for (int i = 1; i <= 3; i++)
+ {
+ cluster.coordinator(i).execute(withKeyspace("INSERT INTO
%s.tbl (k) VALUES (0)"), ConsistencyLevel.ALL);
+ cluster.coordinator(i).execute(withKeyspace("SELECT * FROM
%s.tbl WHERE k=0"), ConsistencyLevel.ALL);
+ }
+ // nor should gossip messages
+ awaitGossipUpdateFromPeer(cluster.get(2), initiator);
+ awaitGossipUpdateFromPeer(cluster.get(3), initiator);
+
+ // Assert that nodes 2 & 3 did not catch up from node 1
+
cluster.get(2).nodetoolResult("cms").asserts().success().stdoutContains("Epoch:
-9223372036854775807");
+
cluster.get(3).nodetoolResult("cms").asserts().success().stdoutContains("Epoch:
-9223372036854775807");
+
+ logger.info("Allowing initiator to continue");
+ BBState.latch.countDown();
+ NodeToolResult initRes = result.get(30, TimeUnit.SECONDS);
+ initRes.asserts().success();
+
+ // Verify that the initialization completes
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+ .until(() ->
cluster.get(1).nodetoolResult("cms").getStdout().contains("Epoch: 3") &&
+
cluster.get(2).nodetoolResult("cms").getStdout().contains("Epoch: 3") &&
+
cluster.get(3).nodetoolResult("cms").getStdout().contains("Epoch: 3"));
+ }
+ }
+
+ /**
+ * Inspects gossip state on one peer and waits for the heartbeat state of
another peer to increment
+ * (which is not an absolute guarantee of direct communication between the
two nodes, but should be good enough).
+ *
+ * @param waiter the node doing the checking & waiting to observe an update
+ * @param waitingFor the node whose heartbeat is being observed
+ */
+ private void awaitGossipUpdateFromPeer(IUpgradeableInstance waiter,
IUpgradeableInstance waitingFor)
+ {
+ String targetEndpoint =
waitingFor.config().getString("broadcast_address");
+ ((IInvokableInstance) waiter).runOnInstance(() -> {
+ InetAddressAndPort endpoint =
InetAddressAndPort.getByNameUnchecked(targetEndpoint);
+ final int before =
Gossiper.instance.getEndpointStateForEndpoint(endpoint)
+ .getHeartBeatState()
+ .getHeartBeatVersion();
+
+ long deadline = System.nanoTime() +
TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+ while (Gossiper.instance.getEndpointStateForEndpoint(endpoint)
+ .getHeartBeatState()
+ .getHeartBeatVersion() < before + 1)
+ {
+ FBUtilities.sleepQuietly(100);
+ if (System.nanoTime() > deadline)
+ throw new RuntimeException("Timed out waiting for gossip
state to update");
+ }
+ });
+ }
+
+ private void upgradeInstance(IUpgradeableInstance instance,
Versions.Version upgradeTo) throws ExecutionException, InterruptedException,
TimeoutException
+ {
+ int instanceId = instance.config().num();
+ logger.info("Shutting down instance {} to upgrade to {}", instanceId,
upgradeTo.version);
+ instance.shutdown(true).get(60, TimeUnit.SECONDS);
+ logger.info("Starting instanceId {} on version {}", instanceId,
upgradeTo.version);
+ instance.setVersion(upgradeTo);
+ instance.startup();
+ logger.info("Started instanceId {}", instanceId);
+ }
+
+
+ @Shared
+ public static class BBState
+ {
+ public static CountDownLatch latch = new CountDownLatch(1);
+ }
+
+ public static class BBInstaller
+ {
+ public static void installUpgradeVersionBB(ClassLoader classLoader,
Integer num)
+ {
+ try
+ {
+ new ByteBuddy().rebase(ClusterMetadata.class)
+ .method(named("initializeClusterIdentifier"))
+
.intercept(MethodDelegation.to(ClusterMetadataUpgradeDelayedInitializeTest.BBInterceptor.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ catch (NoClassDefFoundError noClassDefFoundError)
+ {
+ logger.info("... but no class def", noClassDefFoundError);
+ }
+ catch (Throwable tr)
+ {
+ logger.info("Unable to intercept upgradeFromVersion method",
tr);
+ throw tr;
+ }
+ }
+ }
+
+ public static class BBInterceptor
+ {
+ @SuppressWarnings("unused")
+ public static ClusterMetadata initializeClusterIdentifier(@SuperCall
Callable<ClusterMetadata> zuper)
+ {
+ try
+ {
+ logger.info("initializeClusterIdentifier waiting...");
+ BBState.latch.await(60, TimeUnit.SECONDS);
+ logger.info("initializeClusterIdentifier continuing...");
+ return zuper.call();
+ }
+ catch (Throwable e)
+ {
+ logger.info("error:", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
new file mode 100644
index 0000000000..e5cbcb7555
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterMetadataUpgradeIgnoreHostsTest extends UpgradeTestBase
+{
+ @Test
+ public void upgradeIgnoreHostsTest() throws Throwable
+ {
+ new TestCase()
+ .nodes(3)
+ .nodesToUpgrade(1, 2, 3)
+ .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+ .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+ .upgradesToCurrentFrom(v41)
+ .setup((cluster) -> {
+ cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int,
ck int, v int, PRIMARY KEY (pk, ck))");
+ })
+ .runAfterClusterUpgrade((cluster) -> {
+ // todo; isolate node 3 - actually shutting it down makes us throw
exceptions when test finishes
+ cluster.filters().allVerbs().to(3).drop();
+ cluster.filters().allVerbs().from(3).drop();
+ cluster.get(1).nodetoolResult("cms",
"initialize").asserts().failure(); // node3 unreachable
+ cluster.get(1).nodetoolResult("cms", "initialize", "--ignore",
"127.0.0.1").asserts().failure(); // can't ignore localhost
+ cluster.get(1).nodetoolResult("cms", "initialize", "--ignore",
"127.0.0.3").asserts().success();
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"2").asserts().success();
+ }).run();
+ }
+
+ @Test
+ public void upgradeIgnoreHostsNonUpgradedTest() throws Throwable
+ {
+ new TestCase()
+ .nodes(3)
+ .nodesToUpgrade(1, 2)
+ .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+ .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+ .upgradesToCurrentFrom(v41)
+ .setup((cluster) -> {
+ cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ })
+ .runAfterClusterUpgrade((cluster) -> {
+ cluster.filters().allVerbs().to(3).drop();
+ cluster.filters().allVerbs().from(3).drop();
+ cluster.get(1).nodetoolResult("cms", "initialize",
"--ignore", "127.0.0.3").asserts().success();
+ cluster.get(3).shutdown().get();
+ cluster.filters().reset();
+
cluster.get(3).setVersion(Versions.find().getLatest(CURRENT));
+ cluster.get(3).startup();
+ cluster.schemaChange(withKeyspace("ALTER TABLE " +
KEYSPACE + ".tbl with comment = 'test'"));
+ ((IInvokableInstance)cluster.get(3)).runOnInstance(() -> {
+ Epoch current = ClusterMetadata.current().epoch;
+ if (current.isBefore(Epoch.FIRST))
+ throw new AssertionError("Epoch was not
incremented as expected, still at " + current);
+ });
+
+ // Verify that CMS identifier has propagated across the
nodes as this asserts that the DOWN node
+ // did not affect the common serialization version for
metadata.
+ Set<Long> identifiers = new HashSet<>();
+ Pattern p = Pattern.compile(".*CMS Identifier\\:
([\\d]*).*", Pattern.DOTALL);
+ cluster.forEach(i -> {
+ Matcher m =
p.matcher(i.nodetoolResult("cms").getStdout());
+ assertTrue(m.matches());
+ identifiers.add(Long.parseLong(m.group(1)));
+ });
+ assertEquals(1, identifiers.size());
+ assertNotEquals(ClusterMetadata.EMPTY_METADATA_IDENTIFIER,
(long) identifiers.iterator().next());
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"2").asserts().success();
+ }).run();
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java
new file mode 100644
index 0000000000..8ab8dda4cc
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.migration.Election;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+public class ClusterMetadataUpgradeUnexpectedFailureTest extends
UpgradeTestBase
+{
+ @Test
+ public void upgradeFailsUnexpectedlyAfterInitiation() throws Throwable
+ {
+ Consumer<UpgradeableCluster.Builder > builderUpdater = builder ->
builder.withInstanceInitializer(BBInstaller::installUpgradeVersionBB);
+ new TestCase()
+ .nodes(3)
+ .nodesToUpgrade(1, 2, 3)
+ .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+ .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+ .upgradesToCurrentFrom(v41)
+ .withBuilder(builderUpdater)
+ .setup((cluster) -> {
+ cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ })
+ .runAfterClusterUpgrade((cluster) -> {
+ // we injected a BB helper to trigger an unexpected
failure on the first attempt at initialization.
+ // i.e. an exception that must be caught as opposed to a
mismatch in metadata or down peer.
+ cluster.get(1).nodetoolResult("cms",
"initialize").asserts().failure().errorContains("Something unexpected went
wrong");
+ // handling the failure should have included cleaning up
any state so that another attempt can be
+ // made, which this time should succeed.
+ cluster.get(1).nodetoolResult("cms",
"initialize").asserts().success();
+ }).run();
+ }
+
+ public static class BBInstaller
+ {
+ public static void installUpgradeVersionBB(ClassLoader classLoader,
Integer num)
+ {
+ try
+ {
+ new ByteBuddy().rebase(Election.class)
+ .method(named("finish"))
+
.intercept(MethodDelegation.to(BBInterceptor.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ catch (NoClassDefFoundError noClassDefFoundError)
+ {
+ throw noClassDefFoundError;
+ }
+ catch (Throwable tr)
+ {
+ throw tr;
+ }
+ }
+ }
+
+ public static class BBInterceptor
+ {
+ private static final AtomicBoolean firstAttempt = new
AtomicBoolean(true);
+
+ @SuppressWarnings("unused")
+ public static void finish(Set<InetAddressAndPort> sendTo, @SuperCall
Callable<Void> zuper)
+ {
+ if (firstAttempt.getAndSet(false))
+ throw new IllegalStateException("Something unexpected went
wrong");
+
+ try
+ {
+ zuper.call();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
index ea66961ee5..790ac3380c 100644
--- a/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
+++ b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.tcm.membership;
import org.junit.Test;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.CassandraVersion;
+
import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -111,6 +114,80 @@ public class DirectoryTest
assertTrue(dir.allDatacenterRacks().isEmpty());
}
+ @Test
+ public void commonSerializationVersionTest()
+ {
+ int cnt = 1;
+ Location DC1_R1 = new Location("datacenter1", "rack1");
+ Directory dir = new Directory();
+ assertTrue(dir.isEmpty());
+ assertEquals(NodeVersion.CURRENT_METADATA_VERSION,
dir.commonSerializationVersion);
+
+ dir = dir.with(addresses(cnt++), DC1_R1, NodeVersion.CURRENT);
+ assertEquals(NodeVersion.CURRENT.serializationVersion,
dir.commonSerializationVersion.asInt());
+
+ dir = dir.with(addresses(cnt++), DC1_R1,
withSerializationVersion(Version.V4));
+ assertEquals(Version.V4, dir.commonSerializationVersion);
+
+ dir = dir.with(addresses(cnt++), DC1_R1,
withSerializationVersion(Version.V3));
+ assertEquals(Version.V3, dir.commonSerializationVersion);
+
+ dir = dir.with(addresses(cnt++), DC1_R1,
withSerializationVersion(Version.V2));
+ assertEquals(Version.V2, dir.commonSerializationVersion);
+
+ dir = dir.with(addresses(cnt++), DC1_R1,
withSerializationVersion(Version.V1));
+ assertEquals(Version.V1, dir.commonSerializationVersion);
+
+ // int v
+ dir = dir.with(addresses(cnt++), DC1_R1,
withSerializationVersion(Version.V0));
+ assertEquals(Version.V0, dir.commonSerializationVersion);
+
+ // Adding another, higher version doesn't affect the floor
+ dir = dir.with(addresses(cnt++), DC1_R1, NodeVersion.CURRENT);
+ assertEquals(Version.V0, dir.commonSerializationVersion);
+
+ // Version.OLD for pre-CEP-21 nodes doesn't affect the floor
+ dir = dir.with(addresses(cnt), DC1_R1,
withSerializationVersion(Version.OLD));
+ assertEquals(Version.V0, dir.commonSerializationVersion);
+
+ // remove all peers except the first, common version should revert to
CURRENT
+ while (cnt > 1)
+ dir = dir.without(dir.lastModified().nextEpoch(), new
NodeId(cnt--));
+
+ assertEquals(NodeVersion.CURRENT.serializationVersion,
dir.commonSerializationVersion.asInt());
+ }
+
+ @Test
+ public void commonSerializationVersionLEFTNodesTest()
+ {
+ int cnt = 0;
+ Location DC1_R1 = new Location("datacenter1", "rack1");
+ Directory dir = new Directory();
+ assertTrue(dir.isEmpty());
+
+ dir = dir.with(addresses(cnt++), DC1_R1, NodeVersion.CURRENT);
+ dir = dir.with(addresses(cnt++), DC1_R1, NodeVersion.CURRENT);
+ dir = dir.with(addresses(cnt++), DC1_R1, NodeVersion.CURRENT);
+ NodeAddresses leavingEndpoint = addresses(cnt++);
+ dir = dir.with(leavingEndpoint, DC1_R1,
withSerializationVersion(Version.V4));
+ assertEquals(Version.V4, dir.commonSerializationVersion);
+
+ NodeId leavingNode = dir.peerId(leavingEndpoint.broadcastAddress);
+ dir = dir.withNodeState(leavingNode, NodeState.LEFT);
+ assertEquals(NodeVersion.CURRENT_METADATA_VERSION,
dir.commonSerializationVersion);
+ }
+
+
+ private static NodeAddresses addresses(int i)
+ {
+ return new NodeAddresses(endpoint(i));
+ }
+
+ private static NodeVersion withSerializationVersion(Version version)
+ {
+ return new NodeVersion(CassandraVersion.NULL_VERSION, version);
+ }
+
private void assertInvalidLocationUpdate(Directory dir, NodeId nodeId,
Location loc, String message)
{
try
diff --git a/test/unit/org/apache/cassandra/tcm/membership/NodeVersionTest.java
b/test/unit/org/apache/cassandra/tcm/membership/NodeVersionTest.java
new file mode 100644
index 0000000000..3530570a1b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/membership/NodeVersionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class NodeVersionTest
+{
+ @Test
+ public void futureNodeVersionTest() throws IOException
+ {
+ ByteBuffer bb;
+ try (DataOutputBuffer dob = new DataOutputBuffer())
+ {
+ NodeVersion.Serializer.serializeHelper(dob, "8.0.0",
NodeVersion.CURRENT.serializationVersion + 1);
+ bb = dob.asNewBuffer();
+ }
+
+ try (DataInputBuffer in = new DataInputBuffer(bb, false))
+ {
+ NodeVersion n = NodeVersion.serializer.deserialize(in,
NodeVersion.CURRENT.serializationVersion());
+ assertEquals(n.serializationVersion().asInt(),
Version.UNKNOWN.asInt());
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]