This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 0ec5ef2c70 Preclude irrecoverable log corruption in case split-brain situation during leader election with absent seeds 0ec5ef2c70 is described below commit 0ec5ef2c7035fc93323816140994617a9d953956 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Wed Mar 6 07:58:18 2024 +0100 Preclude irrecoverable log corruption in case split-brain situation during leader election with absent seeds Patch by Alex Petrov; reviewed by marcuse for CASSANDRA-19153 --- src/java/org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 5 + .../net/CMSIdentifierMismatchException.java | 32 +++++++ .../cassandra/net/InboundMessageHandler.java | 16 ++++ .../org/apache/cassandra/net/MessageDelivery.java | 7 +- .../org/apache/cassandra/tcm/ClusterMetadata.java | 104 +++++++++++++++++++-- src/java/org/apache/cassandra/tcm/Commit.java | 19 +++- src/java/org/apache/cassandra/tcm/Discovery.java | 33 +++++-- src/java/org/apache/cassandra/tcm/Startup.java | 9 +- .../org/apache/cassandra/tcm/log/LogState.java | 10 +- .../tcm/migration/ClusterMetadataHolder.java | 4 +- .../tcm/serialization/MessageSerializers.java | 5 + .../cassandra/distributed/impl/InstanceConfig.java | 1 + .../distributed/test/log/RegisterTest.java | 1 - .../distributed/test/tcm/SplitBrainTest.java | 79 ++++++++++++++++ 15 files changed, 300 insertions(+), 26 deletions(-) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 1b2ea89512..75bbdd73c8 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -1296,5 +1296,6 @@ public class Config public volatile DurationSpec.LongMillisecondsBound progress_barrier_timeout = new DurationSpec.LongMillisecondsBound("3600000ms"); public volatile DurationSpec.LongMillisecondsBound progress_barrier_backoff = new 
DurationSpec.LongMillisecondsBound("1000ms"); + public volatile DurationSpec.LongSecondsBound discovery_timeout = new DurationSpec.LongSecondsBound("30s"); public boolean unsafe_tcm_mode = false; } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 39ddbe5046..59f4040203 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -5134,6 +5134,11 @@ public class DatabaseDescriptor conf.progress_barrier_backoff = new DurationSpec.LongMillisecondsBound(timeOutInMillis); } + public static long getDiscoveryTimeout(TimeUnit unit) + { + return conf.discovery_timeout.to(unit); + } + public static boolean getUnsafeTCMMode() { return conf.unsafe_tcm_mode; diff --git a/src/java/org/apache/cassandra/net/CMSIdentifierMismatchException.java b/src/java/org/apache/cassandra/net/CMSIdentifierMismatchException.java new file mode 100644 index 0000000000..6f525b1a90 --- /dev/null +++ b/src/java/org/apache/cassandra/net/CMSIdentifierMismatchException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +/** + * Exception thrown in case of a CMS identifier mismatch. This should usually not happen, except rare cases of + * network partition during CMS election during initial cluster bringup. This is just a precaution to avoid + * corrupting CMS log. + */ +public class CMSIdentifierMismatchException extends RuntimeException +{ + public CMSIdentifierMismatchException(String format) + { + super(format); + } +} diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java b/src/java/org/apache/cassandra/net/InboundMessageHandler.java index ce75b67656..746c0f7a39 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java @@ -175,6 +175,14 @@ public class InboundMessageHandler extends AbstractMessageHandler noSpamLogger.info("{} incompatible schema encountered while deserializing a message", this, e); ClusterMetadataService.instance().fetchLogFromPeerAsync(header.from, header.epoch); } + catch (CMSIdentifierMismatchException e) + { + callbacks.onFailedDeserialize(size, header, e); + logger.error("{} is a member of a different CMS group. Forcing connection close.", header.from, e); + MessagingService.instance().closeOutbound(header.from); + // Sharable bytes will be released by the frame decoder + channel.close(); + } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); @@ -372,6 +380,14 @@ public class InboundMessageHandler extends AbstractMessageHandler callbacks.onFailedDeserialize(size, header, e); noSpamLogger.info("{} incompatible schema encountered while deserializing a message", InboundMessageHandler.this, e); } + catch (CMSIdentifierMismatchException e) + { + callbacks.onFailedDeserialize(size, header, e); + noSpamLogger.info("{} is a member of a different CMS group, and should not be tried. 
Forcing connection close.", header.from); + // Sharable bytes will be released by the frame decoder + channel.close(); + MessagingService.instance().closeOutbound(header.from); + } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java b/src/java/org/apache/cassandra/net/MessageDelivery.java index cd2ea96d1f..e526b5ba3b 100644 --- a/src/java/org/apache/cassandra/net/MessageDelivery.java +++ b/src/java/org/apache/cassandra/net/MessageDelivery.java @@ -36,8 +36,11 @@ import org.apache.cassandra.utils.concurrent.Future; public interface MessageDelivery { Logger logger = LoggerFactory.getLogger(MessageDelivery.class); - static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> fanoutAndWait(MessageDelivery messaging, Set<InetAddressAndPort> sendTo, Verb verb, REQ payload) + { + return fanoutAndWait(messaging, sendTo, verb, payload, DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> fanoutAndWait(MessageDelivery messaging, Set<InetAddressAndPort> sendTo, Verb verb, REQ payload, long timeout, TimeUnit timeUnit) { Accumulator<Pair<InetAddressAndPort, RSP>> responses = new Accumulator<>(sendTo.size()); CountDownLatch cdl = CountDownLatch.newCountDownLatch(sendTo.size()); @@ -63,7 +66,7 @@ public interface MessageDelivery logger.info("Election for metadata migration sending {} ({}) to {}", verb, payload.toString(), ep); messaging.sendWithCallback(Message.out(verb, payload), ep, callback); }); - cdl.awaitUninterruptibly(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + cdl.awaitUninterruptibly(timeout, timeUnit); return responses.snapshot(); } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 3c7144eee4..5e6b9a6060 100644 --- 
a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -45,6 +45,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.CMSIdentifierMismatchException; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; @@ -77,8 +78,11 @@ import static org.apache.cassandra.db.TypeSizes.sizeof; public class ClusterMetadata { + public static final int EMPTY_METADATA_IDENTIFIER = 0; public static final Serializer serializer = new Serializer(); + public final int metadataIdentifier; + public final Epoch epoch; public final long period; public final boolean lastInPeriod; @@ -110,7 +114,8 @@ public class ClusterMetadata @VisibleForTesting public ClusterMetadata(IPartitioner partitioner, Directory directory, DistributedSchema schema) { - this(Epoch.EMPTY, + this(EMPTY_METADATA_IDENTIFIER, + Epoch.EMPTY, Period.EMPTY, true, partitioner, @@ -134,11 +139,39 @@ public class ClusterMetadata LockedRanges lockedRanges, InProgressSequences inProgressSequences, Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) + { + this(EMPTY_METADATA_IDENTIFIER, + epoch, + period, + lastInPeriod, + partitioner, + schema, + directory, + tokenMap, + placements, + lockedRanges, + inProgressSequences, + extensions); + } + + private ClusterMetadata(int metadataIdentifier, + Epoch epoch, + long period, + boolean lastInPeriod, + IPartitioner partitioner, + DistributedSchema schema, + Directory directory, + TokenMap tokenMap, + DataPlacements placements, + LockedRanges lockedRanges, + InProgressSequences inProgressSequences, + Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) { // TODO: token map is a feature of the specific placement strategy, and so may 
not be a relevant component of // ClusterMetadata in the long term. We need to consider how the actual components of metadata can be evolved // over time. assert tokenMap == null || tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner for TokenMap doesn't match base partitioner"; + this.metadataIdentifier = metadataIdentifier; this.epoch = epoch; this.period = period; this.lastInPeriod = lastInPeriod; @@ -192,7 +225,8 @@ public class ClusterMetadata // increments the published epoch by one. As each component has its own last // modified epoch, we may also need to coerce those, but only if they are // greater than the epoch we're forcing here. - return new ClusterMetadata(epoch, + return new ClusterMetadata(metadataIdentifier, + epoch, period, lastInPeriod, partitioner, @@ -205,9 +239,32 @@ public class ClusterMetadata capLastModified(extensions, epoch)); } + public ClusterMetadata initializeClusterIdentifier(int clusterIdentifier) + { + if (this.metadataIdentifier != EMPTY_METADATA_IDENTIFIER) + throw new IllegalStateException(String.format("Can only initialize cluster identifier once, but it was already set to %d", this.metadataIdentifier)); + + if (clusterIdentifier == EMPTY_METADATA_IDENTIFIER) + throw new IllegalArgumentException("Can not initialize cluster with empty cluster identifier"); + + return new ClusterMetadata(clusterIdentifier, + epoch, + period, + lastInPeriod, + partitioner, + schema, + directory, + tokenMap, + placements, + lockedRanges, + inProgressSequences, + extensions); + } + public ClusterMetadata forcePeriod(long period) { - return new ClusterMetadata(epoch, + return new ClusterMetadata(metadataIdentifier, + epoch, period, false, partitioner, @@ -591,7 +648,8 @@ public class ClusterMetadata inProgressSequences = inProgressSequences.withLastModified(epoch); } - return new Transformed(new ClusterMetadata(epoch, + return new Transformed(new ClusterMetadata(base.metadataIdentifier, + epoch, period, lastInPeriod, 
partitioner, @@ -607,7 +665,8 @@ public class ClusterMetadata public ClusterMetadata buildForGossipMode() { - return new ClusterMetadata(Epoch.UPGRADE_GOSSIP, + return new ClusterMetadata(base.metadataIdentifier, + Epoch.UPGRADE_GOSSIP, Period.EMPTY, true, partitioner, @@ -805,6 +864,25 @@ public class ClusterMetadata return ClusterMetadataService.instance().metadata(); } + public static void checkIdentifier(int remoteIdentifier) + { + ClusterMetadata metadata = currentNullable(); + if (metadata != null) + { + int currentIdentifier = metadata.metadataIdentifier; + // We haven't yet joined CMS fully + if (currentIdentifier == EMPTY_METADATA_IDENTIFIER) + return; + + // Peer hasn't yet joined CMS fully + if (remoteIdentifier == EMPTY_METADATA_IDENTIFIER) + return; + + if (currentIdentifier != remoteIdentifier) + throw new CMSIdentifierMismatchException(String.format("Cluster Metadata Identifier mismatch. Node is attempting to communicate with a node from a different cluster. Current identifier %d. Remote identifier: %d", currentIdentifier, remoteIdentifier)); + } + } + /** * Startup of some services may race with cluster metadata initialization. We allow those services to * gracefully handle scenarios when it is not yet initialized. 
@@ -843,6 +921,9 @@ public class ClusterMetadata if (version.isAtLeast(Version.V1)) out.writeUTF(metadata.partitioner.getClass().getCanonicalName()); + if (version.isAtLeast(Version.V2)) + out.writeUnsignedVInt32(metadata.metadataIdentifier); + Epoch.serializer.serialize(metadata.epoch, out); out.writeUnsignedVInt(metadata.period); out.writeBoolean(metadata.lastInPeriod); @@ -874,6 +955,13 @@ public class ClusterMetadata if (version.isAtLeast(Version.V1)) partitioner = FBUtilities.newPartitioner(in.readUTF()); + int clusterIdentifier = EMPTY_METADATA_IDENTIFIER; + if (version.isAtLeast(Version.V2)) + { + clusterIdentifier = in.readUnsignedVInt32(); + checkIdentifier(clusterIdentifier); + } + Epoch epoch = Epoch.serializer.deserialize(in); long period = in.readUnsignedVInt(); boolean lastInPeriod = in.readBoolean(); @@ -896,7 +984,8 @@ public class ClusterMetadata value.deserialize(in, version); extensions.put(key, value); } - return new ClusterMetadata(epoch, + return new ClusterMetadata(clusterIdentifier, + epoch, period, lastInPeriod, partitioner, @@ -917,6 +1006,9 @@ public class ClusterMetadata size += ExtensionKey.serializer.serializedSize(entry.getKey(), version) + entry.getValue().serializedSize(version); + if (version.isAtLeast(Version.V2)) + size += TypeSizes.sizeofUnsignedVInt(metadata.metadataIdentifier); + size += Epoch.serializer.serializedSize(metadata.epoch) + VIntCoding.computeUnsignedVIntSize(metadata.period) + TypeSizes.BOOL_SIZE + diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java index 8871efa5b2..f6008f92f7 100644 --- a/src/java/org/apache/cassandra/tcm/Commit.java +++ b/src/java/org/apache/cassandra/tcm/Commit.java @@ -92,7 +92,11 @@ public class Commit public void serialize(Commit t, DataOutputPlus out, int version) throws IOException { - out.writeInt(serializationVersion.asInt()); + out.writeUnsignedVInt32(serializationVersion.asInt()); + + if 
(serializationVersion.isAtLeast(Version.V2)) + out.writeUnsignedVInt32(ClusterMetadata.current().metadataIdentifier); + Entry.Id.serializer.serialize(t.entryId, out, serializationVersion); Transformation.transformationSerializer.serialize(t.transform, out, serializationVersion); Epoch.serializer.serialize(t.lastKnown, out, serializationVersion); @@ -100,7 +104,11 @@ public class Commit public Commit deserialize(DataInputPlus in, int version) throws IOException { - Version deserializationVersion = Version.fromInt(in.readInt()); + Version deserializationVersion = Version.fromInt(in.readUnsignedVInt32()); + + if (deserializationVersion.isAtLeast(Version.V2)) + ClusterMetadata.checkIdentifier(in.readUnsignedVInt32()); + Entry.Id entryId = Entry.Id.serializer.deserialize(in, deserializationVersion); Transformation transform = Transformation.transformationSerializer.deserialize(in, deserializationVersion); Epoch lastKnown = Epoch.serializer.deserialize(in, deserializationVersion); @@ -109,7 +117,12 @@ public class Commit public long serializedSize(Commit t, int version) { - return TypeSizes.sizeof(serializationVersion.asInt()) + + int size = TypeSizes.sizeofUnsignedVInt(serializationVersion.asInt()); + + if (serializationVersion.isAtLeast(Version.V2)) + size += TypeSizes.sizeofUnsignedVInt(ClusterMetadata.current().metadataIdentifier); + + return size + Transformation.transformationSerializer.serializedSize(t.transform, serializationVersion) + Entry.Id.serializer.serializedSize(t.entryId, serializationVersion) + Epoch.serializer.serializedSize(t.lastKnown, serializationVersion); diff --git a/src/java/org/apache/cassandra/tcm/Discovery.java b/src/java/org/apache/cassandra/tcm/Discovery.java index 7bace528af..612975b0bc 100644 --- a/src/java/org/apache/cassandra/tcm/Discovery.java +++ b/src/java/org/apache/cassandra/tcm/Discovery.java @@ -46,10 +46,11 @@ import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; import 
org.apache.cassandra.net.NoPayload; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; + /** * Discovery is used to idenitify other participants in the cluster. Nodes send TCM_DISCOVER_REQ * in several rounds. Node is considered to be discovered by another node when it has responsed @@ -100,29 +101,45 @@ public class Discovery boolean res = state.compareAndSet(State.NOT_STARTED, State.IN_PROGRESS); assert res : String.format("Can not start discovery as it is in state %s", state.get()); - long deadline = Clock.Global.nanoTime() + TimeUnit.SECONDS.toNanos(10); + long deadline = nanoTime() + DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS); + long roundTimeNanos = Math.min(TimeUnit.SECONDS.toNanos(4), + DatabaseDescriptor.getDiscoveryTimeout(TimeUnit.NANOSECONDS) / rounds); DiscoveredNodes last = null; int lastCount = discovered.size(); - int unchangedFor = 0; - while (Clock.Global.nanoTime() <= deadline || unchangedFor < rounds) + int unchangedFor = -1; + + // we run for at least DatabaseDescriptor.getDiscoveryTimeout, but also need 5 (by default) consecutive rounds where + // the discovered nodes are unchanged + while (nanoTime() <= deadline || unchangedFor < rounds) { - last = discoverOnce(null); + long startTimeNanos = nanoTime(); + last = discoverOnce(null, roundTimeNanos, TimeUnit.NANOSECONDS); if (last.kind == DiscoveredNodes.Kind.CMS_ONLY) break; if (lastCount == discovered.size()) + { unchangedFor++; + } else + { + unchangedFor = 0; lastCount = discovered.size(); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + long sleeptimeNanos = roundTimeNanos - (nanoTime() - startTimeNanos); + if (sleeptimeNanos > 0) + Uninterruptibles.sleepUninterruptibly(sleeptimeNanos, TimeUnit.NANOSECONDS); } res = state.compareAndSet(State.IN_PROGRESS, State.FINISHED); 
assert res : String.format("Can not finish discovery as it is in state %s", state.get()); return last; } - public DiscoveredNodes discoverOnce(InetAddressAndPort initiator) + { + return discoverOnce(initiator, 1, TimeUnit.SECONDS); + } + public DiscoveredNodes discoverOnce(InetAddressAndPort initiator, long timeout, TimeUnit timeUnit) { Set<InetAddressAndPort> candidates = new HashSet<>(); if (initiator != null) @@ -135,7 +152,7 @@ public class Discovery candidates.remove(self); - Collection<Pair<InetAddressAndPort, DiscoveredNodes>> responses = MessageDelivery.fanoutAndWait(messaging.get(), candidates, Verb.TCM_DISCOVER_REQ, NoPayload.noPayload); + Collection<Pair<InetAddressAndPort, DiscoveredNodes>> responses = MessageDelivery.fanoutAndWait(messaging.get(), candidates, Verb.TCM_DISCOVER_REQ, NoPayload.noPayload, timeout, timeUnit); for (Pair<InetAddressAndPort, DiscoveredNodes> discoveredNodes : responses) { diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 99e36fb4b2..0d5e9c02c4 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -132,9 +132,12 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; */ public static void initializeAsFirstCMSNode() { - ClusterMetadataService.instance().log().bootstrap(FBUtilities.getBroadcastAddressAndPort()); - assert ClusterMetadataService.state() == LOCAL : String.format("Can't initialize as node hasn't transitioned to CMS state. 
State: %s.\n%s", ClusterMetadataService.state(), ClusterMetadata.current()); - Initialize initialize = new Initialize(ClusterMetadata.current()); + InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort(); + ClusterMetadataService.instance().log().bootstrap(addr); + ClusterMetadata metadata = ClusterMetadata.current(); + assert ClusterMetadataService.state() == LOCAL : String.format("Can't initialize as node hasn't transitioned to CMS state. State: %s.\n%s", ClusterMetadataService.state(), metadata); + + Initialize initialize = new Initialize(metadata.initializeClusterIdentifier(addr.hashCode())); ClusterMetadataService.instance().commit(initialize); } diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java b/src/java/org/apache/cassandra/tcm/log/LogState.java index 6b8fd1a1ea..de69184a89 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogState.java +++ b/src/java/org/apache/cassandra/tcm/log/LogState.java @@ -213,6 +213,8 @@ public class LogState @Override public void serialize(LogState t, DataOutputPlus out, Version version) throws IOException { + if (version.isAtLeast(Version.V2)) + out.writeUnsignedVInt32(ClusterMetadata.current().metadataIdentifier); out.writeBoolean(t.baseState != null); if (t.baseState != null) ClusterMetadata.serializer.serialize(t.baseState, out, version); @@ -224,6 +226,8 @@ public class LogState @Override public LogState deserialize(DataInputPlus in, Version version) throws IOException { + if (version.isAtLeast(Version.V2)) + ClusterMetadata.checkIdentifier(in.readUnsignedVInt32()); ClusterMetadata baseState = null; if (in.readBoolean()) baseState = ClusterMetadata.serializer.deserialize(in, version); @@ -237,7 +241,11 @@ public class LogState @Override public long serializedSize(LogState t, Version version) { - long size = TypeSizes.sizeof(t.baseState != null); + long size = 0; + if (version.isAtLeast(Version.V2)) + size += TypeSizes.sizeofUnsignedVInt(ClusterMetadata.current().metadataIdentifier); + + size += 
TypeSizes.sizeof(t.baseState != null); if (t.baseState != null) size += ClusterMetadata.serializer.serializedSize(t.baseState, version); size += TypeSizes.INT_SIZE; diff --git a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java index 83e94014c8..fbb0bfa701 100644 --- a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java +++ b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java @@ -23,14 +23,14 @@ import java.io.IOException; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; -import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.serialization.Version; public class ClusterMetadataHolder { - public static final ClusterMetadataHolder.Serializer defaultMessageSerializer = new ClusterMetadataHolder.Serializer(NodeVersion.CURRENT.serializationVersion()); + public static final IVersionedSerializer<ClusterMetadataHolder> defaultMessageSerializer = new ClusterMetadataHolder.Serializer(NodeVersion.CURRENT.serializationVersion()); private static volatile Serializer serializerCache; public static IVersionedSerializer<ClusterMetadataHolder> messageSerializer(Version version) diff --git a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java index 3216624f90..cfced3426b 100644 --- a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java +++ b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java @@ -31,6 +31,11 @@ import org.apache.cassandra.tcm.migration.ClusterMetadataHolder; * MessagingService and the appropriate 
version is not established based on the * peer receiving the messages, but is the lowest supported version of any member * of the cluster. + * + * NOTE: Serialization version here is used for convenience of serializing the message + * on the outgoing path. Since receiving node may have a different view of + * min serialization version, we _always_ have to either use a {@link VerboseMetadataSerializer} + * (like {@link LogState}/ {@link Replication} or explicitly serialize the version (like {@link Commit}). */ public class MessageSerializers { diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index 13889543d8..cb6f26dc87 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -107,6 +107,7 @@ public class InstanceConfig implements IInstanceConfig .set("endpoint_snitch", DistributedTestSnitch.class.getName()) .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(), Collections.singletonMap("seeds", seedIp + ':' + seedPort))) + .set("discovery_timeout", "3s") // required settings for dtest functionality .set("diagnostic_events_enabled", true) .set("auto_bootstrap", false) diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java index 466a660273..afcbef66e0 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java @@ -153,7 +153,6 @@ public class RegisterTest extends TestBaseImpl { throw new RuntimeException(e); } - } catch (UnknownHostException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java new file mode 100644 index 0000000000..9533eb393d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.locator.SimpleSeedProvider; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class SplitBrainTest extends TestBaseImpl +{ + @Test + public void testSplitBrainStartup() throws IOException, TimeoutException + { + // partition the cluster in 2 parts on startup, node1, node2 in one, node3, node4 in the other + try (Cluster cluster = builder().withNodes(4) + .withConfig(config -> config.with(GOSSIP).with(NETWORK) + .set("seed_provider", new IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(), + Collections.singletonMap("seeds", "127.0.0.1,127.0.0.3"))) + .set("discovery_timeout", "1s")) + .createWithoutStarting()) + { + cluster.filters().allVerbs().from(1,2).to(3,4).drop(); + cluster.filters().allVerbs().from(3,4).to(1,2).drop(); + List<Thread> startupThreads = new ArrayList<>(4); + for (int i = 0; i < 4; i++) + { + int threadNr = i + 1; + startupThreads.add(new Thread(() -> cluster.get(threadNr).startup())); + } + startupThreads.forEach(Thread::start); + startupThreads.forEach(SplitBrainTest::join); + cluster.filters().reset(); + cluster.coordinator(1).execute(withKeyspace("create keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':1}"), ConsistencyLevel.ALL); + + cluster.get(3).logs().watchFor("Cluster Metadata Identifier mismatch"); + } + } + + private static void join(Thread t) + { + try + { + t.join(); + } + catch 
(InterruptedException e) + { + throw new RuntimeException(e); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org