This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 63c6261856 Reimplement ClusterMetadata::writePlacementAllSettled to step through InProgressSequences to determine state when finished. 63c6261856 is described below commit 63c62618560ad65b5b3e9f4d34b70b8b6dd0a75b Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Tue Mar 12 08:31:05 2024 +0100 Reimplement ClusterMetadata::writePlacementAllSettled to step through InProgressSequences to determine state when finished. Patch by marcuse; reviewed by Alex Petrov for CASSANDRA-19193 --- .../org/apache/cassandra/tcm/ClusterMetadata.java | 46 +++-------------- .../apache/cassandra/tcm/MultiStepOperation.java | 34 +++++++++++++ .../apache/cassandra/tcm/sequences/AddToCMS.java | 6 +++ .../cassandra/tcm/sequences/BootstrapAndJoin.java | 7 +++ .../tcm/sequences/BootstrapAndReplace.java | 7 +++ .../tcm/sequences/InProgressSequences.java | 13 +++-- .../org/apache/cassandra/tcm/sequences/Move.java | 10 +++- .../cassandra/tcm/sequences/ReconfigureCMS.java | 21 ++++++++ .../tcm/sequences/UnbootstrapAndLeave.java | 7 +++ .../cassandra/tcm/transformations/UnsafeJoin.java | 32 ++---------- .../test/log/MetadataChangeSimulationTest.java | 58 +++++++++++++++++++++- .../apache/cassandra/tcm/ClusterMetadataTest.java | 2 +- 12 files changed, 170 insertions(+), 73 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 5e6b9a6060..3224b29466 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -64,7 +65,6 @@ import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator; import org.apache.cassandra.tcm.ownership.PlacementForRange; import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; -import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.serialization.MetadataSerializer; @@ -309,45 +309,15 @@ public class ClusterMetadata public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm) { - List<NodeId> joining = new ArrayList<>(); - List<NodeId> leaving = new ArrayList<>(); - List<NodeId> moving = new ArrayList<>(); - - for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet()) - { - switch (entry.getValue()) - { - case BOOTSTRAPPING: - joining.add(entry.getKey()); - break; - case LEAVING: - leaving.add(entry.getKey()); - break; - case MOVING: - moving.add(entry.getKey()); - break; - } - } - - Transformer t = transformer(); - for (NodeId node: joining) + ClusterMetadata metadata = this; + Iterator<MultiStepOperation<?>> iter = metadata.inProgressSequences.iterator(); + while (iter.hasNext()) { - MultiStepOperation<?> joinSequence = inProgressSequences.get(node); - assert joinSequence instanceof BootstrapAndJoin; - Set<Token> tokens = ((BootstrapAndJoin)joinSequence).finishJoin.tokens; - t = t.proposeToken(node, tokens); + Transformation.Result result = iter.next().applyTo(metadata); + assert result.isSuccess(); + metadata = result.success().metadata; } - for (NodeId node : leaving) - t = t.proposeRemoveNode(node); - // todo: add tests for move! - for (NodeId node : moving) - t = t.proposeRemoveNode(node).proposeToken(node, tokenMap.tokens(node)); - - ClusterMetadata proposed = t.build().metadata; - return ClusterMetadataService.instance() - .placementProvider() - .calculatePlacements(epoch, proposed.tokenMap.toRanges(), proposed, Keyspaces.of(ksm)) - .get(ksm.params.replication); + return metadata.placements.get(ksm.params.replication); } // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java index b315aa9b42..9e8f21c6ec 100644 --- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java +++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java @@ -18,9 +18,14 @@ package org.apache.cassandra.tcm; +import java.util.List; + +import com.google.common.collect.ImmutableSet; + import org.apache.cassandra.tcm.sequences.AddToCMS; import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; +import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.sequences.Move; import org.apache.cassandra.tcm.sequences.ReconfigureCMS; import org.apache.cassandra.tcm.sequences.SequenceState; @@ -138,6 +143,35 @@ public abstract class MultiStepOperation<CONTEXT> */ public abstract SequenceState executeNext(); + /** + * Apply the remaining steps of this MSO - resulting metadata will have epoch = metadata.epoch + 1 + */ + public abstract Transformation.Result applyTo(ClusterMetadata metadata); + + /** + * Helper method for the standard applyTo implementations where we just execute a list of transformations, starting at `next` + * @return + */ + public static Transformation.Result applyMultipleTransformations(ClusterMetadata metadata, Transformation.Kind next, List<Transformation> transformations) + { + ImmutableSet.Builder<MetadataKey> modifiedKeys = ImmutableSet.builder(); + Epoch lastModifiedEpoch = metadata.epoch; + boolean foundStart = false; + for (Transformation nextTransformation : transformations) + { + if (nextTransformation.kind() == next) + foundStart = true; + if (foundStart) + { + Transformation.Result result = nextTransformation.execute(metadata); + assert result.isSuccess(); + metadata = result.success().metadata.forceEpoch(lastModifiedEpoch); + modifiedKeys.addAll(result.success().affectedMetadata); + } + } + return new Transformation.Success(metadata.forceEpoch(lastModifiedEpoch.nextEpoch()), LockedRanges.AffectedRanges.EMPTY, modifiedKeys.build()); + } + /** * Advance the state of an in-progress operation after successfully executing a step. Essentially, this "bumps the * pointer" into the list (actual or logical) of steps which comprise the operation. It is most commonly called by diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java index f0a2e6f6c8..149c99b8b2 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java +++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java @@ -119,6 +119,12 @@ public class AddToCMS extends MultiStepOperation<Epoch> return Transformation.Kind.FINISH_ADD_TO_CMS; } + @Override + public Transformation.Result applyTo(ClusterMetadata metadata) + { + return finishJoin.execute(metadata); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java index bd8d752316..008fc659a9 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java @@ -62,6 +62,7 @@ import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.vint.VIntCoding; +import static com.google.common.collect.ImmutableList.of; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_JOIN; import static org.apache.cassandra.tcm.Transformation.Kind.MID_JOIN; @@ -168,6 +169,12 @@ public class BootstrapAndJoin extends MultiStepOperation<Epoch> return indexToNext(idx); } + @Override + public Transformation.Result applyTo(ClusterMetadata metadata) + { + return applyMultipleTransformations(metadata, next, of(startJoin, midJoin, finishJoin)); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java index 60d9ff35d1..46f39da387 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java @@ -64,6 +64,7 @@ import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.vint.VIntCoding; +import static com.google.common.collect.ImmutableList.of; import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_REPLACE; import static org.apache.cassandra.tcm.Transformation.Kind.MID_REPLACE; import static org.apache.cassandra.tcm.Transformation.Kind.START_REPLACE; @@ -170,6 +171,12 @@ public class BootstrapAndReplace extends MultiStepOperation<Epoch> return indexToNext(idx); } + @Override + public Transformation.Result applyTo(ClusterMetadata metadata) + { + return applyMultipleTransformations(metadata, next, of(startReplace, midReplace, finishReplace)); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java index 86b449c346..86a8ec019a 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java +++ b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java @@ -19,15 +19,14 @@ package org.apache.cassandra.tcm.sequences; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -46,7 +45,7 @@ import static org.apache.cassandra.db.TypeSizes.sizeof; import static org.apache.cassandra.tcm.MultiStepOperation.Kind.LEAVE; import static org.apache.cassandra.tcm.serialization.Version.V2; -public class InProgressSequences implements MetadataValue<InProgressSequences> +public class InProgressSequences implements MetadataValue<InProgressSequences>, Iterable<MultiStepOperation<?>> { public static final Serializer serializer = new Serializer(); @@ -183,8 +182,6 @@ public class InProgressSequences implements MetadataValue<InProgressSequences> return Objects.hash(state, lastModified); } - public static Set<MultiStepOperation.Kind> STARTUP_SEQUENCE_KINDS = ImmutableSet.of(MultiStepOperation.Kind.JOIN, MultiStepOperation.Kind.REPLACE); - @VisibleForTesting public static BiFunction<MultiStepOperation<?>, SequenceState, SequenceState> listener = (s, o) -> o; @@ -215,6 +212,12 @@ public class InProgressSequences implements MetadataValue<InProgressSequences> return sequence.kind() == LEAVE; } + @Override + public Iterator<MultiStepOperation<?>> iterator() + { + return state.values().iterator(); + } + public static class Serializer implements MetadataSerializer<InProgressSequences> { public void serialize(InProgressSequences t, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index 1210969ea0..b5750e5e93 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -71,6 +71,7 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.vint.VIntCoding; +import static com.google.common.collect.ImmutableList.of; import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_MOVE; import static org.apache.cassandra.tcm.Transformation.Kind.MID_MOVE; import static org.apache.cassandra.tcm.Transformation.Kind.START_MOVE; @@ -169,11 +170,18 @@ public class Move extends MultiStepOperation<Epoch> return NodeId.serializer; } - @Override public Transformation.Kind nextStep() + @Override + public Transformation.Kind nextStep() { return indexToNext(idx); } + @Override + public Transformation.Result applyTo(ClusterMetadata metadata) + { + return applyMultipleTransformations(metadata, next, of(startMove, midMove, finishMove)); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java index afa0e5d62b..e67f0b8f85 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java +++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataKey; import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.Retry; import org.apache.cassandra.tcm.Transformation; @@ -130,6 +132,25 @@ public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration return next.kind(); } + @Override + public Transformation.Result applyTo(ClusterMetadata metadata) + { + MultiStepOperation<?> sequence = metadata.inProgressSequences.get(SequenceKey.instance); + if (sequence.kind() != MultiStepOperation.Kind.RECONFIGURE_CMS) + throw new IllegalStateException(String.format("Can not apply in-progress sequence, since its kind is %s, but not %s", sequence.kind(), MultiStepOperation.Kind.RECONFIGURE_CMS)); + Epoch lastModifiedEpoch = metadata.epoch; + ImmutableSet.Builder<MetadataKey> modifiedKeys = ImmutableSet.builder(); + while (metadata.inProgressSequences.contains(SequenceKey.instance)) + { + ReconfigureCMS transitionCMS = (ReconfigureCMS) metadata.inProgressSequences.get(SequenceKey.instance); + Transformation.Result result = transitionCMS.next.execute(metadata); + assert result.isSuccess(); + metadata = result.success().metadata.forceEpoch(lastModifiedEpoch); + modifiedKeys.addAll(result.success().affectedMetadata); + } + return new Transformation.Success(metadata.forceEpoch(lastModifiedEpoch.nextEpoch()), LockedRanges.AffectedRanges.EMPTY, modifiedKeys.build()); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java index f2d470556a..bd93d7b84c 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java +++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java @@ -47,6 +47,7 @@ import org.apache.cassandra.tcm.transformations.PrepareLeave; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.vint.VIntCoding; +import static com.google.common.collect.ImmutableList.of; import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_LEAVE; import static org.apache.cassandra.tcm.Transformation.Kind.MID_LEAVE; import static org.apache.cassandra.tcm.Transformation.Kind.START_LEAVE; @@ -148,6 +149,12 @@ public class UnbootstrapAndLeave extends MultiStepOperation<Epoch> return indexToNext(idx); } + @Override + public Transformation.Result applyTo(ClusterMetadata metadata) + { + return applyMultipleTransformations(metadata, next, of(startLeave, midLeave, finishLeave)); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java b/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java index c6ccfbab27..34f1826574 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java +++ b/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java @@ -20,13 +20,9 @@ package org.apache.cassandra.tcm.transformations; import java.util.Set; -import com.google.common.collect.ImmutableSet; - import org.apache.cassandra.dht.Token; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; -import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.tcm.MetadataKey; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.PlacementProvider; import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; @@ -73,30 +69,12 @@ public class UnsafeJoin extends PrepareJoin if (result.isRejected()) return result; - Success success = result.success(); - ClusterMetadata metadata = success.metadata; - Epoch forceEpoch = metadata.epoch; - metadata = success.metadata.forceEpoch(prev.epoch); - + ClusterMetadata metadata = result.success().metadata.forceEpoch(prev.epoch); BootstrapAndJoin plan = (BootstrapAndJoin) metadata.inProgressSequences.get(nodeId()); - - ImmutableSet.Builder<MetadataKey> modifiedKeys = ImmutableSet.builder(); - - success = plan.startJoin.execute(metadata).success(); - metadata = success.metadata.forceEpoch(prev.epoch); - modifiedKeys.addAll(success.affectedMetadata); - - success = plan.midJoin.execute(metadata).success(); - metadata = success.metadata.forceEpoch(prev.epoch); - modifiedKeys.addAll(success.affectedMetadata); - - success = plan.finishJoin.execute(metadata).success(); - metadata = success.metadata; - modifiedKeys.addAll(success.affectedMetadata); - - assert metadata.epoch.is(forceEpoch) : String.format("Epoch should have been %s but was %s", forceEpoch, metadata.epoch); - - return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, modifiedKeys.build()); + Result res = plan.applyTo(metadata); + metadata = res.success().metadata; + assert metadata.epoch.isDirectlyAfter(prev.epoch); + return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, res.success().affectedMetadata); } public static void unsafeJoin(NodeId nodeId, Set<Token> tokens) diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java index dabe363ca0..5a324fa70c 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java @@ -49,6 +49,7 @@ import org.apache.cassandra.locator.CMSPlacementStrategy; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.AtomicLongBackedProcessor; import org.apache.cassandra.tcm.ClusterMetadata; @@ -874,4 +875,59 @@ public class MetadataChangeSimulationTest extends CMSTestBase overreplicated >= expectedOverReplicated && overreplicated <= (expectedOverReplicated * rf.total() + 2 + movingNodes * rf.total())); } } -} \ No newline at end of file + + @Test + public void testPlacementsAllSettled() throws Throwable + { + Random random = new Random(1); + ReplicationFactor rf = new NtsReplicationFactor(1, 3); + + try(CMSSut sut = new CMSSut(AtomicLongBackedProcessor::new, false, rf)) + { + ModelState state = ModelState.empty(nodeFactory(), 300, 1); + Node toJoin = null; + Node toMove = null; + Node toReplace = null; + Node toLeave = null; + for (Map.Entry<String, Integer> e : rf.asMap().entrySet()) + { + int dc = Integer.parseInt(e.getKey().replace("datacenter", "")); + + for (int i = 0; i < 100; i++) + { + ModelChecker.Pair<ModelState, Node> registration = registerNewNode(state, sut, dc, random.nextInt(5) + 1); + state = registration.l; + if (i == 50) + toJoin = registration.r; + else + { + state = SimulatedOperation.joinWithoutBootstrap(registration.l, sut, registration.r); + if (i == 75) + toMove = registration.r; + if (i == 25) + toReplace = registration.r; + if (i == 10) + toLeave = registration.r; + } + } + } + state = SimulatedOperation.join(sut, state, toJoin); + state = SimulatedOperation.move(sut, state, toMove, toMove.overrideToken(toMove.token() + 1)); + + ModelChecker.Pair<ModelState, Node> replacement = registerNewNode(state, sut, toReplace.tokenIdx(), toReplace.dcIdx(), toReplace.rackIdx());; + state = SimulatedOperation.replace(sut, replacement.l, toReplace, replacement.r); + state = SimulatedOperation.leave(sut, state, toLeave); + + KeyspaceMetadata ksm = sut.service.metadata().schema.getKeyspaces().get("test").get(); + DataPlacement allSettled = sut.service.metadata().writePlacementAllSettled(ksm); + Assert.assertEquals(4, state.inFlightOperations.size()); // make sure none was rejected + while (!state.inFlightOperations.isEmpty()) + { + state = state.inFlightOperations.get(random.nextInt(state.inFlightOperations.size())).advance(state); + Assert.assertEquals(allSettled, sut.service.metadata().writePlacementAllSettled(ksm)); + validatePlacements(sut, state); + } + Assert.assertEquals(allSettled, sut.service.metadata().placements.get(ksm.params.replication)); + } + } +} diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java index 249ea32551..c17067521b 100644 --- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java +++ b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java @@ -54,7 +54,7 @@ public class ClusterMetadataTest } @Test - public void testWritePlacementAllSettledLeaving() throws ExecutionException, InterruptedException + public void testWritePlacementAllSettledLeaving() { for (int i = 1; i <= 4; i++) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org