This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 91829d20697d7a3a3a0469482e1d8e4b5f292d65
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Mar 17 17:14:16 2023 +0000

    [CEP-21] During multi step operations, defer token map update until completion of final step
    
    patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson
    for CASSANDRA-18419
---
 .../apache/cassandra/net/ResponseVerbHandler.java  |  5 +-
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |  6 +++
 .../cassandra/tcm/ClusterMetadataService.java      |  6 ++-
 .../tcm/ownership/UniformRangePlacement.java       |  2 +
 .../cassandra/tcm/sequences/BootstrapAndJoin.java  |  6 +--
 .../org/apache/cassandra/tcm/sequences/Move.java   |  1 -
 .../cassandra/tcm/transformations/PrepareJoin.java | 52 +++++++++++++++++---
 .../tcm/transformations/PrepareLeave.java          |  1 -
 .../cassandra/tcm/transformations/PrepareMove.java |  1 -
 .../cassandra/tcm/transformations/UnsafeJoin.java  |  1 +
 .../distributed/test/log/PlacementSimulator.java   | 57 +++++++++-------------
 .../InProgressSequenceCancellationTest.java        |  5 +-
 12 files changed, 87 insertions(+), 56 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 2dff292627..6ae760e491 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -48,8 +48,9 @@ class ResponseVerbHandler implements IVerbHandler
             // that executes something on the gossip stage as well.
             !Stage.GOSSIP.executor().inExecutor())
         {
-            logger.debug("Learned about next epoch {} from {} in {}", message.epoch(), message.from(), message.verb());
-            ClusterMetadataService.instance().maybeCatchup(message.epoch());
+            boolean caughtUp = ClusterMetadataService.instance().maybeCatchup(message.epoch());
+            if (caughtUp)
+                logger.debug("Learned about next epoch {} from {} in {}", message.epoch(), message.from(), message.verb());
         }
 
         RequestCallbacks.CallbackInfo callbackInfo = 
MessagingService.instance().callbacks.remove(message.id(), message.from());
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index b1c9b3f00f..ab412cfd05 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -344,6 +344,11 @@ public class ClusterMetadata
         public Transformer proposeToken(NodeId nodeId, Collection<Token> 
tokens)
         {
             tokenMap = tokenMap.assignTokens(nodeId, tokens);
+            return this;
+        }
+
+        public Transformer addToRackAndDC(NodeId nodeId)
+        {
             directory = directory.withRackAndDC(nodeId);
             return this;
         }
@@ -373,6 +378,7 @@ public class ClusterMetadata
             tokenMap = tokenMap.unassignTokens(replaced)
                                .assignTokens(replacement, transferringTokens);
             directory = directory.without(replaced)
+                                 .withRackAndDC(replacement)
                                  .withNodeState(replacement, NodeState.JOINED);
             return this;
         }
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 82eecd461f..653f9c9b77 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -415,7 +415,7 @@ public class ClusterMetadataService
      * Utility methods
      */
 
-    public void maybeCatchup(Epoch theirEpoch)
+    public boolean maybeCatchup(Epoch theirEpoch)
     {
         Epoch ourEpoch = ClusterMetadata.current().epoch;
         if (!theirEpoch.isBefore(Epoch.FIRST) && theirEpoch.isAfter(ourEpoch))
@@ -423,7 +423,7 @@ public class ClusterMetadataService
             if (state() == State.GOSSIP)
             {
                 logger.warn("TODO: can't catchup in gossip mode (their epoch = {})", theirEpoch); //todo: we have seen a message with epoch > EMPTY, we are probably racing with migration, or we missed the finish migration message, handle!
-                return;
+                return false;
             }
 
             replayAndWait();
@@ -433,7 +433,9 @@ public class ClusterMetadataService
                 throw new IllegalArgumentException(String.format("Could not catch up to epoch %s even after replay. Highest seen after replay is %s.",
                                                                  theirEpoch, ourEpoch));
             }
+            return true;
         }
+        return false;
     }
 
     public ClusterMetadata replayAndWait()
