This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ec5ef2c70 Preclude irrecoverable log corruption in case of a split-brain situation during leader election with absent seeds
0ec5ef2c70 is described below

commit 0ec5ef2c7035fc93323816140994617a9d953956
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Wed Mar 6 07:58:18 2024 +0100

    Preclude irrecoverable log corruption in case of a split-brain situation during leader election with absent seeds
    
    Patch by Alex Petrov; reviewed by marcuse for CASSANDRA-19153
---
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../cassandra/config/DatabaseDescriptor.java       |   5 +
 .../net/CMSIdentifierMismatchException.java        |  32 +++++++
 .../cassandra/net/InboundMessageHandler.java       |  16 ++++
 .../org/apache/cassandra/net/MessageDelivery.java  |   7 +-
 .../org/apache/cassandra/tcm/ClusterMetadata.java  | 104 +++++++++++++++++++--
 src/java/org/apache/cassandra/tcm/Commit.java      |  19 +++-
 src/java/org/apache/cassandra/tcm/Discovery.java   |  33 +++++--
 src/java/org/apache/cassandra/tcm/Startup.java     |   9 +-
 .../org/apache/cassandra/tcm/log/LogState.java     |  10 +-
 .../tcm/migration/ClusterMetadataHolder.java       |   4 +-
 .../tcm/serialization/MessageSerializers.java      |   5 +
 .../cassandra/distributed/impl/InstanceConfig.java |   1 +
 .../distributed/test/log/RegisterTest.java         |   1 -
 .../distributed/test/tcm/SplitBrainTest.java       |  79 ++++++++++++++++
 15 files changed, 300 insertions(+), 26 deletions(-)
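
The mechanism, in brief: the first CMS node stamps cluster metadata with an identifier derived from its own broadcast address, every V2 TCM message carries that identifier, and deserialization rejects a mismatch before anything can be appended to the log. A minimal, self-contained sketch of that check (illustrative names only; the real logic lives in ClusterMetadata.checkIdentifier and Startup.initializeAsFirstCMSNode in the diff below):

    import java.net.InetSocketAddress;

    final class CmsIdentifierSketch
    {
        // stands in for EMPTY_METADATA_IDENTIFIER: the identifier has not been set yet
        static final int EMPTY = 0;

        // the first CMS node derives the identifier from its broadcast address hash, so two
        // sides of a partition that each elect a CMS end up (barring a hash collision) with
        // different values
        static int deriveIdentifier(InetSocketAddress firstCmsNode)
        {
            return firstCmsNode.hashCode();
        }

        // skip the check while either side has not joined a CMS yet, otherwise reject a mismatch
        static void check(int local, int remote)
        {
            if (local == EMPTY || remote == EMPTY)
                return;
            if (local != remote)
                throw new IllegalStateException("CMS identifier mismatch: local=" + local + " remote=" + remote);
        }
    }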

diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 1b2ea89512..75bbdd73c8 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -1296,5 +1296,6 @@ public class Config
 
     public volatile DurationSpec.LongMillisecondsBound 
progress_barrier_timeout = new DurationSpec.LongMillisecondsBound("3600000ms");
     public volatile DurationSpec.LongMillisecondsBound 
progress_barrier_backoff = new DurationSpec.LongMillisecondsBound("1000ms");
+    public volatile DurationSpec.LongSecondsBound discovery_timeout = new 
DurationSpec.LongSecondsBound("30s");
     public boolean unsafe_tcm_mode = false;
 }
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 39ddbe5046..59f4040203 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -5134,6 +5134,11 @@ public class DatabaseDescriptor
         conf.progress_barrier_backoff = new 
DurationSpec.LongMillisecondsBound(timeOutInMillis);
     }
 
+    public static long getDiscoveryTimeout(TimeUnit unit)
+    {
+        return conf.discovery_timeout.to(unit);
+    }
+
     public static boolean getUnsafeTCMMode()
     {
         return conf.unsafe_tcm_mode;
diff --git 
a/src/java/org/apache/cassandra/net/CMSIdentifierMismatchException.java 
b/src/java/org/apache/cassandra/net/CMSIdentifierMismatchException.java
new file mode 100644
index 0000000000..6f525b1a90
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/CMSIdentifierMismatchException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Exception thrown in case of a CMS identifier mismatch. This should usually not happen, except in rare cases of
+ * network partition during CMS election at initial cluster bringup. It is a precaution to avoid
+ * corrupting the CMS log.
+ */
+public class CMSIdentifierMismatchException extends RuntimeException
+{
+    public CMSIdentifierMismatchException(String format)
+    {
+        super(format);
+    }
+}
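
Because the exception is unchecked, it can surface from deep inside message deserialization; the InboundMessageHandler hunk below reacts by abandoning the message and closing the connection in both directions rather than retrying. A rough sketch of that reaction, using stand-in types in place of the real Netty channel and MessagingService (hypothetical names, not the project's API):

    // Stand-in interfaces so the sketch is self-contained; the real code works with
    // io.netty.channel.Channel and org.apache.cassandra.net.MessagingService.
    interface InboundChannel { void close(); }
    interface OutboundConnections { void closeOutbound(String peer); }

    final class MismatchReaction
    {
        static void onMismatch(RuntimeException mismatch, String peer,
                               InboundChannel inbound, OutboundConnections messaging)
        {
            // the peer belongs to a different CMS group: do not retry, sever both directions
            messaging.closeOutbound(peer); // stop sending to the peer
            inbound.close();               // drop the inbound side; buffered bytes are released by the frame decoder
        }
    }
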
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java 
b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
index ce75b67656..746c0f7a39 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
@@ -175,6 +175,14 @@ public class InboundMessageHandler extends 
AbstractMessageHandler
             noSpamLogger.info("{} incompatible schema encountered while 
deserializing a message", this, e);
             
ClusterMetadataService.instance().fetchLogFromPeerAsync(header.from, 
header.epoch);
         }
+        catch (CMSIdentifierMismatchException e)
+        {
+            callbacks.onFailedDeserialize(size, header, e);
+            logger.error("{} is a member of a different CMS group. Forcing 
connection close.", header.from, e);
+            MessagingService.instance().closeOutbound(header.from);
+            // Sharable bytes will be released by the frame decoder
+            channel.close();
+        }
         catch (Throwable t)
         {
             JVMStabilityInspector.inspectThrowable(t);
@@ -372,6 +380,14 @@ public class InboundMessageHandler extends 
AbstractMessageHandler
                 callbacks.onFailedDeserialize(size, header, e);
                 noSpamLogger.info("{} incompatible schema encountered while 
deserializing a message", InboundMessageHandler.this, e);
             }
+            catch (CMSIdentifierMismatchException e)
+            {
+                callbacks.onFailedDeserialize(size, header, e);
+                noSpamLogger.info("{} is a member of a different CMS group, 
and should not be tried. Forcing connection close.", header.from);
+                // Sharable bytes will be released by the frame decoder
+                channel.close();
+                MessagingService.instance().closeOutbound(header.from);
+            }
             catch (Throwable t)
             {
                 JVMStabilityInspector.inspectThrowable(t);
diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java 
b/src/java/org/apache/cassandra/net/MessageDelivery.java
index cd2ea96d1f..e526b5ba3b 100644
--- a/src/java/org/apache/cassandra/net/MessageDelivery.java
+++ b/src/java/org/apache/cassandra/net/MessageDelivery.java
@@ -36,8 +36,11 @@ import org.apache.cassandra.utils.concurrent.Future;
 public interface MessageDelivery
 {
     Logger logger = LoggerFactory.getLogger(MessageDelivery.class);
-
     static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> 
fanoutAndWait(MessageDelivery messaging, Set<InetAddressAndPort> sendTo, Verb 
verb, REQ payload)
+    {
+        return fanoutAndWait(messaging, sendTo, verb, payload, 
DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS);
+    }
+    static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> 
fanoutAndWait(MessageDelivery messaging, Set<InetAddressAndPort> sendTo, Verb 
verb, REQ payload, long timeout, TimeUnit timeUnit)
     {
         Accumulator<Pair<InetAddressAndPort, RSP>> responses = new 
Accumulator<>(sendTo.size());
         CountDownLatch cdl = CountDownLatch.newCountDownLatch(sendTo.size());
@@ -63,7 +66,7 @@ public interface MessageDelivery
             logger.info("Election for metadata migration sending {} ({}) to 
{}", verb, payload.toString(), ep);
             messaging.sendWithCallback(Message.out(verb, payload), ep, 
callback);
         });
-        
cdl.awaitUninterruptibly(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.MILLISECONDS),
 TimeUnit.MILLISECONDS);
+        cdl.awaitUninterruptibly(timeout, timeUnit);
         return responses.snapshot();
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 3c7144eee4..5e6b9a6060 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.CMSIdentifierMismatchException;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
@@ -77,8 +78,11 @@ import static org.apache.cassandra.db.TypeSizes.sizeof;
 
 public class ClusterMetadata
 {
+    public static final int EMPTY_METADATA_IDENTIFIER = 0;
     public static final Serializer serializer = new Serializer();
 
+    public final int metadataIdentifier;
+
     public final Epoch epoch;
     public final long period;
     public final boolean lastInPeriod;
@@ -110,7 +114,8 @@ public class ClusterMetadata
     @VisibleForTesting
     public ClusterMetadata(IPartitioner partitioner, Directory directory, 
DistributedSchema schema)
     {
-        this(Epoch.EMPTY,
+        this(EMPTY_METADATA_IDENTIFIER,
+             Epoch.EMPTY,
              Period.EMPTY,
              true,
              partitioner,
@@ -134,11 +139,39 @@ public class ClusterMetadata
                            LockedRanges lockedRanges,
                            InProgressSequences inProgressSequences,
                            Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
+    {
+        this(EMPTY_METADATA_IDENTIFIER,
+             epoch,
+             period,
+             lastInPeriod,
+             partitioner,
+             schema,
+             directory,
+             tokenMap,
+             placements,
+             lockedRanges,
+             inProgressSequences,
+             extensions);
+    }
+
+    private ClusterMetadata(int metadataIdentifier,
+                           Epoch epoch,
+                           long period,
+                           boolean lastInPeriod,
+                           IPartitioner partitioner,
+                           DistributedSchema schema,
+                           Directory directory,
+                           TokenMap tokenMap,
+                           DataPlacements placements,
+                           LockedRanges lockedRanges,
+                           InProgressSequences inProgressSequences,
+                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
     {
         // TODO: token map is a feature of the specific placement strategy, 
and so may not be a relevant component of
         //  ClusterMetadata in the long term. We need to consider how the 
actual components of metadata can be evolved
         //  over time.
         assert tokenMap == null || 
tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner 
for TokenMap doesn't match base partitioner";
+        this.metadataIdentifier = metadataIdentifier;
         this.epoch = epoch;
         this.period = period;
         this.lastInPeriod = lastInPeriod;
@@ -192,7 +225,8 @@ public class ClusterMetadata
         // increments the published epoch by one. As each component has its 
own last
         // modified epoch, we may also need to coerce those, but only if they 
are
         // greater than the epoch we're forcing here.
-        return new ClusterMetadata(epoch,
+        return new ClusterMetadata(metadataIdentifier,
+                                   epoch,
                                    period,
                                    lastInPeriod,
                                    partitioner,
@@ -205,9 +239,32 @@ public class ClusterMetadata
                                    capLastModified(extensions, epoch));
     }
 
+    public ClusterMetadata initializeClusterIdentifier(int clusterIdentifier)
+    {
+        if (this.metadataIdentifier != EMPTY_METADATA_IDENTIFIER)
+            throw new IllegalStateException(String.format("Can only initialize 
cluster identifier once, but it was already set to %d", 
this.metadataIdentifier));
+
+        if (clusterIdentifier == EMPTY_METADATA_IDENTIFIER)
+            throw new IllegalArgumentException("Can not initialize cluster 
with empty cluster identifier");
+
+        return new ClusterMetadata(clusterIdentifier,
+                                   epoch,
+                                   period,
+                                   lastInPeriod,
+                                   partitioner,
+                                   schema,
+                                   directory,
+                                   tokenMap,
+                                   placements,
+                                   lockedRanges,
+                                   inProgressSequences,
+                                   extensions);
+    }
+
     public ClusterMetadata forcePeriod(long period)
     {
-        return new ClusterMetadata(epoch,
+        return new ClusterMetadata(metadataIdentifier,
+                                   epoch,
                                    period,
                                    false,
                                    partitioner,
@@ -591,7 +648,8 @@ public class ClusterMetadata
                 inProgressSequences = 
inProgressSequences.withLastModified(epoch);
             }
 
-            return new Transformed(new ClusterMetadata(epoch,
+            return new Transformed(new ClusterMetadata(base.metadataIdentifier,
+                                                       epoch,
                                                        period,
                                                        lastInPeriod,
                                                        partitioner,
@@ -607,7 +665,8 @@ public class ClusterMetadata
 
         public ClusterMetadata buildForGossipMode()
         {
-            return new ClusterMetadata(Epoch.UPGRADE_GOSSIP,
+            return new ClusterMetadata(base.metadataIdentifier,
+                                       Epoch.UPGRADE_GOSSIP,
                                        Period.EMPTY,
                                        true,
                                        partitioner,
@@ -805,6 +864,25 @@ public class ClusterMetadata
         return ClusterMetadataService.instance().metadata();
     }
 
+    public static void checkIdentifier(int remoteIdentifier)
+    {
+        ClusterMetadata metadata = currentNullable();
+        if (metadata != null)
+        {
+            int currentIdentifier = metadata.metadataIdentifier;
+            // We haven't yet joined CMS fully
+            if (currentIdentifier == EMPTY_METADATA_IDENTIFIER)
+                return;
+
+            // Peer hasn't yet joined CMS fully
+            if (remoteIdentifier == EMPTY_METADATA_IDENTIFIER)
+                return;
+
+            if (currentIdentifier != remoteIdentifier)
+                throw new 
CMSIdentifierMismatchException(String.format("Cluster Metadata Identifier 
mismatch. Node is attempting to communicate with a node from a different 
cluster. Current identifier %d. Remote identifier: %d", currentIdentifier, 
remoteIdentifier));
+        }
+    }
+
     /**
      * Startup of some services may race with cluster metadata initialization. 
We allow those services to
      * gracefully handle scenarios when it is not yet initialized.
@@ -843,6 +921,9 @@ public class ClusterMetadata
             if (version.isAtLeast(Version.V1))
                 
out.writeUTF(metadata.partitioner.getClass().getCanonicalName());
 
+            if (version.isAtLeast(Version.V2))
+                out.writeUnsignedVInt32(metadata.metadataIdentifier);
+
             Epoch.serializer.serialize(metadata.epoch, out);
             out.writeUnsignedVInt(metadata.period);
             out.writeBoolean(metadata.lastInPeriod);
@@ -874,6 +955,13 @@ public class ClusterMetadata
             if (version.isAtLeast(Version.V1))
                 partitioner = FBUtilities.newPartitioner(in.readUTF());
 
+            int clusterIdentifier = EMPTY_METADATA_IDENTIFIER;
+            if (version.isAtLeast(Version.V2))
+            {
+                clusterIdentifier = in.readUnsignedVInt32();
+                checkIdentifier(clusterIdentifier);
+            }
+
             Epoch epoch = Epoch.serializer.deserialize(in);
             long period = in.readUnsignedVInt();
             boolean lastInPeriod = in.readBoolean();
@@ -896,7 +984,8 @@ public class ClusterMetadata
                 value.deserialize(in, version);
                 extensions.put(key, value);
             }
-            return new ClusterMetadata(epoch,
+            return new ClusterMetadata(clusterIdentifier,
+                                       epoch,
                                        period,
                                        lastInPeriod,
                                        partitioner,
@@ -917,6 +1006,9 @@ public class ClusterMetadata
                 size += ExtensionKey.serializer.serializedSize(entry.getKey(), 
version) +
                         entry.getValue().serializedSize(version);
 
+            if (version.isAtLeast(Version.V2))
+                size += 
TypeSizes.sizeofUnsignedVInt(metadata.metadataIdentifier);
+
             size += Epoch.serializer.serializedSize(metadata.epoch) +
                     VIntCoding.computeUnsignedVIntSize(metadata.period) +
                     TypeSizes.BOOL_SIZE +
diff --git a/src/java/org/apache/cassandra/tcm/Commit.java 
b/src/java/org/apache/cassandra/tcm/Commit.java
index 8871efa5b2..f6008f92f7 100644
--- a/src/java/org/apache/cassandra/tcm/Commit.java
+++ b/src/java/org/apache/cassandra/tcm/Commit.java
@@ -92,7 +92,11 @@ public class Commit
 
         public void serialize(Commit t, DataOutputPlus out, int version) 
throws IOException
         {
-            out.writeInt(serializationVersion.asInt());
+            out.writeUnsignedVInt32(serializationVersion.asInt());
+
+            if (serializationVersion.isAtLeast(Version.V2))
+                
out.writeUnsignedVInt32(ClusterMetadata.current().metadataIdentifier);
+
             Entry.Id.serializer.serialize(t.entryId, out, 
serializationVersion);
             Transformation.transformationSerializer.serialize(t.transform, 
out, serializationVersion);
             Epoch.serializer.serialize(t.lastKnown, out, serializationVersion);
@@ -100,7 +104,11 @@ public class Commit
 
         public Commit deserialize(DataInputPlus in, int version) throws 
IOException
         {
-            Version deserializationVersion = Version.fromInt(in.readInt());
+            Version deserializationVersion = 
Version.fromInt(in.readUnsignedVInt32());
+
+            if (deserializationVersion.isAtLeast(Version.V2))
+                ClusterMetadata.checkIdentifier(in.readUnsignedVInt32());
+
             Entry.Id entryId = Entry.Id.serializer.deserialize(in, 
deserializationVersion);
             Transformation transform = 
Transformation.transformationSerializer.deserialize(in, deserializationVersion);
             Epoch lastKnown = Epoch.serializer.deserialize(in, 
deserializationVersion);
@@ -109,7 +117,12 @@ public class Commit
 
         public long serializedSize(Commit t, int version)
         {
-            return TypeSizes.sizeof(serializationVersion.asInt()) +
+            int size = 
TypeSizes.sizeofUnsignedVInt(serializationVersion.asInt());
+
+            if (serializationVersion.isAtLeast(Version.V2))
+                size += 
TypeSizes.sizeofUnsignedVInt(ClusterMetadata.current().metadataIdentifier);
+
+            return size +
                    
Transformation.transformationSerializer.serializedSize(t.transform, 
serializationVersion) +
                    Entry.Id.serializer.serializedSize(t.entryId, 
serializationVersion) +
                    Epoch.serializer.serializedSize(t.lastKnown, 
serializationVersion);
diff --git a/src/java/org/apache/cassandra/tcm/Discovery.java 
b/src/java/org/apache/cassandra/tcm/Discovery.java
index 7bace528af..612975b0bc 100644
--- a/src/java/org/apache/cassandra/tcm/Discovery.java
+++ b/src/java/org/apache/cassandra/tcm/Discovery.java
@@ -46,10 +46,11 @@ import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
 /**
  * Discovery is used to identify other participants in the cluster. Nodes send TCM_DISCOVER_REQ
  * in several rounds. A node is considered to be discovered by another node when it has responded
@@ -100,29 +101,45 @@ public class Discovery
         boolean res = state.compareAndSet(State.NOT_STARTED, 
State.IN_PROGRESS);
         assert res : String.format("Can not start discovery as it is in state 
%s", state.get());
 
-        long deadline = Clock.Global.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        long deadline = nanoTime() + 
DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS);
+        long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4),
+                                       
DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS) / rounds);
         DiscoveredNodes last = null;
         int lastCount = discovered.size();
-        int unchangedFor = 0;
-        while (Clock.Global.nanoTime() <= deadline || unchangedFor < rounds)
+        int unchangedFor = -1;
+
+        // we run for at least DatabaseDescriptor.getDiscoveryTimeout, but 
also need 5 (by default) consecutive rounds where
+        // the discovered nodes are unchanged
+        while (nanoTime() <= deadline || unchangedFor < rounds)
         {
-            last = discoverOnce(null);
+            long startTimeNanos = nanoTime();
+            last = discoverOnce(null, roundTimeNanos, TimeUnit.NANOSECONDS);
             if (last.kind == DiscoveredNodes.Kind.CMS_ONLY)
                 break;
 
             if (lastCount == discovered.size())
+            {
                 unchangedFor++;
+            }
             else
+            {
+                unchangedFor = 0;
                 lastCount = discovered.size();
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            }
+            long sleeptimeNanos = roundTimeNanos - (nanoTime() - 
startTimeNanos);
+            if (sleeptimeNanos > 0)
+                Uninterruptibles.sleepUninterruptibly(sleeptimeNanos, 
TimeUnit.NANOSECONDS);
         }
 
         res = state.compareAndSet(State.IN_PROGRESS, State.FINISHED);
         assert res : String.format("Can not finish discovery as it is in state 
%s", state.get());
         return last;
     }
-
     public DiscoveredNodes discoverOnce(InetAddressAndPort initiator)
+    {
+        return discoverOnce(initiator, 1, TimeUnit.SECONDS);
+    }
+    public DiscoveredNodes discoverOnce(InetAddressAndPort initiator, long 
timeout, TimeUnit timeUnit)
     {
         Set<InetAddressAndPort> candidates = new HashSet<>();
         if (initiator != null)
@@ -135,7 +152,7 @@ public class Discovery
 
         candidates.remove(self);
 
-        Collection<Pair<InetAddressAndPort, DiscoveredNodes>> responses = 
MessageDelivery.fanoutAndWait(messaging.get(), candidates, 
Verb.TCM_DISCOVER_REQ, NoPayload.noPayload);
+        Collection<Pair<InetAddressAndPort, DiscoveredNodes>> responses = 
MessageDelivery.fanoutAndWait(messaging.get(), candidates, 
Verb.TCM_DISCOVER_REQ, NoPayload.noPayload, timeout, timeUnit);
 
         for (Pair<InetAddressAndPort, DiscoveredNodes> discoveredNodes : 
responses)
         {
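
The reworked discover() loop above terminates only once both conditions hold: the configured discovery_timeout has elapsed, and the set of discovered nodes has stayed unchanged for the requested number of consecutive rounds (5 by default), with each round capped at min(4s, timeout / rounds). A condensed, self-contained sketch of that termination logic (illustrative only; the real loop also exits early on a DiscoveredNodes.Kind.CMS_ONLY response):

    import java.util.concurrent.TimeUnit;

    // Simplified stand-in for Discovery.discover(int rounds); sizesPerRound (non-empty) plays
    // the role of one TCM_DISCOVER_REQ fan-out per round, reporting the discovered set's size.
    final class DiscoveryLoopSketch
    {
        static int run(long timeoutNanos, int requiredStableRounds, int[] sizesPerRound)
        {
            long deadline = System.nanoTime() + timeoutNanos;
            long roundNanos = Math.min(TimeUnit.SECONDS.toNanos(4), timeoutNanos / requiredStableRounds);
            int lastCount = 0, unchangedFor = -1, round = 0;

            // exiting requires BOTH the deadline to have passed AND enough unchanged rounds
            while (System.nanoTime() <= deadline || unchangedFor < requiredStableRounds)
            {
                long start = System.nanoTime();
                int count = sizesPerRound[Math.min(round++, sizesPerRound.length - 1)];
                if (count == lastCount)
                {
                    unchangedFor++;
                }
                else
                {
                    unchangedFor = 0;
                    lastCount = count;
                }
                // pad each round to roundNanos so rounds are evenly spaced
                long sleep = roundNanos - (System.nanoTime() - start);
                if (sleep > 0)
                    sleepQuietly(sleep);
            }
            return round;
        }

        private static void sleepQuietly(long nanos)
        {
            try { TimeUnit.NANOSECONDS.sleep(nanos); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }

The InstanceConfig hunk further down pins discovery_timeout to 3s for in-jvm dtests, presumably to keep test cluster startup short while still exercising the multi-round logic.
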
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index 99e36fb4b2..0d5e9c02c4 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -132,9 +132,12 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
      */
     public static void initializeAsFirstCMSNode()
     {
-        
ClusterMetadataService.instance().log().bootstrap(FBUtilities.getBroadcastAddressAndPort());
-        assert ClusterMetadataService.state() == LOCAL : String.format("Can't 
initialize as node hasn't transitioned to CMS state. State: %s.\n%s", 
ClusterMetadataService.state(), ClusterMetadata.current());
-        Initialize initialize = new Initialize(ClusterMetadata.current());
+        InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+        ClusterMetadataService.instance().log().bootstrap(addr);
+        ClusterMetadata metadata =  ClusterMetadata.current();
+        assert ClusterMetadataService.state() == LOCAL : String.format("Can't 
initialize as node hasn't transitioned to CMS state. State: %s.\n%s", 
ClusterMetadataService.state(),  metadata);
+
+        Initialize initialize = new 
Initialize(metadata.initializeClusterIdentifier(addr.hashCode()));
         ClusterMetadataService.instance().commit(initialize);
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java 
b/src/java/org/apache/cassandra/tcm/log/LogState.java
index 6b8fd1a1ea..de69184a89 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogState.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogState.java
@@ -213,6 +213,8 @@ public class LogState
         @Override
         public void serialize(LogState t, DataOutputPlus out, Version version) 
throws IOException
         {
+            if (version.isAtLeast(Version.V2))
+                
out.writeUnsignedVInt32(ClusterMetadata.current().metadataIdentifier);
             out.writeBoolean(t.baseState != null);
             if (t.baseState != null)
                 ClusterMetadata.serializer.serialize(t.baseState, out, 
version);
@@ -224,6 +226,8 @@ public class LogState
         @Override
         public LogState deserialize(DataInputPlus in, Version version) throws 
IOException
         {
+            if (version.isAtLeast(Version.V2))
+                ClusterMetadata.checkIdentifier(in.readUnsignedVInt32());
             ClusterMetadata baseState = null;
             if (in.readBoolean())
                 baseState = ClusterMetadata.serializer.deserialize(in, 
version);
@@ -237,7 +241,11 @@ public class LogState
         @Override
         public long serializedSize(LogState t, Version version)
         {
-            long size = TypeSizes.sizeof(t.baseState != null);
+            long size = 0;
+            if (version.isAtLeast(Version.V2))
+                size += 
TypeSizes.sizeofUnsignedVInt(ClusterMetadata.current().metadataIdentifier);
+
+            size += TypeSizes.sizeof(t.baseState != null);
             if (t.baseState != null)
                 size += ClusterMetadata.serializer.serializedSize(t.baseState, 
version);
             size += TypeSizes.INT_SIZE;
diff --git 
a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java 
b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java
index 83e94014c8..fbb0bfa701 100644
--- a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java
+++ b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java
@@ -23,14 +23,14 @@ import java.io.IOException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
-import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.serialization.Version;
 
 public class ClusterMetadataHolder
 {
-    public static final ClusterMetadataHolder.Serializer 
defaultMessageSerializer = new 
ClusterMetadataHolder.Serializer(NodeVersion.CURRENT.serializationVersion());
+    public static final IVersionedSerializer<ClusterMetadataHolder> 
defaultMessageSerializer = new 
ClusterMetadataHolder.Serializer(NodeVersion.CURRENT.serializationVersion());
 
     private static volatile Serializer serializerCache;
     public static IVersionedSerializer<ClusterMetadataHolder> 
messageSerializer(Version version)
diff --git 
a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java 
b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
index 3216624f90..cfced3426b 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
@@ -31,6 +31,11 @@ import 
org.apache.cassandra.tcm.migration.ClusterMetadataHolder;
  * MessagingService and the appropriate version is not established based on the
  * peer receiving the messages, but is the lowest supported version of any 
member
  * of the cluster.
+ *
+ * NOTE: The serialization version here is used for convenience when serializing the message
+ * on the outgoing path. Since the receiving node may have a different view of the
+ * min serialization version, we _always_ have to either use a {@link VerboseMetadataSerializer}
+ * (like {@link LogState} / {@link Replication}) or explicitly serialize the version (like {@link Commit}).
  */
 public class MessageSerializers
 {
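
The pattern this NOTE describes, and which the Commit serializer above follows, is to make each payload self-describing: write the serialization version first, then gate any newer fields (such as the metadata identifier) on that version, so an older receiver never sees fields it cannot parse. A minimal sketch of that shape over plain DataOutput/DataInput (hypothetical payload, not one of the project's serializers; the real code writes unsigned vints via DataOutputPlus):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    final class VersionPrefixedPayload
    {
        static final int V1 = 1, V2 = 2;

        // the version is always written first, so the receiver never has to guess it
        static void serialize(DataOutput out, int version, int metadataIdentifier, long epoch) throws IOException
        {
            out.writeInt(version);
            if (version >= V2)
                out.writeInt(metadataIdentifier); // only present from V2 on
            out.writeLong(epoch);
        }

        // the receiver trusts the embedded version rather than its own minimum,
        // so V1 and V2 senders can coexist during an upgrade
        static long deserialize(DataInput in, int localIdentifier) throws IOException
        {
            int version = in.readInt();
            if (version >= V2)
            {
                int remoteIdentifier = in.readInt();
                if (localIdentifier != 0 && remoteIdentifier != 0 && localIdentifier != remoteIdentifier)
                    throw new IOException("CMS identifier mismatch: " + localIdentifier + " vs " + remoteIdentifier);
            }
            return in.readLong();
        }
    }

Commit, LogState and ClusterMetadata in this patch all follow that shape: the identifier is written and checked only from Version.V2 onwards, which keeps mixed-version clusters working during upgrade.
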
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java 
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 13889543d8..cb6f26dc87 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -107,6 +107,7 @@ public class InstanceConfig implements IInstanceConfig
                 .set("endpoint_snitch", DistributedTestSnitch.class.getName())
                 .set("seed_provider", new 
ParameterizedClass(SimpleSeedProvider.class.getName(),
                         Collections.singletonMap("seeds", seedIp + ':' + 
seedPort)))
+                .set("discovery_timeout", "3s")
                 // required settings for dtest functionality
                 .set("diagnostic_events_enabled", true)
                 .set("auto_bootstrap", false)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
index 466a660273..afcbef66e0 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
@@ -153,7 +153,6 @@ public class RegisterTest extends TestBaseImpl
                     {
                         throw new RuntimeException(e);
                     }
-
                 }
                 catch (UnknownHostException e)
                 {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java
new file mode 100644
index 0000000000..9533eb393d
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.SimpleSeedProvider;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class SplitBrainTest extends TestBaseImpl
+{
+    @Test
+    public void testSplitBrainStartup() throws IOException, TimeoutException
+    {
+        // partition the cluster into 2 parts on startup: node1 and node2 in one, node3 and node4 in the other
+        try (Cluster cluster = builder().withNodes(4)
+                                        .withConfig(config -> 
config.with(GOSSIP).with(NETWORK)
+                                                                    
.set("seed_provider", new 
IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(),
+                                                                               
                                                  
Collections.singletonMap("seeds", "127.0.0.1,127.0.0.3")))
+                                                                    
.set("discovery_timeout", "1s"))
+                                        .createWithoutStarting())
+        {
+            cluster.filters().allVerbs().from(1,2).to(3,4).drop();
+            cluster.filters().allVerbs().from(3,4).to(1,2).drop();
+            List<Thread> startupThreads = new ArrayList<>(4);
+            for (int i = 0; i < 4; i++)
+            {
+                int threadNr = i + 1;
+                startupThreads.add(new Thread(() -> 
cluster.get(threadNr).startup()));
+            }
+            startupThreads.forEach(Thread::start);
+            startupThreads.forEach(SplitBrainTest::join);
+            cluster.filters().reset();
+            cluster.coordinator(1).execute(withKeyspace("create keyspace %s 
with replication = {'class':'SimpleStrategy', 'replication_factor':1}"), 
ConsistencyLevel.ALL);
+
+            cluster.get(3).logs().watchFor("Cluster Metadata Identifier 
mismatch");
+        }
+    }
+
+    private static void join(Thread t)
+    {
+        try
+        {
+            t.join();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

