This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 415eaffb9cc50df5a12a330e14027ef60c07ee02 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Fri Jan 31 10:02:49 2025 +0100 Reduce heap pressure when initializing CMS Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20267 --- CHANGES.txt | 1 + NEWS.txt | 3 +- src/java/org/apache/cassandra/net/Verb.java | 8 +- .../apache/cassandra/schema/SchemaKeyspace.java | 28 ++++ .../cassandra/tcm/ClusterMetadataService.java | 2 +- src/java/org/apache/cassandra/tcm/Startup.java | 7 +- .../tcm/migration/CMSInitializationRequest.java | 169 +++++++++++++++++++++ .../tcm/migration/CMSInitializationResponse.java | 74 +++++++++ .../tcm/migration/ClusterMetadataHolder.java | 95 ------------ .../apache/cassandra/tcm/migration/Election.java | 160 +++++++------------ .../tcm/serialization/MessageSerializers.java | 8 +- .../ClusterMetadataUpgradeIgnoreHostsTest.java | 50 ------ .../upgrade/ClusterMetadataUpgradeTest.java | 36 +++++ 13 files changed, 379 insertions(+), 262 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 630a13c373..685d86119b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Reduce heap pressure when initializing CMS (CASSANDRA-20267) * Paxos Repair: NoSuchElementException on DistributedSchema.getKeyspaceMetadata (CASSANDRA-20320) * Improve performance of DistributedSchema.validate for large schemas (CASSANDRA-20360) * Add JSON constraint (CASSANDRA-20273) diff --git a/NEWS.txt b/NEWS.txt index bb8898d53b..b35a3c0274 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -149,7 +149,8 @@ Upgrading However, nodes still UP and running the old version will. This will eventually cause the migration to fail, as the cluster will not be in agreement. - > nodetool cms initialize - Got mismatching cluster metadatas from [/x.x.x.x:7000] aborting migration + Got mismatching cluster metadatas. Check logs on peers ([/x.x.x.x:7000]) for details of mismatches. + Aborting migration. See 'nodetool help' or 'nodetool help <command>'. 
If the cms initialize command fails, it will indicate which nodes’ current metadata does not agree with the node where the command was executed. To mitigate this situation, bring any mismatching nodes DOWN and rerun the diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 17c4550fc0..c2cce663ef 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -94,7 +94,9 @@ import org.apache.cassandra.tcm.Discovery; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.FetchCMSLog; import org.apache.cassandra.tcm.FetchPeerLog; +import org.apache.cassandra.tcm.migration.CMSInitializationResponse; import org.apache.cassandra.tcm.migration.Election; +import org.apache.cassandra.tcm.migration.CMSInitializationRequest; import org.apache.cassandra.tcm.sequences.DataMovements; import org.apache.cassandra.tcm.serialization.MessageSerializers; import org.apache.cassandra.utils.BooleanSerializer; @@ -232,9 +234,9 @@ public enum Verb TCM_NOTIFY_RSP (806, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> ResponseVerbHandler.instance ), TCM_NOTIFY_REQ (807, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> logNotifyHandler(), TCM_NOTIFY_RSP ), TCM_CURRENT_EPOCH_REQ (808, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> currentEpochRequestHandler(), TCM_NOTIFY_RSP ), - TCM_INIT_MIG_RSP (809, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::metadataHolderSerializer, () -> ResponseVerbHandler.instance ), - TCM_INIT_MIG_REQ (810, P0, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.prepareHandler, TCM_INIT_MIG_RSP ), - TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ), + TCM_INIT_MIG_RSP (809, P0, rpcTimeout, INTERNAL_METADATA, () -> 
CMSInitializationResponse.serializer, () -> ResponseVerbHandler.instance ), + TCM_INIT_MIG_REQ (810, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::initRequestSerializer, () -> Election.instance.prepareHandler, TCM_INIT_MIG_RSP ), + TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> CMSInitializationRequest.Initiator.serializer,() -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ), TCM_DISCOVER_RSP (812, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, () -> ResponseVerbHandler.instance ), TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index a6877cc631..b3ed1c2702 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -364,6 +364,34 @@ public final class SchemaKeyspace ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush(ColumnFamilyStore.FlushReason.INTERNALLY_FORCED))); } + /** + * Reads the schema from the system keyspace and calculates an MD5 digest over every row (skipping + * system keyspace schema partitions); the resulting digest is converted into a UUID that acts as + * a content-based version of the schema.
+ * + * Only used when initializing the CMS + */ + public static UUID calculateSchemaDigest() + { + Digest digest = Digest.forSchema(); + for (String table : ALL) + { + ReadCommand cmd = getReadCommandForTableSchema(table); + try (ReadExecutionController executionController = cmd.executionController(); + PartitionIterator schema = cmd.executeInternal(executionController)) + { + while (schema.hasNext()) + { + try (RowIterator partition = schema.next()) + { + if (!isSystemKeyspaceSchemaPartition(partition.partitionKey())) + RowIterators.digest(partition, digest); + } + } + } + } + return UUID.nameUUIDFromBytes(digest.digest()); + } + /** * @param schemaTableName The name of the table responsible for part of the schema * @return CFS responsible to hold low-level serialized schema diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 555cdb7f93..8adfb81051 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -362,7 +362,7 @@ public class ClusterMetadataService !ignored.contains(ep)) .collect(toImmutableSet()); - Election.instance.nominateSelf(candidates, ignored, metadata::equals, metadata); + Election.instance.nominateSelf(candidates, ignored, metadata, true); ClusterMetadataService.instance().triggerSnapshot(); } else diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index b184866f57..151d24ab90 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -59,6 +59,7 @@ import org.apache.cassandra.tcm.log.SystemKeyspaceStorage; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.migration.Election; +import org.apache.cassandra.tcm.migration.CMSInitializationRequest; import 
org.apache.cassandra.tcm.ownership.UniformRangePlacement; import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.tcm.sequences.ReconfigureCMS; @@ -229,8 +230,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; { Election.instance.nominateSelf(candidates.nodes(), Collections.singleton(FBUtilities.getBroadcastAddressAndPort()), - (cm) -> true, - null); + ClusterMetadata.current(), + false); } } @@ -243,7 +244,7 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; } else { - Election.Initiator initiator = Election.instance.initiator(); + CMSInitializationRequest.Initiator initiator = Election.instance.initiator(); candidates = Discovery.instance.discoverOnce(initiator == null ? null : initiator.initiator); } Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); diff --git a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java new file mode 100644 index 0000000000..dac50e5edb --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.migration; + +import java.io.IOException; +import java.util.Objects; +import java.util.UUID; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.TokenMap; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.UUIDSerializer; + +public class CMSInitializationRequest +{ + public static final IVersionedSerializer<CMSInitializationRequest> defaultMessageSerializer = new Serializer(NodeVersion.CURRENT.serializationVersion()); + + private static volatile Serializer serializerCache; + + public static IVersionedSerializer<CMSInitializationRequest> messageSerializer(Version version) + { + Serializer cached = serializerCache; + if (cached != null && cached.serializationVersion.equals(version)) + return cached; + cached = new Serializer(version); + serializerCache = cached; + return cached; + } + + public final Initiator initiator; + public final Directory directory; + public final TokenMap tokenMap; + public final UUID schemaVersion; + + public CMSInitializationRequest(InetAddressAndPort initiator, UUID initToken, ClusterMetadata metadata) + { + this(new Initiator(initiator, initToken), metadata.directory, metadata.tokenMap, SchemaKeyspace.calculateSchemaDigest()); + } + + public CMSInitializationRequest(Initiator initiator, Directory directory, TokenMap tokenMap, UUID schemaVersion) + { + this.initiator = initiator; + this.directory = directory; + this.tokenMap = tokenMap; + 
this.schemaVersion = schemaVersion; + } + + public static class Serializer implements IVersionedSerializer<CMSInitializationRequest> + { + private final Version serializationVersion; + + public Serializer(Version serializationVersion) + { + this.serializationVersion = serializationVersion; + } + + @Override + public void serialize(CMSInitializationRequest t, DataOutputPlus out, int version) throws IOException + { + Initiator.serializer.serialize(t.initiator, out, version); + Directory.serializer.serialize(t.directory, out, serializationVersion); + TokenMap.serializer.serialize(t.tokenMap, out, serializationVersion); + UUIDSerializer.serializer.serialize(t.schemaVersion, out, version); + } + + @Override + public CMSInitializationRequest deserialize(DataInputPlus in, int version) throws IOException + { + Initiator initiator = Initiator.serializer.deserialize(in, version); + Directory directory = Directory.serializer.deserialize(in, serializationVersion); + TokenMap tokenMap = TokenMap.serializer.deserialize(in, serializationVersion); + UUID schemaVersion = UUIDSerializer.serializer.deserialize(in, version); + return new CMSInitializationRequest(initiator, directory, tokenMap, schemaVersion); + } + + @Override + public long serializedSize(CMSInitializationRequest t, int version) + { + return Initiator.serializer.serializedSize(t.initiator, version) + + Directory.serializer.serializedSize(t.directory, serializationVersion) + + TokenMap.serializer.serializedSize(t.tokenMap, serializationVersion) + + UUIDSerializer.serializer.serializedSize(t.schemaVersion, version); + } + } + + public static class Initiator + { + public static final Serializer serializer = new Serializer(); + public final InetAddressAndPort initiator; + public final UUID initToken; + + public Initiator(InetAddressAndPort initiator, UUID initToken) + { + this.initiator = initiator; + this.initToken = initToken; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if 
(!(o instanceof Initiator)) return false; + Initiator other = (Initiator) o; + return Objects.equals(initiator, other.initiator) && Objects.equals(initToken, other.initToken); + } + + @Override + public int hashCode() + { + return Objects.hash(initiator, initToken); + } + + @Override + public String toString() + { + return "Initiator{" + + "initiator=" + initiator + + ", initToken=" + initToken + + '}'; + } + + public static class Serializer implements IVersionedSerializer<Initiator> + { + @Override + public void serialize(Initiator t, DataOutputPlus out, int version) throws IOException + { + InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator, out, version); + UUIDSerializer.serializer.serialize(t.initToken, out, version); + } + + @Override + public Initiator deserialize(DataInputPlus in, int version) throws IOException + { + return new Initiator(InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in, version), + UUIDSerializer.serializer.deserialize(in, version)); + } + + @Override + public long serializedSize(Initiator t, int version) + { + return InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator, version) + + UUIDSerializer.serializer.serializedSize(t.initToken, version); + } + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationResponse.java b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationResponse.java new file mode 100644 index 0000000000..73502fdb63 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.migration; + +import java.io.IOException; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class CMSInitializationResponse +{ + public static final IVersionedSerializer<CMSInitializationResponse> serializer = new Serializer(); + + public final CMSInitializationRequest.Initiator initiator; + public final boolean metadataMatches; + + public CMSInitializationResponse(CMSInitializationRequest.Initiator initiator, boolean metadataMatches) + { + this.initiator = initiator; + this.metadataMatches = metadataMatches; + } + + @Override + public String toString() + { + return "CMSInitializationResponse{" + + "initiator=" + initiator + + ", metadataMatches=" + metadataMatches + + '}'; + } + + private static class Serializer implements IVersionedSerializer<CMSInitializationResponse> + { + @Override + public void serialize(CMSInitializationResponse t, DataOutputPlus out, int version) throws IOException + { + CMSInitializationRequest.Initiator.serializer.serialize(t.initiator, out, version); + out.writeBoolean(t.metadataMatches); + } + + @Override + public CMSInitializationResponse deserialize(DataInputPlus in, int version) throws IOException + { + CMSInitializationRequest.Initiator coordinator = 
CMSInitializationRequest.Initiator.serializer.deserialize(in, version); + boolean metadataMatches = in.readBoolean(); + return new CMSInitializationResponse(coordinator, metadataMatches); + } + + @Override + public long serializedSize(CMSInitializationResponse t, int version) + { + return CMSInitializationRequest.Initiator.serializer.serializedSize(t.initiator, version) + + TypeSizes.sizeof(t.metadataMatches); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java deleted file mode 100644 index fbb0bfa701..0000000000 --- a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.tcm.migration; - -import java.io.IOException; - -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; -import org.apache.cassandra.tcm.serialization.Version; - -public class ClusterMetadataHolder -{ - public static final IVersionedSerializer<ClusterMetadataHolder> defaultMessageSerializer = new ClusterMetadataHolder.Serializer(NodeVersion.CURRENT.serializationVersion()); - - private static volatile Serializer serializerCache; - public static IVersionedSerializer<ClusterMetadataHolder> messageSerializer(Version version) - { - Serializer cached = serializerCache; - if (cached != null && cached.serializationVersion.equals(version)) - return cached; - cached = new Serializer(version); - serializerCache = cached; - return cached; - } - - public final Election.Initiator coordinator; - public final ClusterMetadata metadata; - - public ClusterMetadataHolder(Election.Initiator coordinator, ClusterMetadata metadata) - { - this.coordinator = coordinator; - this.metadata = metadata; - } - - @Override - public String toString() - { - return "ClusterMetadataHolder{" + - "coordinator=" + coordinator + - ", epoch=" + metadata.epoch + - '}'; - } - - private static class Serializer implements IVersionedSerializer<ClusterMetadataHolder> - { - private final Version serializationVersion; - - public Serializer(Version serializationVersion) - { - this.serializationVersion = serializationVersion; - } - - @Override - public void serialize(ClusterMetadataHolder t, DataOutputPlus out, int version) throws IOException - { - Election.Initiator.serializer.serialize(t.coordinator, out, version); - VerboseMetadataSerializer.serialize(ClusterMetadata.serializer, t.metadata, out, 
serializationVersion); - } - - @Override - public ClusterMetadataHolder deserialize(DataInputPlus in, int version) throws IOException - { - Election.Initiator coordinator = Election.Initiator.serializer.deserialize(in, version); - ClusterMetadata metadata = VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, in); - return new ClusterMetadataHolder(coordinator, metadata); - } - - @Override - public long serializedSize(ClusterMetadataHolder t, int version) - { - return Election.Initiator.serializer.serializedSize(t.coordinator, version) + - VerboseMetadataSerializer.serializedSize(ClusterMetadata.serializer, t.metadata, serializationVersion); - } - } -} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index 2bd2f7a392..04e1a8fa4c 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -20,14 +20,11 @@ package org.apache.cassandra.tcm.migration; import java.io.IOException; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.collect.Sets; @@ -36,12 +33,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Startup; +import org.apache.cassandra.tcm.membership.Directory; +import 
org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.IVerbHandler; @@ -52,7 +49,6 @@ import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDSerializer; /** * Election process establishes initial CMS leader, from which you can further evolve cluster metadata. @@ -60,9 +56,9 @@ import org.apache.cassandra.utils.UUIDSerializer; public class Election { private static final Logger logger = LoggerFactory.getLogger(Election.class); - private static final Initiator MIGRATED = new Initiator(null, null); + private static final CMSInitializationRequest.Initiator MIGRATED = new CMSInitializationRequest.Initiator(null, null); - private final AtomicReference<Initiator> initiator = new AtomicReference<>(); + private final AtomicReference<CMSInitializationRequest.Initiator> initiator = new AtomicReference<>(); public static Election instance = new Election(); @@ -83,7 +79,7 @@ public class Election this.abortHandler = new AbortHandler(); } - public void nominateSelf(Set<InetAddressAndPort> candidates, Set<InetAddressAndPort> ignoredEndpoints, Function<ClusterMetadata, Boolean> isMatch, ClusterMetadata metadata) + public void nominateSelf(Set<InetAddressAndPort> candidates, Set<InetAddressAndPort> ignoredEndpoints, ClusterMetadata metadata, boolean verifyAllPeersMetadata) { Set<InetAddressAndPort> sendTo = new HashSet<>(candidates); sendTo.removeAll(ignoredEndpoints); @@ -91,7 +87,7 @@ public class Election try { - initiate(sendTo, isMatch, metadata); + initiate(sendTo, metadata, verifyAllPeersMetadata); finish(sendTo); } catch (Exception e) @@ -101,13 +97,14 @@ public class Election } } - private void initiate(Set<InetAddressAndPort> sendTo, Function<ClusterMetadata, 
Boolean> isMatch, ClusterMetadata metadata) + private void initiate(Set<InetAddressAndPort> sendTo, ClusterMetadata metadata, boolean verifyAllPeersMetadata) { - if (!updateInitiator(null, new Initiator(FBUtilities.getBroadcastAddressAndPort(), UUID.randomUUID()))) + CMSInitializationRequest initializationRequest = new CMSInitializationRequest(FBUtilities.getBroadcastAddressAndPort(), UUID.randomUUID(), metadata); + if (!updateInitiator(null, initializationRequest.initiator)) throw new IllegalStateException("Migration already initiated by " + initiator.get()); logger.info("No previous migration detected, initiating"); - Collection<Pair<InetAddressAndPort, ClusterMetadataHolder>> metadatas = MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_INIT_MIG_REQ, initiator.get()); + Collection<Pair<InetAddressAndPort, CMSInitializationResponse>> metadatas = MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_INIT_MIG_REQ, initializationRequest); if (metadatas.size() != sendTo.size()) { Set<InetAddressAndPort> responded = metadatas.stream().map(p -> p.left).collect(Collectors.toSet()); @@ -116,45 +113,38 @@ public class Election throw new IllegalStateException(msg); } - Set<InetAddressAndPort> mismatching = metadatas.stream().filter(p -> !isMatch.apply(p.right.metadata)).map(p -> p.left).collect(Collectors.toSet()); - if (!mismatching.isEmpty()) + if (verifyAllPeersMetadata) { - String msg = String.format("Got mismatching cluster metadatas from %s aborting migration", mismatching); - Map<InetAddressAndPort, ClusterMetadataHolder> metadataMap = new HashMap<>(); - metadatas.forEach(pair -> metadataMap.put(pair.left, pair.right)); - if (metadata != null) + Set<InetAddressAndPort> mismatching = metadatas.stream().filter(p -> !p.right.metadataMatches).map(p -> p.left).collect(Collectors.toSet()); + if (!mismatching.isEmpty()) { - for (InetAddressAndPort e : mismatching) - { - logger.warn("Diff with {}", e); - metadata.dumpDiff(metadataMap.get(e).metadata); - } + 
String msg = String.format("Got mismatching cluster metadatas. Check logs on peers (%s) for details of mismatches. Aborting migration.", mismatching); + throw new IllegalStateException(msg); } - throw new IllegalStateException(msg); } } private void finish(Set<InetAddressAndPort> sendTo) { - Initiator currentCoordinator = initiator.get(); - assert currentCoordinator.initiator.equals(FBUtilities.getBroadcastAddressAndPort()); + CMSInitializationRequest.Initiator currentInitiator = initiator.get(); + assert currentInitiator.initiator.equals(FBUtilities.getBroadcastAddressAndPort()); Startup.initializeAsFirstCMSNode(); Register.maybeRegister(); SystemKeyspace.setLocalHostId(ClusterMetadata.current().myNodeId().toUUID()); - updateInitiator(currentCoordinator, MIGRATED); + updateInitiator(currentInitiator, MIGRATED); MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false)); } private void abort(Set<InetAddressAndPort> sendTo) { - Initiator init = initiator.getAndSet(null); + CMSInitializationRequest.Initiator init = initiator.getAndSet(null); for (InetAddressAndPort ep : sendTo) messaging.send(Message.out(Verb.TCM_ABORT_MIG, init), ep); } - public Initiator initiator() + public CMSInitializationRequest.Initiator initiator() { return initiator.get(); } @@ -164,103 +154,63 @@ public class Election initiator.set(MIGRATED); } - private boolean updateInitiator(Initiator expected, Initiator newCoordinator) + private boolean updateInitiator(CMSInitializationRequest.Initiator expected, CMSInitializationRequest.Initiator newInitiator) { - Initiator current = initiator.get(); - return Objects.equals(current, expected) && initiator.compareAndSet(current, newCoordinator); + CMSInitializationRequest.Initiator current = initiator.get(); + return Objects.equals(current, expected) && initiator.compareAndSet(current, newInitiator); } public boolean isMigrating() { - Initiator coordinator = initiator(); - return 
coordinator != null && coordinator != MIGRATED; + CMSInitializationRequest.Initiator initiator = initiator(); + return initiator != null && initiator != MIGRATED; } - public class PrepareHandler implements IVerbHandler<Initiator> + public class PrepareHandler implements IVerbHandler<CMSInitializationRequest> { @Override - public void doVerb(Message<Initiator> message) throws IOException + public void doVerb(Message<CMSInitializationRequest> message) throws IOException { logger.info("Received election initiation message {} from {}", message.payload, message.from()); - if (!updateInitiator(null, message.payload)) + if (!updateInitiator(null, message.payload.initiator)) throw new IllegalStateException(String.format("Got duplicate initiate migration message from %s, migration is already started by %s", message.from(), initiator())); - // todo; disallow ANY changes to state managed in ClusterMetadata logger.info("Sending initiation response"); - messaging.send(message.responseWith(new ClusterMetadataHolder(message.payload, ClusterMetadata.current())), message.from()); + Directory initiatorDirectory = message.payload.directory; + TokenMap initiatorTokenMap = message.payload.tokenMap; + UUID initiatorSchemaVersion = message.payload.schemaVersion; + ClusterMetadata metadata = ClusterMetadata.current(); + boolean match = true; + if (!initiatorDirectory.equals(metadata.directory)) + { + match = false; + logger.warn("Initiator directory different from our"); + initiatorDirectory.dumpDiff(metadata.directory); + } + if (!initiatorTokenMap.equals(metadata.tokenMap)) + { + match = false; + logger.warn("Initiator tokenmap different from ours"); + initiatorTokenMap.dumpDiff(metadata.tokenMap); + } + UUID schemaDigest = SchemaKeyspace.calculateSchemaDigest(); + if (!initiatorSchemaVersion.equals(schemaDigest)) + { + match = false; + logger.warn("Initiator schema different from our: {} != {}", initiatorSchemaVersion, schemaDigest); + } + messaging.send(message.responseWith(new 
CMSInitializationResponse(message.payload.initiator, match)), message.from()); } } - public class AbortHandler implements IVerbHandler<Initiator> + public class AbortHandler implements IVerbHandler<CMSInitializationRequest.Initiator> { @Override - public void doVerb(Message<Initiator> message) throws IOException + public void doVerb(Message<CMSInitializationRequest.Initiator> message) throws IOException { logger.info("Received election abort message {} from {}", message.payload, message.from()); if (!message.from().equals(initiator().initiator) || !updateInitiator(message.payload, null)) logger.error("Could not clear initiator - initiator is set to {}, abort message received from {}", initiator(), message.payload); } } - - public static class Initiator - { - public static final Serializer serializer = new Serializer(); - - public final InetAddressAndPort initiator; - public final UUID initToken; - - public Initiator(InetAddressAndPort initiator, UUID initToken) - { - this.initiator = initiator; - this.initToken = initToken; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (!(o instanceof Initiator)) return false; - Initiator other = (Initiator) o; - return Objects.equals(initiator, other.initiator) && Objects.equals(initToken, other.initToken); - } - - @Override - public int hashCode() - { - return Objects.hash(initiator, initToken); - } - - @Override - public String toString() - { - return "Initiator{" + - "initiator=" + initiator + - ", initToken=" + initToken + - '}'; - } - - public static class Serializer implements IVersionedSerializer<Initiator> - { - @Override - public void serialize(Initiator t, DataOutputPlus out, int version) throws IOException - { - InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator, out, version); - UUIDSerializer.serializer.serialize(t.initToken, out, version); - } - - @Override - public Initiator deserialize(DataInputPlus in, int version) throws IOException - { - 
return new Initiator(InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in, version), - UUIDSerializer.serializer.deserialize(in, version)); - } - - @Override - public long serializedSize(Initiator t, int version) - { - return InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator, version) + - UUIDSerializer.serializer.serializedSize(t.initToken, version); - } - } - } } diff --git a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java index cfced3426b..deef0a4b1b 100644 --- a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java +++ b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java @@ -23,7 +23,7 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.migration.ClusterMetadataHolder; +import org.apache.cassandra.tcm.migration.CMSInitializationRequest; /** * Provides IVersionedSerializers for internode messages where the payload includes @@ -69,13 +69,13 @@ public class MessageSerializers return Commit.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion()); } - public static IVersionedSerializer<ClusterMetadataHolder> metadataHolderSerializer() + public static IVersionedSerializer<CMSInitializationRequest> initRequestSerializer() { ClusterMetadata metadata = ClusterMetadata.currentNullable(); if (metadata == null || metadata.directory.clusterMinVersion.serializationVersion == NodeVersion.CURRENT.serializationVersion) - return ClusterMetadataHolder.defaultMessageSerializer; + return CMSInitializationRequest.defaultMessageSerializer; assert !metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion()); - return 
ClusterMetadataHolder.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion()); + return CMSInitializationRequest.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion()); } } diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java deleted file mode 100644 index d89b902d7c..0000000000 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.distributed.upgrade; - -import org.junit.Test; - -import org.apache.cassandra.distributed.Constants; -import org.apache.cassandra.distributed.api.Feature; - -public class ClusterMetadataUpgradeIgnoreHostsTest extends UpgradeTestBase -{ - @Test - public void upgradeIgnoreHostsTest() throws Throwable - { - new TestCase() - .nodes(3) - .nodesToUpgrade(1, 2, 3) - .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) - .set(Constants.KEY_DTEST_FULL_STARTUP, true)) - .upgradesToCurrentFrom(v41) - .setup((cluster) -> { - cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}")); - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - }) - .runAfterClusterUpgrade((cluster) -> { - // todo; isolate node 3 - actually shutting it down makes us throw exceptions when test finishes - cluster.filters().allVerbs().to(3).drop(); - cluster.filters().allVerbs().from(3).drop(); - cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure(); // node3 unreachable - cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.1").asserts().failure(); // can't ignore localhost - cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.3").asserts().success(); - }).run(); - } -} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java index 87399d8a44..930933425c 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java @@ -27,6 +27,12 @@ import org.apache.cassandra.distributed.Constants; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInvokableInstance; import 
org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; import static org.junit.Assert.assertFalse; @@ -79,4 +85,34 @@ public class ClusterMetadataUpgradeTest extends UpgradeTestBase assertTrue(Arrays.toString(desc[0]).contains("NetworkTopologyStrategy")); }).run(); } + + + @Test + public void upgradeMismatchTest() throws Throwable + { + new TestCase() + .nodes(3) + .nodesToUpgrade(1, 2, 3) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set(Constants.KEY_DTEST_FULL_STARTUP, true)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}")); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + }) + .runAfterClusterUpgrade((cluster) -> { + IInvokableInstance n3 = ((IInvokableInstance) cluster.get(3)); + n3.runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + Directory diffingDirectory = metadata.directory.with(new NodeAddresses(InetAddressAndPort.getByNameUnchecked("127.0.0.99")), new Location("aaa", "bbb")); + ClusterMetadata diffing = ClusterMetadata.current().transformer().with(diffingDirectory).buildForGossipMode(); + ClusterMetadataService.instance().setFromGossip(diffing); + }); + cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure(); + cluster.get(3).logs().watchFor("Initiator directory different from our"); + cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.3").asserts().success(); + 
cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)")); + }).run(); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org