This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c5c4cd4e57 Update use of transition plan in PrepareReplace c5c4cd4e57 is described below commit c5c4cd4e57515785d91ebaa918d533baf3222215 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Tue Apr 2 15:54:04 2024 +0200 Update use of transition plan in PrepareReplace Patch by Sam Tunnicliffe and marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19132 --- .../tcm/ownership/PlacementTransitionPlan.java | 12 ++++---- .../cassandra/tcm/transformations/PrepareJoin.java | 2 +- .../tcm/transformations/PrepareLeave.java | 2 +- .../cassandra/tcm/transformations/PrepareMove.java | 2 +- .../tcm/transformations/PrepareReplace.java | 33 ++++------------------ .../tcm/ownership/PlacementTransitionPlanTest.java | 30 ++++++++++++-------- 6 files changed, 33 insertions(+), 48 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java index 1d95df4122..c93d33f632 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java +++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,17 +160,14 @@ public class PlacementTransitionPlan * @return null if everything is good, otherwise a Transformation.Result rejection containing information about the bad replica */ @Nullable - public static void assertPreExistingWriteReplica(DataPlacements placements, PlacementTransitionPlan transitionPlan) + public void assertPreExistingWriteReplica(DataPlacements placements) { - assertPreExistingWriteReplica(placements, - transitionPlan.toSplit, - transitionPlan.addToWrites(), - transitionPlan.moveReads(), - transitionPlan.removeFromWrites()); + assertPreExistingWriteReplica(placements, toSplit, addToWrites(), moveReads(), removeFromWrites()); } @Nullable - public static void assertPreExistingWriteReplica(DataPlacements placements, PlacementDeltas ... deltasInOrder) + @VisibleForTesting + protected void assertPreExistingWriteReplica(DataPlacements placements, PlacementDeltas... deltasInOrder) { for (PlacementDeltas deltas : deltasInOrder) { diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java index a684e76d50..6a518975b4 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java @@ -167,7 +167,7 @@ public class PrepareJoin implements Transformation void assertPreExistingWriteReplica(DataPlacements placements, PlacementTransitionPlan transitionPlan) { - PlacementTransitionPlan.assertPreExistingWriteReplica(placements, transitionPlan); + transitionPlan.assertPreExistingWriteReplica(placements); } public static abstract class Serializer<T extends PrepareJoin> implements AsymmetricMetadataSerializer<Transformation, T> diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java index e8512b1ce9..0ad37f8bcd 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareLeave.java @@ -115,7 +115,7 @@ public class PrepareLeave implements Transformation PlacementDeltas startDelta = transitionPlan.addToWrites(); PlacementDeltas midDelta = transitionPlan.moveReads(); PlacementDeltas finishDelta = transitionPlan.removeFromWrites(); - PlacementTransitionPlan.assertPreExistingWriteReplica(prev.placements, transitionPlan); + transitionPlan.assertPreExistingWriteReplica(prev.placements); LockedRanges.Key unlockKey = LockedRanges.keyFor(proposed.epoch); diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java index 42d276b816..e7e278d0d2 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareMove.java @@ -109,7 +109,7 @@ public class PrepareMove implements Transformation StartMove startMove = new StartMove(nodeId, transitionPlan.addToWrites(), lockKey); MidMove midMove = new MidMove(nodeId, transitionPlan.moveReads(), lockKey); FinishMove finishMove = new FinishMove(nodeId, tokens, transitionPlan.removeFromWrites(), lockKey); - PlacementTransitionPlan.assertPreExistingWriteReplica(prev.placements, transitionPlan); + transitionPlan.assertPreExistingWriteReplica(prev.placements); Move sequence = Move.newSequence(prev.nextEpoch(), lockKey, diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareReplace.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareReplace.java index 809ce9b95f..8b1577822b 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareReplace.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareReplace.java @@ -96,29 +96,8 @@ public class PrepareReplace implements Transformation replaced, replacement, prev.schema.getKeyspaces()); - PlacementDeltas.Builder addNewNodeToWrites = PlacementDeltas.builder(); - PlacementDeltas.Builder addNewNodeToReads = PlacementDeltas.builder(); - PlacementDeltas.Builder removeOldNodeFromWrites = PlacementDeltas.builder(); - - // Only addition of the new node to the write groups is done as a consequence of the first transformation. Adding the new - // node to the various read groups is deferred until the second transformation, after bootstrap. Also, track which ranges - // are going to be affected by this operation (i.e. which will be the "pending" ranges for the new node. If the - // plan is accepted those ranges will be locked to prevent other plans submitted later from interacting with the - // same ranges. - LockedRanges.AffectedRangesBuilder affectedRanges = LockedRanges.AffectedRanges.builder(); - transitionPlan.toMaximal.forEach((replication, delta) -> { - delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); - addNewNodeToWrites.put(replication, delta.onlyWrites().onlyAdditions()); - addNewNodeToReads.put(replication, delta.onlyReads()); - }); - - transitionPlan.toFinal.forEach((replication, delta) -> { - delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); - addNewNodeToReads.put(replication, delta.onlyReads()); - removeOldNodeFromWrites.put(replication, delta.onlyWrites().onlyRemovals()); - }); - - LockedRanges.AffectedRanges rangesToLock = affectedRanges.build(); + + LockedRanges.AffectedRanges rangesToLock = transitionPlan.affectedRanges(); LockedRanges.Key alreadyLockedBy = lockedRanges.intersects(rangesToLock); if (!alreadyLockedBy.equals(LockedRanges.NOT_LOCKED)) @@ -127,10 +106,10 @@ public class PrepareReplace implements Transformation alreadyLockedBy, lockedRanges, rangesToLock)); } - StartReplace start = new StartReplace(replaced, replacement, addNewNodeToWrites.build(), unlockKey); - MidReplace mid = new MidReplace(replaced, replacement, addNewNodeToReads.build(), unlockKey); - FinishReplace finish = new FinishReplace(replaced, replacement, removeOldNodeFromWrites.build(), unlockKey); - PlacementTransitionPlan.assertPreExistingWriteReplica(prev.placements, start.delta, mid.delta, finish.delta); + StartReplace start = new StartReplace(replaced, replacement, transitionPlan.addToWrites(), unlockKey); + MidReplace mid = new MidReplace(replaced, replacement, transitionPlan.moveReads(), unlockKey); + FinishReplace finish = new FinishReplace(replaced, replacement, transitionPlan.removeFromWrites(), unlockKey); + transitionPlan.assertPreExistingWriteReplica(prev.placements); Set<Token> tokens = new HashSet<>(prev.tokenMap.tokens(replaced)); BootstrapAndReplace plan = BootstrapAndReplace.newSequence(prev.nextEpoch(), diff --git a/test/unit/org/apache/cassandra/tcm/ownership/PlacementTransitionPlanTest.java b/test/unit/org/apache/cassandra/tcm/ownership/PlacementTransitionPlanTest.java index 4da753dcdf..74d8778afd 100644 --- a/test/unit/org/apache/cassandra/tcm/ownership/PlacementTransitionPlanTest.java +++ b/test/unit/org/apache/cassandra/tcm/ownership/PlacementTransitionPlanTest.java @@ -44,7 +44,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(newReads)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addRead); + assertPreExistingWriteReplica(startPlacements, addRead); } @Test @@ -59,7 +59,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(newReplica)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test @@ -74,7 +74,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(readReplicas)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test public void testAddSplitReadReplica() @@ -88,7 +88,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(readReplicas)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test @@ -103,7 +103,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(readReplicas)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test(expected = Transformation.RejectedTransformationException.class) @@ -118,7 +118,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(readReplicas)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test @@ -138,7 +138,7 @@ public class PlacementTransitionPlanTest .put(params, addReadDelta(readReplicas)).build(); // first delta adds (0, 20] as write, second (20, 40] - make sure both are in placements when adding the read replica; - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite1, addWrite2, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite1, addWrite2, addRead); } @Test(expected = Transformation.RejectedTransformationException.class) @@ -154,7 +154,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(fullRead)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test @@ -170,7 +170,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(transientRead)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test @@ -186,7 +186,7 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(transientRead)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWrite, addRead); + assertPreExistingWriteReplica(startPlacements, addWrite, addRead); } @Test(expected = Transformation.RejectedTransformationException.class) @@ -206,7 +206,15 @@ public class PlacementTransitionPlanTest PlacementDeltas addRead = PlacementDeltas.builder() .put(params, addReadDelta(readReplicas)).build(); - PlacementTransitionPlan.assertPreExistingWriteReplica(startPlacements, addWriteFull, addWriteTransient, addRead); + assertPreExistingWriteReplica(startPlacements, addWriteFull, addWriteTransient, addRead); + } + + private void assertPreExistingWriteReplica(DataPlacements start, PlacementDeltas ... deltasInOrder) + { + new PlacementTransitionPlan(PlacementDeltas.empty(), + PlacementDeltas.empty(), + PlacementDeltas.empty(), + PlacementDeltas.empty()).assertPreExistingWriteReplica(start, deltasInOrder); } private PlacementDeltas.PlacementDelta addReadDelta(RangesByEndpoint replica) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org