diff --git 
a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java 
b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java
index c0e7be3e4f..a88c7086df 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java
@@ -91,6 +91,7 @@ public class UniformRangePlacement implements 
PlacementProvider
         {
             DataPlacements placements = 
calculatePlacements(metadata.transformer()
                                                                     
.proposeToken(joining, tokens)
+                                                                    
.addToRackAndDC(joining)
                                                                     .build()
                                                             .metadata,
                                                             keyspaces);
@@ -109,6 +110,7 @@ public class UniformRangePlacement implements 
PlacementProvider
 
         DataPlacements finalPlacements = 
calculatePlacements(metadata.transformer()
                                                                      
.proposeToken(joining, tokens)
+                                                                     
.addToRackAndDC(joining)
                                                                      .build()
                                                              .metadata,
                                                              keyspaces);
diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java 
b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
index ece89e1c16..4a95c069d3 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.InProgressSequence;
 import org.apache.cassandra.tcm.Transformation;
-import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
 import org.apache.cassandra.tcm.ownership.PlacementDeltas;
@@ -159,9 +158,7 @@ public class BootstrapAndJoin implements 
InProgressSequence<BootstrapAndJoin>
             case START_JOIN:
                 try
                 {
-                    NodeId nodeId = ClusterMetadata.current().myNodeId();
-                    SystemKeyspace.updateTokens(ClusterMetadata.current().tokenMap.tokens(nodeId));
-
+                    SystemKeyspace.updateTokens(finishJoin.tokens);
                     ClusterMetadataService.instance().commit(startJoin);
                 }
                 catch (Throwable e)
@@ -244,7 +241,6 @@ public class BootstrapAndJoin implements 
InProgressSequence<BootstrapAndJoin>
         }
         LockedRanges newLockedRanges = metadata.lockedRanges.unlock(lockKey);
         return metadata.transformer()
-                       .unproposeTokens(startJoin.nodeId())
                        .withNodeState(startJoin.nodeId(), NodeState.REGISTERED)
                        .with(placements)
                        .with(newLockedRanges);
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java 
b/src/java/org/apache/cassandra/tcm/sequences/Move.java
index 650eadb329..3de26c2a79 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/Move.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java
@@ -268,7 +268,6 @@ public class Move implements InProgressSequence<Move>
 
         LockedRanges newLockedRanges = metadata.lockedRanges.unlock(lockKey);
         return metadata.transformer()
-                       .unproposeTokens(startMove.nodeId(), tokens)
                        .withNodeState(startMove.nodeId(), NodeState.JOINED)
                        .with(placements)
                        .with(newLockedRanges);
diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java 
b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java
index 12fc33eca2..5200eab52b 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java
@@ -127,7 +127,6 @@ public class PrepareJoin implements Transformation
     @Override
     public Result execute(ClusterMetadata prev)
     {
-        ClusterMetadata.Transformer proposed = 
prev.transformer().proposeToken(nodeId, tokens);
         PlacementTransitionPlan transitionPlan = 
placementProvider.planForJoin(prev, nodeId, tokens, prev.schema.getKeyspaces());
 
         LockedRanges.AffectedRanges rangesToLock = 
transitionPlan.affectedRanges();
@@ -141,7 +140,7 @@ public class PrepareJoin implements Transformation
         LockedRanges.Key lockKey = LockedRanges.keyFor(prev.nextEpoch());
         StartJoin startJoin = new StartJoin(nodeId, 
transitionPlan.addToWrites(), lockKey);
         MidJoin midJoin = new MidJoin(nodeId, transitionPlan.moveReads(), 
lockKey);
-        FinishJoin finishJoin = new FinishJoin(nodeId, 
transitionPlan.removeFromWrites(), lockKey);
+        FinishJoin finishJoin = new FinishJoin(nodeId, tokens, 
transitionPlan.removeFromWrites(), lockKey);
 
         ProgressBarrier barrier = 
ProgressBarrier.immediate(rangesToLock.toPeers(prev.placements, 
prev.directory));
         BootstrapAndJoin plan = new BootstrapAndJoin(barrier,
@@ -154,9 +153,10 @@ public class PrepareJoin implements Transformation
 
         LockedRanges newLockedRanges = prev.lockedRanges.lock(lockKey, 
rangesToLock);
         DataPlacements startingPlacements = 
transitionPlan.toSplit.apply(prev.placements);
-        proposed = proposed.with(newLockedRanges)
-                           .with(startingPlacements)
-                           .with(prev.inProgressSequences.with(nodeId, plan));
+        ClusterMetadata.Transformer proposed = prev.transformer()
+                                                   .with(newLockedRanges)
+                                                   .with(startingPlacements)
+                                                   
.with(prev.inProgressSequences.with(nodeId, plan));
 
         return success(proposed, rangesToLock);
     }
@@ -278,10 +278,12 @@ public class PrepareJoin implements Transformation
     public static class FinishJoin extends ApplyPlacementDeltas
     {
         public static final Serializer serializer = new Serializer();
+        public final Set<Token> tokens;
 
-        public FinishJoin(NodeId nodeId, PlacementDeltas delta, 
LockedRanges.Key unlockKey)
+        public FinishJoin(NodeId nodeId, Set<Token> tokens, PlacementDeltas 
delta, LockedRanges.Key unlockKey)
         {
             super(nodeId, delta, unlockKey, true);
+            this.tokens = tokens;
         }
 
         @Override
@@ -293,14 +295,50 @@ public class PrepareJoin implements Transformation
         public ClusterMetadata.Transformer transform(ClusterMetadata prev, 
ClusterMetadata.Transformer transformer)
         {
             return transformer.join(nodeId)
+                              .proposeToken(nodeId, tokens)
+                              .addToRackAndDC(nodeId)
                               .with(prev.inProgressSequences.without(nodeId));
         }
 
         public static final class Serializer extends SerializerBase<FinishJoin>
         {
+            @Override
+            public void serialize(Transformation t, DataOutputPlus out, 
Version version) throws IOException
+            {
+                super.serialize(t, out, version);
+                Set<Token> tokens = ((FinishJoin)t).tokens;
+                out.writeUnsignedVInt32(tokens.size());
+                for (Token token : tokens)
+                    Token.metadataSerializer.serialize(token, out, version);
+            }
+
+            @Override
+            public FinishJoin deserialize(DataInputPlus in, Version version) 
throws IOException
+            {
+                NodeId nodeId = NodeId.serializer.deserialize(in, version);
+                PlacementDeltas delta = 
PlacementDeltas.serializer.deserialize(in, version);
+                LockedRanges.Key lockKey = 
LockedRanges.Key.serializer.deserialize(in, version);
+                int numTokens = in.readUnsignedVInt32();
+                Set<Token> tokens = new HashSet<>();
+                for (int i = 0; i < numTokens; i++)
+                    tokens.add(Token.metadataSerializer.deserialize(in, 
version));
+                return new FinishJoin(nodeId, tokens, delta, lockKey);
+            }
+
+            @Override
+            public long serializedSize(Transformation t, Version version)
+            {
+                long size = super.serializedSize(t, version);
+                Set<Token> tokens = ((FinishJoin)t).tokens;
+                size += TypeSizes.sizeofUnsignedVInt(tokens.size());
+                for (Token token : tokens)
+                    size += Token.metadataSerializer.serializedSize(token, 
version);
+                return size;
+            }
+
             FinishJoin construct(NodeId nodeId, PlacementDeltas delta, 
LockedRanges.Key lockKey)
             {
-                return new FinishJoin(nodeId, delta, lockKey);
+                throw new IllegalStateException();
             }
         }
     }
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java 
b/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java
index b1363d4b8c..cdbc281856 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java
@@ -100,7 +100,6 @@ public class PrepareLeave implements Transformation
         PlacementDeltas finishDelta = transitionPlan.removeFromWrites();
 
         LockedRanges.Key unlockKey = LockedRanges.keyFor(proposed.epoch);
-        InetAddressAndPort leaving = prev.directory.endpoint(nodeId);
 
         StartLeave start = new StartLeave(nodeId, startDelta, unlockKey);
         MidLeave mid = new MidLeave(nodeId, midDelta, unlockKey);
diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java 
b/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java
index f8e178150c..ae9619c01a 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java
@@ -108,7 +108,6 @@ public class PrepareMove implements Transformation
 
         return success(prev.transformer()
                            .withNodeState(nodeId, NodeState.MOVING)
-                           .proposeToken(nodeId, tokens)
                            .with(prev.lockedRanges.lock(lockKey, rangesToLock))
                            .with(transitionPlan.toSplit.apply(prev.placements))
                            .with(prev.inProgressSequences.with(nodeId, 
sequence)),
diff --git a/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java 
b/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java
index 802718ff93..60dac7cebc 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/UnsafeJoin.java
@@ -55,6 +55,7 @@ public class UnsafeJoin extends PrepareJoin
     {
         return "UnsafeJoin{" +
                "id=" + nodeId +
+               ", tokens=" + tokens +
                '}';
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java
index 57105d1b55..40de4dd8be 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java
@@ -301,18 +301,13 @@ public class PlacementSimulator
                 // add the new node to the system and split ranges according 
to its token, while retaining current
                 // placement. This step will always be executed immediately, 
whereas subsequent steps may be deferred
                 debug.log("Splitting ranges to prepare for join of " + node + 
"\n");
-                return model.withNodes(move(splitNodes, token, node))
-                            
.withReadPlacements(splitReplicated(baseState.readPlacements, token))
+                return 
model.withReadPlacements(splitReplicated(baseState.readPlacements, token))
                             
.withWritePlacements(splitReplicated(baseState.writePlacements, token));
             },
             (model) -> { // revert
                 // final stage of reverting a join is to undo the range splits 
performed by preparing the operation
                 debug.log("Reverting range splits from prepare-join of " + 
node + "\n");
-                List<Node> newNodes = new ArrayList<>(model.nodes);
-                Node toRemove = new Node(token, node);
-                newNodes.remove(toRemove);
-                return model.withNodes(newNodes)
-                            
.withWritePlacements(mergeReplicated(model.writePlacements, token))
+                return 
model.withWritePlacements(mergeReplicated(model.writePlacements, token))
                             
.withReadPlacements(mergeReplicated(model.readPlacements, token));
             })
         );
