This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 91829d20697d7a3a3a0469482e1d8e4b5f292d65 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Fri Mar 17 17:14:16 2023 +0000 [CEP-21] During multi step operations, defer token map update until completion of final step patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson for CASSANDRA-18419 --- .../apache/cassandra/net/ResponseVerbHandler.java | 5 +- .../org/apache/cassandra/tcm/ClusterMetadata.java | 6 +++ .../cassandra/tcm/ClusterMetadataService.java | 6 ++- .../tcm/ownership/UniformRangePlacement.java | 2 + .../cassandra/tcm/sequences/BootstrapAndJoin.java | 6 +-- .../org/apache/cassandra/tcm/sequences/Move.java | 1 - .../cassandra/tcm/transformations/PrepareJoin.java | 52 +++++++++++++++++--- .../tcm/transformations/PrepareLeave.java | 1 - .../cassandra/tcm/transformations/PrepareMove.java | 1 - .../cassandra/tcm/transformations/UnsafeJoin.java | 1 + .../distributed/test/log/PlacementSimulator.java | 57 +++++++++------------- .../InProgressSequenceCancellationTest.java | 5 +- 12 files changed, 87 insertions(+), 56 deletions(-) diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 2dff292627..6ae760e491 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -48,8 +48,9 @@ class ResponseVerbHandler implements IVerbHandler // that executes something on the gossip stage as well. !Stage.GOSSIP.executor().inExecutor()) { - logger.debug("Learned about next epoch {} from {} in {}", message.epoch(), message.from(), message.verb()); - ClusterMetadataService.instance().maybeCatchup(message.epoch()); + boolean caughtUp = ClusterMetadataService.instance().maybeCatchup(message.epoch()); + if (caughtUp) + logger.debug("Learned about next epoch {} from {} in {}", message.epoch(), message.from(), message.verb()); } RequestCallbacks.CallbackInfo callbackInfo = MessagingService.instance().callbacks.remove(message.id(), message.from()); diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index b1c9b3f00f..ab412cfd05 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -344,6 +344,11 @@ public class ClusterMetadata public Transformer proposeToken(NodeId nodeId, Collection<Token> tokens) { tokenMap = tokenMap.assignTokens(nodeId, tokens); + return this; + } + + public Transformer addToRackAndDC(NodeId nodeId) + { directory = directory.withRackAndDC(nodeId); return this; } @@ -373,6 +378,7 @@ public class ClusterMetadata tokenMap = tokenMap.unassignTokens(replaced) .assignTokens(replacement, transferringTokens); directory = directory.without(replaced) + .withRackAndDC(replacement) .withNodeState(replacement, NodeState.JOINED); return this; } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 82eecd461f..653f9c9b77 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -415,7 +415,7 @@ public class ClusterMetadataService * Utility methods */ - public void maybeCatchup(Epoch theirEpoch) + public boolean maybeCatchup(Epoch theirEpoch) { Epoch ourEpoch = ClusterMetadata.current().epoch; if (!theirEpoch.isBefore(Epoch.FIRST) && theirEpoch.isAfter(ourEpoch)) @@ -423,7 +423,7 @@ public class ClusterMetadataService if (state() == State.GOSSIP) { logger.warn("TODO: can't catchup in gossip mode (their epoch = {})", theirEpoch); //todo: we have seen a message with epoch > EMPTY, we are probably racing with migration, or we missed the finish migration message, handle! - return; + return false; } replayAndWait(); @@ -433,7 +433,9 @@ public class ClusterMetadataService throw new IllegalArgumentException(String.format("Could not catch up to epoch %s even after replay. Highest seen after replay is %s.", theirEpoch, ourEpoch)); } + return true; } + return false; } public ClusterMetadata replayAndWait() diff --git a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java index c0e7be3e4f..a88c7086df 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java +++ b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java @@ -91,6 +91,7 @@ public class UniformRangePlacement implements PlacementProvider { DataPlacements placements = calculatePlacements(metadata.transformer() .proposeToken(joining, tokens) + .addToRackAndDC(joining) .build() .metadata, keyspaces); @@ -109,6 +110,7 @@ public class UniformRangePlacement implements PlacementProvider DataPlacements finalPlacements = calculatePlacements(metadata.transformer() .proposeToken(joining, tokens) + .addToRackAndDC(joining) .build() .metadata, keyspaces); diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java index ece89e1c16..4a95c069d3 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java @@ -41,7 +41,6 @@ import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.InProgressSequence; import org.apache.cassandra.tcm.Transformation; -import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.PlacementDeltas; @@ -159,9 +158,7 @@ public class BootstrapAndJoin implements InProgressSequence<BootstrapAndJoin> case START_JOIN: try { - NodeId nodeId = ClusterMetadata.current().myNodeId(); - SystemKeyspace.updateTokens(ClusterMetadata.current().tokenMap.tokens(nodeId)); - + SystemKeyspace.updateTokens(finishJoin.tokens); ClusterMetadataService.instance().commit(startJoin); } catch (Throwable e) @@ -244,7 +241,6 @@ public class BootstrapAndJoin implements InProgressSequence<BootstrapAndJoin> } LockedRanges newLockedRanges = metadata.lockedRanges.unlock(lockKey); return metadata.transformer() - .unproposeTokens(startJoin.nodeId()) .withNodeState(startJoin.nodeId(), NodeState.REGISTERED) .with(placements) .with(newLockedRanges); diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index 650eadb329..3de26c2a79 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -268,7 +268,6 @@ public class Move implements InProgressSequence<Move> LockedRanges newLockedRanges = metadata.lockedRanges.unlock(lockKey); return metadata.transformer() - .unproposeTokens(startMove.nodeId(), tokens) .withNodeState(startMove.nodeId(), NodeState.JOINED) .with(placements) .with(newLockedRanges); diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java index 12fc33eca2..5200eab52b 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java @@ -127,7 +127,6 @@ public class PrepareJoin implements Transformation @Override public Result execute(ClusterMetadata prev) { - ClusterMetadata.Transformer proposed = prev.transformer().proposeToken(nodeId, tokens); PlacementTransitionPlan transitionPlan = placementProvider.planForJoin(prev, nodeId, tokens, prev.schema.getKeyspaces()); LockedRanges.AffectedRanges rangesToLock = transitionPlan.affectedRanges(); @@ -141,7 +140,7 @@ public class PrepareJoin implements Transformation LockedRanges.Key lockKey = LockedRanges.keyFor(prev.nextEpoch()); StartJoin startJoin = new StartJoin(nodeId, transitionPlan.addToWrites(), lockKey); MidJoin midJoin = new MidJoin(nodeId, transitionPlan.moveReads(), lockKey); - FinishJoin finishJoin = new FinishJoin(nodeId, transitionPlan.removeFromWrites(), lockKey); + FinishJoin finishJoin = new FinishJoin(nodeId, tokens, transitionPlan.removeFromWrites(), lockKey); ProgressBarrier barrier = ProgressBarrier.immediate(rangesToLock.toPeers(prev.placements, prev.directory)); BootstrapAndJoin plan = new BootstrapAndJoin(barrier, @@ -154,9 +153,10 @@ public class PrepareJoin implements Transformation LockedRanges newLockedRanges = prev.lockedRanges.lock(lockKey, rangesToLock); DataPlacements startingPlacements = transitionPlan.toSplit.apply(prev.placements); - proposed = proposed.with(newLockedRanges) - .with(startingPlacements) - .with(prev.inProgressSequences.with(nodeId, plan)); + ClusterMetadata.Transformer proposed = prev.transformer() + .with(newLockedRanges) + .with(startingPlacements) + .with(prev.inProgressSequences.with(nodeId, plan)); return success(proposed, rangesToLock); } @@ -278,10 +278,12 @@ public class PrepareJoin implements Transformation public static class FinishJoin extends ApplyPlacementDeltas { public static final Serializer serializer = new Serializer(); + public final Set<Token> tokens; - public FinishJoin(NodeId nodeId, PlacementDeltas delta, LockedRanges.Key unlockKey) + public FinishJoin(NodeId nodeId, Set<Token> tokens, PlacementDeltas delta, LockedRanges.Key unlockKey) { super(nodeId, delta, unlockKey, true); + this.tokens = tokens; } @Override @@ -293,14 +295,50 @@ public class PrepareJoin implements Transformation public ClusterMetadata.Transformer transform(ClusterMetadata prev, ClusterMetadata.Transformer transformer) { return transformer.join(nodeId) + .proposeToken(nodeId, tokens) + .addToRackAndDC(nodeId) .with(prev.inProgressSequences.without(nodeId)); } public static final class Serializer extends SerializerBase<FinishJoin> { + @Override + public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException + { + super.serialize(t, out, version); + Set<Token> tokens = ((FinishJoin)t).tokens; + out.writeUnsignedVInt32(tokens.size()); + for (Token token : tokens) + Token.metadataSerializer.serialize(token, out, version); + } + + @Override + public FinishJoin deserialize(DataInputPlus in, Version version) throws IOException + { + NodeId nodeId = NodeId.serializer.deserialize(in, version); + PlacementDeltas delta = PlacementDeltas.serializer.deserialize(in, version); + LockedRanges.Key lockKey = LockedRanges.Key.serializer.deserialize(in, version); + int numTokens = in.readUnsignedVInt32(); + Set<Token> tokens = new HashSet<>(); + for (int i = 0; i < numTokens; i++) + tokens.add(Token.metadataSerializer.deserialize(in, version)); + return new FinishJoin(nodeId, tokens, delta, lockKey); + } + + @Override + public long serializedSize(Transformation t, Version version) + { + long size = super.serializedSize(t, version); + Set<Token> tokens = ((FinishJoin)t).tokens; + size += TypeSizes.sizeofUnsignedVInt(tokens.size()); + for (Token token : tokens) + size += Token.metadataSerializer.serializedSize(token, version); + return size; + } + FinishJoin construct(NodeId nodeId, PlacementDeltas delta, LockedRanges.Key lockKey) { - return new FinishJoin(nodeId, delta, lockKey); + throw new IllegalStateException(); } } } diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java index b1363d4b8c..cdbc281856 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java @@ -100,7 +100,6 @@ public class PrepareLeave implements Transformation PlacementDeltas finishDelta = transitionPlan.removeFromWrites(); LockedRanges.Key unlockKey = LockedRanges.keyFor(proposed.epoch); - InetAddressAndPort leaving = prev.directory.endpoint(nodeId); StartLeave start = new StartLeave(nodeId, startDelta, unlockKey); MidLeave mid = new MidLeave(nodeId, midDelta, unlockKey); diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java index f8e178150c..ae9619c01a 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java @@ -108,7 +108,6 @@ public class PrepareMove implements Transformation return success(prev.transformer() .withNodeState(nodeId, NodeState.MOVING) - .proposeToken(nodeId, tokens) .with(prev.lockedRanges.lock(lockKey, rangesToLock)) .with(transitionPlan.toSplit.apply(prev.placements)) .with(prev.inProgressSequences.with(nodeId, sequence)), diff --git a/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java b/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java index 802718ff93..60dac7cebc 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java +++ b/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java @@ -55,6 +55,7 @@ public class UnsafeJoin extends PrepareJoin { return "UnsafeJoin{" + "id=" + nodeId + + ", tokens=" + tokens + '}'; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java index 57105d1b55..40de4dd8be 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java @@ -301,18 +301,13 @@ public class PlacementSimulator // add the new node to the system and split ranges according to its token, while retaining current // placement. This step will always be executed immediately, whereas subsequent steps may be deferred debug.log("Splitting ranges to prepare for join of " + node + "\n"); - return model.withNodes(move(splitNodes, token, node)) - .withReadPlacements(splitReplicated(baseState.readPlacements, token)) + return model.withReadPlacements(splitReplicated(baseState.readPlacements, token)) .withWritePlacements(splitReplicated(baseState.writePlacements, token)); }, (model) -> { // revert // final stage of reverting a join is to undo the range splits performed by preparing the operation debug.log("Reverting range splits from prepare-join of " + node + "\n"); - List<Node> newNodes = new ArrayList<>(model.nodes); - Node toRemove = new Node(token, node); - newNodes.remove(toRemove); - return model.withNodes(newNodes) - .withWritePlacements(mergeReplicated(model.writePlacements, token)) + return model.withWritePlacements(mergeReplicated(model.writePlacements, token)) .withReadPlacements(mergeReplicated(model.readPlacements, token)); }) ); @@ -364,7 +359,11 @@ public class PlacementSimulator "\twriteModifications=\n%s", node, token, diffsToString(step3WriteCommands))); - return model.withWritePlacements(PlacementSimulator.apply(model.writePlacements, step3WriteCommands)); + List<Node> newNodes = new ArrayList<>(model.nodes); + newNodes.add(new Node(token, node)); + Collections.sort(newNodes, Node::compareTo); + return model.withNodes(newNodes) + .withWritePlacements(PlacementSimulator.apply(model.writePlacements, step3WriteCommands)); }, (model) -> { //revert throw new IllegalStateException("Can't revert finish-join of " + node + ", operation is already complete\n"); @@ -383,7 +382,6 @@ public class PlacementSimulator .get(); List<Node> origNodes = new ArrayList<>(baseState.nodes); - List<Node> finalNodes = new ArrayList<>(); for (int i = 0; i < origNodes.size(); i++) { @@ -405,26 +403,12 @@ public class PlacementSimulator steps.add(new Transformation( (model) -> { // apply debug.log(String.format("Splitting ranges to prepare for move of %s to %d\n", node, newToken)); - List<Node> newNodes = new ArrayList<>(model.nodes); - newNodes.add(new Node(newToken, node)); - Collections.sort(newNodes, Node::compareTo); - - return model.withNodes(newNodes) - .withReadPlacements(splitReplicated(model.readPlacements, newToken)) + return model.withReadPlacements(splitReplicated(model.readPlacements, newToken)) .withWritePlacements(splitReplicated(model.writePlacements, newToken)); }, (model) -> { // revert debug.log(String.format("Reverting range splits from prepare move of %s to %d\n", node, newToken)); - List<Node> revertedNodes = new ArrayList<>(); - for (Node n : model.nodes) - { - if (n.token == newToken) - continue; - revertedNodes.add(n); - } - Collections.sort(revertedNodes, Node::compareTo); - return model.withNodes(revertedNodes) - .withWritePlacements(mergeReplicated(model.writePlacements, newToken)) + return model.withWritePlacements(mergeReplicated(model.writePlacements, newToken)) .withReadPlacements(mergeReplicated(model.readPlacements, newToken)); })); @@ -479,21 +463,23 @@ public class PlacementSimulator "\twriteModifications=\n%s", node, newToken, diffsToString(diff))); - Map<Range, List<Node>> writePlacements = model.writePlacements; - - writePlacements = PlacementSimulator.apply(writePlacements, diff); - - List<Node> nodes = new ArrayList<>(); - for (Node n : model.nodes) + List<Node> currentNodes = new ArrayList<>(model.nodes); + List<Node> newNodes = new ArrayList<>(); + for (int i = 0; i < currentNodes.size(); i++) { - if (n.token == oldLocation.token) + if (currentNodes.get(i).id == oldLocation.id) continue; - nodes.add(n); + newNodes.add(currentNodes.get(i)); } - Collections.sort(nodes, Node::compareTo); + newNodes.add(new Node(newToken, node)); + Collections.sort(newNodes, Node::compareTo); + + Map<Range, List<Node>> writePlacements = model.writePlacements; + writePlacements = PlacementSimulator.apply(writePlacements, diff); + return model.withWritePlacements(mergeReplicated(writePlacements, oldLocation.token)) .withReadPlacements(mergeReplicated(model.readPlacements, oldLocation.token)) - .withNodes(nodes); + .withNodes(newNodes); }, (model) -> { throw new IllegalStateException(String.format("Can't revert finish-move of %d, operation is already complete", newToken)); @@ -583,6 +569,7 @@ public class PlacementSimulator diffsToString(step3WriteCommands))); List<Node> newNodes = new ArrayList<>(model.nodes); newNodes.remove(toRemove); + Collections.sort(newNodes, Node::compareTo); Map<Range, List<Node>> writes = PlacementSimulator.apply(model.writePlacements, step3WriteCommands); return model.withReadPlacements(mergeReplicated(model.readPlacements, toRemove.token)) .withWritePlacements(mergeReplicated(writes, toRemove.token)) diff --git a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java index 5e76fed0c0..5335df8627 100644 --- a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java +++ b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java @@ -125,13 +125,15 @@ public class InProgressSequenceCancellationTest // No need to create a deltas or placements for after FINISH_JOIN because it's too late to cancel by then PlacementDeltas finishDeltas = PlacementDeltas.empty(); + Set<Token> tokens = Collections.singleton(token(random.nextLong())); + BootstrapAndJoin plan = new BootstrapAndJoin(ProgressBarrier.NONE, key, Transformation.Kind.FINISH_JOIN, prepareDeltas, new PrepareJoin.StartJoin(nodeId, startDeltas, key), new PrepareJoin.MidJoin(nodeId, midDeltas, key), - new PrepareJoin.FinishJoin(nodeId, finishDeltas, key), + new PrepareJoin.FinishJoin(nodeId, tokens, finishDeltas, key), false, false); @@ -142,7 +144,6 @@ public class InProgressSequenceCancellationTest .with(afterMid) .with(locked) .withNodeState(nodeId, NodeState.BOOTSTRAPPING) - .proposeToken(nodeId, Collections.singleton(token(random.nextLong()))) .with(before.inProgressSequences.with(nodeId, plan)) .build().metadata; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