@@ -364,7 +359,11 @@ public class PlacementSimulator
                                      "\twriteModifications=\n%s",
                                      node, token,
                                      diffsToString(step3WriteCommands)));
-                return 
model.withWritePlacements(PlacementSimulator.apply(model.writePlacements, 
step3WriteCommands));
+                List<Node> newNodes = new ArrayList<>(model.nodes);
+                newNodes.add(new Node(token, node));
+                Collections.sort(newNodes, Node::compareTo);
+                return model.withNodes(newNodes)
+                            
.withWritePlacements(PlacementSimulator.apply(model.writePlacements, 
step3WriteCommands));
             },
             (model) -> { //revert
                 throw new IllegalStateException("Can't revert finish-join of " 
+ node + ", operation is already complete\n");
@@ -383,7 +382,6 @@ public class PlacementSimulator
                            .get();
 
         List<Node> origNodes = new ArrayList<>(baseState.nodes);
-
         List<Node> finalNodes = new ArrayList<>();
         for (int i = 0; i < origNodes.size(); i++)
         {
@@ -405,26 +403,12 @@ public class PlacementSimulator
         steps.add(new Transformation(
         (model) -> { // apply
             debug.log(String.format("Splitting ranges to prepare for move of 
%s to %d\n", node, newToken));
-            List<Node> newNodes = new ArrayList<>(model.nodes);
-            newNodes.add(new Node(newToken, node));
-            Collections.sort(newNodes, Node::compareTo);
-
-            return model.withNodes(newNodes)
-                        
.withReadPlacements(splitReplicated(model.readPlacements, newToken))
+            return 
model.withReadPlacements(splitReplicated(model.readPlacements, newToken))
                         
.withWritePlacements(splitReplicated(model.writePlacements, newToken));
         },
         (model) -> { // revert
             debug.log(String.format("Reverting range splits from prepare move 
of %s to %d\n", node, newToken));
-            List<Node> revertedNodes = new ArrayList<>();
-            for (Node n : model.nodes)
-            {
-                if (n.token == newToken)
-                    continue;
-                revertedNodes.add(n);
-            }
-            Collections.sort(revertedNodes, Node::compareTo);
-            return model.withNodes(revertedNodes)
-                        
.withWritePlacements(mergeReplicated(model.writePlacements, newToken))
+            return 
model.withWritePlacements(mergeReplicated(model.writePlacements, newToken))
                         
.withReadPlacements(mergeReplicated(model.readPlacements, newToken));
         }));
 
@@ -479,21 +463,23 @@ public class PlacementSimulator
                                     "\twriteModifications=\n%s",
                                     node, newToken, diffsToString(diff)));
 
-            Map<Range, List<Node>> writePlacements = model.writePlacements;
-
-            writePlacements = PlacementSimulator.apply(writePlacements, diff);
-
-            List<Node> nodes = new ArrayList<>();
-            for (Node n : model.nodes)
+            List<Node> currentNodes = new ArrayList<>(model.nodes);
+            List<Node> newNodes = new ArrayList<>();
+            for (int i = 0; i < currentNodes.size(); i++)
             {
-                if (n.token == oldLocation.token)
+                if (currentNodes.get(i).id == oldLocation.id)
                     continue;
-                nodes.add(n);
+                newNodes.add(currentNodes.get(i));
             }
-            Collections.sort(nodes, Node::compareTo);
+            newNodes.add(new Node(newToken, node));
+            Collections.sort(newNodes, Node::compareTo);
+
+            Map<Range, List<Node>> writePlacements = model.writePlacements;
+            writePlacements = PlacementSimulator.apply(writePlacements, diff);
+
             return model.withWritePlacements(mergeReplicated(writePlacements, 
oldLocation.token))
                         
.withReadPlacements(mergeReplicated(model.readPlacements, oldLocation.token))
-                        .withNodes(nodes);
+                        .withNodes(newNodes);
         },
         (model) -> {
             throw new IllegalStateException(String.format("Can't revert 
finish-move of %d, operation is already complete", newToken));
@@ -583,6 +569,7 @@ public class PlacementSimulator
                                      diffsToString(step3WriteCommands)));
                 List<Node> newNodes = new ArrayList<>(model.nodes);
                 newNodes.remove(toRemove);
+                Collections.sort(newNodes, Node::compareTo);
                 Map<Range, List<Node>> writes = 
PlacementSimulator.apply(model.writePlacements, step3WriteCommands);
                 return 
model.withReadPlacements(mergeReplicated(model.readPlacements, toRemove.token))
                             .withWritePlacements(mergeReplicated(writes, 
toRemove.token))
diff --git a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
index 5e76fed0c0..5335df8627 100644
--- a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
+++ b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
@@ -125,13 +125,15 @@ public class InProgressSequenceCancellationTest
         // No need to create a deltas or placements for after FINISH_JOIN 
because it's too late to cancel by then
         PlacementDeltas finishDeltas = PlacementDeltas.empty();
 
+        Set<Token> tokens = Collections.singleton(token(random.nextLong()));
+
         BootstrapAndJoin plan = new BootstrapAndJoin(ProgressBarrier.NONE,
                                                      key,
                                                      
Transformation.Kind.FINISH_JOIN,
                                                      prepareDeltas,
                                                      new 
PrepareJoin.StartJoin(nodeId, startDeltas, key),
                                                      new 
PrepareJoin.MidJoin(nodeId, midDeltas, key),
-                                                     new 
PrepareJoin.FinishJoin(nodeId, finishDeltas, key),
+                                                     new 
PrepareJoin.FinishJoin(nodeId, tokens, finishDeltas, key),
                                                      false,
                                                      false);
 
@@ -142,7 +144,6 @@ public class InProgressSequenceCancellationTest
                                        .with(afterMid)
                                        .with(locked)
                                        .withNodeState(nodeId, 
NodeState.BOOTSTRAPPING)
-                                       .proposeToken(nodeId, 
Collections.singleton(token(random.nextLong())))
                                        
.with(before.inProgressSequences.with(nodeId, plan))
                                        .build().metadata;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to