This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 086fbe30cd14866fe64ad8f3f3f6cbfa31de8718 Author: Alex Petrov <[email protected]> AuthorDate: Mon Mar 6 18:46:40 2023 +0100 [CEP-21] Ensure that global log replication factor is maintained after decommission patch by Alex Petrov; reviewed by Marcus Eriksson and Sam Tunnicliffe for CASSANDRA-18416 --- .../apache/cassandra/net/ResponseVerbHandler.java | 1 + .../apache/cassandra/service/StorageService.java | 36 ++++-- .../cassandra/tcm/AbstractLocalProcessor.java | 10 +- .../cassandra/tcm/ClusterMetadataService.java | 61 ++++----- src/java/org/apache/cassandra/tcm/Commit.java | 18 ++- .../apache/cassandra/tcm/PaxosBackedProcessor.java | 12 +- .../GossipProcessor.java => Processor.java} | 32 +++-- .../org/apache/cassandra/tcm/RemoteProcessor.java | 2 +- src/java/org/apache/cassandra/tcm/Startup.java | 6 +- .../org/apache/cassandra/tcm/Transformation.java | 12 +- .../cassandra/tcm/migration/GossipProcessor.java | 4 +- .../apache/cassandra/tcm/sequences/AddToCMS.java | 64 ++++++---- .../cms/BaseMembershipTransformation.java | 6 + .../{FinishAddMember.java => FinishAddToCMS.java} | 18 ++- .../cms/{RemoveMember.java => RemoveFromCMS.java} | 10 +- .../{StartAddMember.java => StartAddToCMS.java} | 20 ++- .../distributed/test/log/CMSTestBase.java | 3 +- .../test/log/CoordinatorPathTestBase.java | 16 +-- .../distributed/test/log/TestProcessor.java | 8 +- .../distributed/test/ring/CMSMembershipTest.java | 136 +++++++++++++++++++++ .../test/ring/ConsistentBootstrapTest.java | 27 ---- .../unit/org/apache/cassandra/ServerTestUtils.java | 3 +- 22 files changed, 327 insertions(+), 178 deletions(-) diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 3b4a1c69d4..2dff292627 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -48,6 +48,7 @@ class ResponseVerbHandler implements IVerbHandler // that executes something on the gossip stage as well. !Stage.GOSSIP.executor().inExecutor()) { + logger.debug("Learned about next epoch {} from {} in {}", message.epoch(), message.from(), message.verb()); ClusterMetadataService.instance().maybeCatchup(message.epoch()); } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 88fa90019b..791a8b564b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -68,13 +68,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; +import com.google.common.collect.*; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; @@ -189,6 +183,7 @@ import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.InProgressSequence; import org.apache.cassandra.tcm.Transformation; @@ -196,8 +191,10 @@ import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.migration.GossipCMSListener; import org.apache.cassandra.tcm.ownership.TokenMap; +import org.apache.cassandra.tcm.sequences.AddToCMS; import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.sequences.ProgressBarrier; import org.apache.cassandra.tcm.transformations.CancelInProgressSequence; import org.apache.cassandra.tcm.transformations.PrepareJoin; import org.apache.cassandra.tcm.transformations.PrepareLeave; @@ -206,6 +203,7 @@ import org.apache.cassandra.tcm.transformations.PrepareReplace; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.UnsafeJoin; +import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS; import org.apache.cassandra.transport.ClientResourceLimits; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.Clock; @@ -249,7 +247,6 @@ import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING; import static org.apache.cassandra.tcm.membership.NodeState.JOINED; import static org.apache.cassandra.tcm.membership.NodeState.LEAVING; import static org.apache.cassandra.tcm.membership.NodeState.MOVING; -import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; import static org.apache.cassandra.utils.FBUtilities.now; @@ -3483,6 +3480,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new IllegalStateException("This cluster is migrating to cluster metadata, can't decommission until that is done."); ClusterMetadata metadata = ClusterMetadata.current(); NodeId self = metadata.myNodeId(); + + Set<InetAddressAndPort> cmsMembers = metadata.placements.get(ReplicationParams.meta()).reads.byEndpoint().keySet(); + if (cmsMembers.contains(FBUtilities.getBroadcastAddressAndPort())) + { + if (metadata.directory.peerIds().size() == cmsMembers.size()) + throw new IllegalStateException("Can not decomission the node as it will decrease the replication factor of CMS keyspace"); + + NodeId replacement = null; + for (Entry<NodeId, NodeAddresses> e : metadata.directory.addresses.entrySet()) + { + if (!cmsMembers.contains(e.getValue().broadcastAddress)) + { + logger.info("Nominating an alternative CMS node ({}) before decommission.", e.getValue().broadcastAddress); + AddToCMS.initiate(e.getKey(), e.getValue().broadcastAddress); + replacement = e.getKey(); + break; + } + } + + Epoch epoch = ClusterMetadataService.instance().commit(new RemoveFromCMS(FBUtilities.getBroadcastAddressAndPort())).epoch; + new ProgressBarrier(epoch, ImmutableSet.of(replacement), false).await(); + } + logger.info("starting decom with {} {}", metadata.epoch, self); if (InProgressSequences.isLeave(metadata.inProgressSequences.get(self))) { diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 679adcbe5f..07950ce6e9 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -23,11 +23,12 @@ import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.tcm.log.Replication; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; +import org.apache.cassandra.tcm.log.Replication; +import org.apache.cassandra.utils.FBUtilities; -public abstract class AbstractLocalProcessor implements ClusterMetadataService.Processor +public abstract class AbstractLocalProcessor implements Processor { private static final Logger logger = LoggerFactory.getLogger(PaxosBackedProcessor.class); @@ -57,10 +58,11 @@ public abstract class AbstractLocalProcessor implements ClusterMetadataService.P { Replication replication; if (lastKnown == null || lastKnown.isDirectlyBefore(result.success().metadata.epoch)) + { replication = Replication.of(new Entry(entryId, result.success().metadata.epoch, transform)); + } else { - // TODO: catch up at most to this epoch replication = log.getCommittedEntries(lastKnown); } @@ -84,6 +86,8 @@ public abstract class AbstractLocalProcessor implements ClusterMetadataService.P while (true) { ClusterMetadata previous = log.waitForHighestConsecutive(); + if (!previous.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort())) + throw new IllegalStateException("Node is not a member of CMS anymore"); Transformation.Result result = transform.execute(previous); // if we're rejected, just try to catch up to the latest distributed state if (result.isRejected()) diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index be827cc9d6..82eecd461f 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -50,6 +50,7 @@ import org.apache.cassandra.tcm.ownership.UniformRangePlacement; import org.apache.cassandra.tcm.sequences.AddToCMS; import org.apache.cassandra.tcm.transformations.SealPeriod; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import static java.util.stream.Collectors.toSet; import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; @@ -86,28 +87,9 @@ public class ClusterMetadataService return instance; } - public interface Processor - { - // TODO: remove entry id from the interface? - /** - * Method is _only_ responsible to commit the transformation to the cluster metadata. Implementers _have to ensure_ - * local visibility and enactment of the metadata! - */ - Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown); - // TODO: add a debounce to requestReplay. Right now, because of ResponseVerbHandler, it is possible to send - // a barage of these requests. - /** - * Replays to the highest known epoch. - * - * Upon replay, all items _at least_ up to returned epoch will be visible. - */ - ClusterMetadata replayAndWait(); - } - private final PlacementProvider placementProvider; private final Processor processor; private final LocalLog log; - private final Commit.Replicator replicator; private final MetadataSnapshots snapshots; private final Replication.ReplicationHandler replicationHandler; @@ -146,15 +128,15 @@ public class ClusterMetadataService Processor localProcessor = wrapProcessor.apply(new PaxosBackedProcessor(log)); RemoteProcessor remoteProcessor = new RemoteProcessor(log, Discovery.instance::discoveredNodes); GossipProcessor gossipProcessor = new GossipProcessor(); - replicator = new Commit.DefaultReplicator(() -> log.metadata().directory); + Commit.Replicator replicator = new Commit.DefaultReplicator(() -> log.metadata().directory); currentEpochHandler = new CurrentEpochRequestHandler(); replayRequestHandler = new SwitchableHandler<>(new Replay.Handler(), cmsStateSupplier); commitRequestHandler = new SwitchableHandler<>(new Commit.Handler(localProcessor, replicator), cmsStateSupplier); processor = new SwitchableProcessor(localProcessor, remoteProcessor, gossipProcessor, + replicator, cmsStateSupplier); - replicationHandler = new Replication.ReplicationHandler(log); logNotifyHandler = new Replication.LogNotifyHandler(log); } @@ -169,8 +151,7 @@ public class ClusterMetadataService { this.placementProvider = placementProvider; this.log = log; - this.processor = processor; - this.replicator = replicator; + this.processor = new SwitchableProcessor(processor, null, null, replicator, () -> State.LOCAL); this.snapshots = snapshots; replicationHandler = new Replication.ReplicationHandler(log); @@ -185,7 +166,6 @@ public class ClusterMetadataService MetadataSnapshots snapshots, LocalLog log, Processor processor, - Commit.Replicator replicator, Replication.ReplicationHandler replicationHandler, Replication.LogNotifyHandler logNotifyHandler, CurrentEpochRequestHandler currentEpochHandler, @@ -196,7 +176,6 @@ public class ClusterMetadataService this.snapshots = snapshots; this.log = log; this.processor = processor; - this.replicator = replicator; this.replicationHandler = replicationHandler; this.logNotifyHandler = logNotifyHandler; this.currentEpochHandler = currentEpochHandler; @@ -217,7 +196,6 @@ public class ClusterMetadataService MetadataSnapshots.NO_OP, log, new AtomicLongBackedProcessor(log), - Commit.Replicator.NO_OP, new Replication.ReplicationHandler(log), new Replication.LogNotifyHandler(log), new CurrentEpochRequestHandler(), @@ -343,11 +321,6 @@ public class ClusterMetadataService if (result.isSuccess()) { - // TODO: we could actually move replicator to processor, if we have a source node attached to the transformation. - // In other words, we simply know who was a submitter, so we wouldn't replicate to the submitter, since the - // submitter is going to learn about the result of execution by other means, namely the response. - if (state() == LOCAL) replicator.send(result, null); - while (!backoff.reachedMax()) { try @@ -542,34 +515,50 @@ public class ClusterMetadataService private final RemoteProcessor remote; private final GossipProcessor gossip; private final Supplier<State> cmsStateSupplier; + private final Commit.Replicator replicator; - SwitchableProcessor(Processor local, RemoteProcessor remote, GossipProcessor gossip, Supplier<State> cmsStateSupplier) + SwitchableProcessor(Processor local, + RemoteProcessor remote, + GossipProcessor gossip, + Commit.Replicator replicator, + Supplier<State> cmsStateSupplier) { this.local = local; this.remote = remote; this.gossip = gossip; + this.replicator = replicator; this.cmsStateSupplier = cmsStateSupplier; } @VisibleForTesting public Processor delegate() + { + return delegateInternal().right; + } + + private Pair<State, Processor> delegateInternal() { State state = cmsStateSupplier.get(); switch (state) { case LOCAL: - return local; + return Pair.create(state, local); case REMOTE: - return remote; + return Pair.create(state, remote); case GOSSIP: - return gossip; + return Pair.create(state, gossip); } throw new IllegalStateException("Bad CMS state: " + state); } + @Override public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown) { - return delegate().commit(entryId, transform, lastKnown); + Pair<State, Processor> delegate = delegateInternal(); + Commit.Result result = delegate.right.commit(entryId, transform, lastKnown); + if (delegate.left == State.LOCAL) + replicator.send(result, null); + return result; } public ClusterMetadata replayAndWait() diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java index e4548085ee..a68e2368b6 100644 --- a/src/java/org/apache/cassandra/tcm/Commit.java +++ b/src/java/org/apache/cassandra/tcm/Commit.java @@ -40,8 +40,6 @@ import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.vint.VIntCoding; -import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; - public class Commit { private static final Logger logger = LoggerFactory.getLogger(Commit.class); @@ -239,26 +237,26 @@ public class Commit } @VisibleForTesting - public static IVerbHandler<Commit> handlerForTests(ClusterMetadataService.Processor processor, Replicator replicator, BiConsumer<Message<?>, InetAddressAndPort> messagingService) + public static IVerbHandler<Commit> handlerForTests(Processor processor, Replicator replicator, BiConsumer<Message<?>, InetAddressAndPort> messagingService) { return new Handler(processor, replicator, messagingService); } static class Handler implements IVerbHandler<Commit> { - private final ClusterMetadataService.Processor processor; - private final Replicator replicate; + private final Processor processor; + private final Replicator replicator; private final BiConsumer<Message<?>, InetAddressAndPort> messagingService; - Handler(ClusterMetadataService.Processor processor, Replicator replicator) + Handler(Processor processor, Replicator replicator) { this(processor, replicator, MessagingService.instance()::send); } - Handler(ClusterMetadataService.Processor processor, Replicator replicator, BiConsumer<Message<?>, InetAddressAndPort> messagingService) + Handler(Processor processor, Replicator replicator, BiConsumer<Message<?>, InetAddressAndPort> messagingService) { this.processor = processor; - this.replicate = replicator; + this.replicator = replicator; this.messagingService = messagingService; } @@ -269,7 +267,7 @@ public class Commit if (result.isSuccess()) { Result.Success success = result.success(); - replicate.send(success, message.from()); + replicator.send(success, message.from()); logger.info("Responding with full result {} to sender {}", result, message.from()); // TODO: this response message can get lost; how do we re-discover this on the other side? // TODO: what if we have holes after replaying? @@ -300,7 +298,7 @@ public class Commit public void send(Result result, InetAddressAndPort source) { - if (!result.isSuccess() || ClusterMetadataService.state() != LOCAL) + if (!result.isSuccess()) return; Result.Success success = result.success(); diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java index d4ccb20425..2ab2f921e9 100644 --- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java @@ -29,10 +29,10 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogState; +import org.apache.cassandra.tcm.log.LogStorage; import org.apache.cassandra.utils.concurrent.CountDownLatch; import static org.apache.cassandra.schema.DistributedMetadataLogKeyspace.tryCommit; @@ -46,12 +46,6 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor super(log); } - @Override - public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown) - { - return super.commit(entryId, transform, lastKnown); - } - @Override protected boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch, @@ -77,7 +71,7 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor // TODO: test applying LogStates from multiple responses if (replica.isSelf()) { - log.append(DistributedMetadataLogKeyspace.getLogState(metadata.epoch)); + log.append(LogStorage.SystemKeyspace.getLogState(metadata.epoch)); latch.decrement(); } else @@ -97,6 +91,4 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor latch.awaitUninterruptibly(10, TimeUnit.SECONDS); return log.waitForHighestConsecutive(); } - - } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java b/src/java/org/apache/cassandra/tcm/Processor.java similarity index 53% copy from src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java copy to src/java/org/apache/cassandra/tcm/Processor.java index 71bb0424d8..22fa4d14a4 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -16,26 +16,24 @@ * limitations under the License. */ -package org.apache.cassandra.tcm.migration; +package org.apache.cassandra.tcm; -import org.apache.cassandra.tcm.ClusterMetadataService; -import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.log.Entry; -import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.tcm.Transformation; -import org.apache.cassandra.tcm.ClusterMetadata; -public class GossipProcessor implements ClusterMetadataService.Processor +public interface Processor { - @Override - public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown) - { - throw new IllegalStateException("Can't commit transformations when running in gossip mode. Enable the ClusterMetadataService with `nodetool addtocms`."); - } + /** + * Method is _only_ responsible to commit the transformation to the cluster metadata. Implementers _have to ensure_ + * local visibility and enactment of the metadata! + */ + Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown); + // TODO: add a debounce to requestReplay. Right now, because of ResponseVerbHandler, it is possible to send + // a barage of these requests. - @Override - public ClusterMetadata replayAndWait() - { - return ClusterMetadata.current(); - } + /** + * Replays to the highest known epoch. + * <p> + * Upon replay, all items _at least_ up to returned epoch will be visible. + */ + ClusterMetadata replayAndWait(); } diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index 4761e8bb49..ca6c56654f 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -52,7 +52,7 @@ import org.apache.cassandra.utils.concurrent.Promise; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.tcm.ClusterMetadataService.State.REMOTE; -public final class RemoteProcessor implements ClusterMetadataService.Processor +public final class RemoteProcessor implements Processor { private static final Logger logger = LoggerFactory.getLogger(RemoteProcessor.class); private final Supplier<Collection<InetAddressAndPort>> discoveryNodes; diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 488df2e00f..f5b93d2619 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -62,7 +62,7 @@ package org.apache.cassandra.tcm; } public static void initialize(Set<InetAddressAndPort> seeds, - Function<ClusterMetadataService.Processor, ClusterMetadataService.Processor> wrapProcessor, + Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws InterruptedException, ExecutionException { switch (StartupMode.get(seeds)) @@ -108,7 +108,7 @@ package org.apache.cassandra.tcm; ClusterMetadataService.instance().commit(initialize); } - public static void initializeAsNonCmsNode(Function<ClusterMetadataService.Processor, ClusterMetadataService.Processor> wrapProcessor) + public static void initializeAsNonCmsNode(Function<Processor, Processor> wrapProcessor) { ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); initial.schema.initializeKeyspaceInstances(DistributedSchema.empty()); @@ -163,7 +163,7 @@ package org.apache.cassandra.tcm; /** * This should only be called during startup. */ - public static void initializeFromGossip(Function<ClusterMetadataService.Processor, ClusterMetadataService.Processor> wrapProcessor, Runnable initMessaging) + public static void initializeFromGossip(Function<Processor, Processor> wrapProcessor, Runnable initMessaging) { ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(); emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty()); diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java index 953d3d174c..389b90f0a7 100644 --- a/src/java/org/apache/cassandra/tcm/Transformation.java +++ b/src/java/org/apache/cassandra/tcm/Transformation.java @@ -46,11 +46,11 @@ import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.SealPeriod; import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.UnsafeJoin; -import org.apache.cassandra.tcm.transformations.cms.FinishAddMember; +import org.apache.cassandra.tcm.transformations.cms.FinishAddToCMS; import org.apache.cassandra.tcm.transformations.cms.Initialize; import org.apache.cassandra.tcm.transformations.cms.PreInitialize; -import org.apache.cassandra.tcm.transformations.cms.RemoveMember; -import org.apache.cassandra.tcm.transformations.cms.StartAddMember; +import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS; +import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS; public interface Transformation { @@ -178,9 +178,9 @@ public interface Transformation CANCEL_SEQUENCE(() -> CancelInProgressSequence.serializer), - START_ADD_TO_CMS(() -> StartAddMember.serializer), - FINISH_ADD_TO_CMS(() -> FinishAddMember.serializer), - REMOVE_FROM_CMS(() -> RemoveMember.serializer), + START_ADD_TO_CMS(() -> StartAddToCMS.serializer), + FINISH_ADD_TO_CMS(() -> FinishAddToCMS.serializer), + REMOVE_FROM_CMS(() -> RemoveFromCMS.serializer), STARTUP(() -> Startup.serializer), diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java index 71bb0424d8..d0c340608b 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java +++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java @@ -18,14 +18,14 @@ package org.apache.cassandra.tcm.migration; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Commit; +import org.apache.cassandra.tcm.Processor; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.ClusterMetadata; -public class GossipProcessor implements ClusterMetadataService.Processor +public class GossipProcessor implements Processor { @Override public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown) diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java index 8026689e5f..121b5122d2 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java +++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java @@ -24,11 +24,13 @@ import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; @@ -47,14 +49,17 @@ import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; -import org.apache.cassandra.tcm.transformations.cms.FinishAddMember; -import org.apache.cassandra.tcm.transformations.cms.StartAddMember; +import org.apache.cassandra.tcm.transformations.cms.FinishAddToCMS; +import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.db.TypeSizes.sizeof; import static org.apache.cassandra.tcm.sequences.InProgressSequences.Kind.JOIN_OWNERSHIP_GROUP; import static org.apache.cassandra.tcm.transformations.cms.EntireRange.*; +/** + * Add this or another node as a member of CMS. + */ public class AddToCMS implements InProgressSequence<AddToCMS> { private static final Logger logger = LoggerFactory.getLogger(AddToCMS.class); @@ -62,15 +67,19 @@ public class AddToCMS implements InProgressSequence<AddToCMS> private final ProgressBarrier barrier; private final List<InetAddressAndPort> streamCandidates; - private final FinishAddMember finishJoin; + private final FinishAddToCMS finishJoin; public static void initiate() { - NodeId self = ClusterMetadata.current().myNodeId(); + initiate(ClusterMetadata.current().myNodeId(), FBUtilities.getBroadcastAddressAndPort()); + } + + public static void initiate(NodeId nodeId, InetAddressAndPort addr) + { InProgressSequence<?> continuation = ClusterMetadataService.instance() - .commit(new StartAddMember(FBUtilities.getBroadcastAddressAndPort()), - (metadata) -> !metadata.inProgressSequences.contains(self), - (metadata) -> metadata.inProgressSequences.get(self), + .commit(new StartAddToCMS(addr), + (metadata) -> !metadata.inProgressSequences.contains(nodeId), + (metadata) -> metadata.inProgressSequences.get(nodeId), (metadata, reason) -> { throw new IllegalStateException("Can't join ownership group: " + reason); }); @@ -81,7 +90,7 @@ public class AddToCMS implements InProgressSequence<AddToCMS> continuation.executeNext(); } - public AddToCMS(ProgressBarrier barrier, List<InetAddressAndPort> streamCandidates, FinishAddMember join) + public AddToCMS(ProgressBarrier barrier, List<InetAddressAndPort> streamCandidates, FinishAddToCMS join) { this.barrier = barrier; this.streamCandidates = streamCandidates; @@ -101,13 +110,26 @@ public class AddToCMS implements InProgressSequence<AddToCMS> private void streamRanges() throws ExecutionException, InterruptedException { - // TODO: iterate over stream candidates StreamPlan streamPlan = new StreamPlan(StreamOperation.BOOTSTRAP, 1, true, null, PreviewKind.NONE); - streamPlan.requestRanges(streamCandidates.get(0), - SchemaConstants.METADATA_KEYSPACE_NAME, - new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).add(finishJoin.replicaForStreaming()).build(), - new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).build(), - DistributedMetadataLogKeyspace.TABLE_NAME); + // Current node is the streaming target. We can pick any other live CMS node as a streaming source + if (finishJoin.getEndpoint().equals(FBUtilities.getBroadcastAddressAndPort())) + { + Optional<InetAddressAndPort> streamingSource = streamCandidates.stream().filter(FailureDetector.instance::isAlive).findFirst(); + if (!streamingSource.isPresent()) + throw new IllegalStateException(String.format("Can not start range streaming as all candidates (%s) are down", streamCandidates)); + streamPlan.requestRanges(streamingSource.get(), + SchemaConstants.METADATA_KEYSPACE_NAME, + new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).add(finishJoin.replicaForStreaming()).build(), + new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).build(), + DistributedMetadataLogKeyspace.TABLE_NAME); + } + else + { + streamPlan.transferRanges(finishJoin.getEndpoint(), + SchemaConstants.METADATA_KEYSPACE_NAME, + new RangesAtEndpoint.Builder(finishJoin.replicaForStreaming().endpoint()).add(finishJoin.replicaForStreaming()).build(), + DistributedMetadataLogKeyspace.TABLE_NAME); + } streamPlan.execute().get(); } @@ -164,10 +186,10 @@ public class AddToCMS implements InProgressSequence<AddToCMS> { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - AddToCMS addToCMS = (AddToCMS) o; - return Objects.equals(barrier, addToCMS.barrier) && - Objects.equals(streamCandidates, addToCMS.streamCandidates) && - Objects.equals(finishJoin, addToCMS.finishJoin); + AddToCMS addMember = (AddToCMS) o; + return Objects.equals(barrier, addMember.barrier) && + Objects.equals(streamCandidates, addMember.streamCandidates) && + Objects.equals(finishJoin, addMember.finishJoin); } @Override @@ -183,7 +205,7 @@ public class AddToCMS implements InProgressSequence<AddToCMS> { AddToCMS seq = (AddToCMS) t; ProgressBarrier.serializer.serialize(t.barrier(), out, version); - FinishAddMember.serializer.serialize(seq.finishJoin, out, version); + FinishAddToCMS.serializer.serialize(seq.finishJoin, out, version); out.writeInt(seq.streamCandidates.size()); for (InetAddressAndPort ep : seq.streamCandidates) InetAddressAndPort.MetadataSerializer.serializer.serialize(ep, out, version); @@ -193,7 +215,7 @@ public class AddToCMS implements InProgressSequence<AddToCMS> public AddToCMS deserialize(DataInputPlus in, Version version) throws IOException { ProgressBarrier barrier = ProgressBarrier.serializer.deserialize(in, version); - FinishAddMember finish = FinishAddMember.serializer.deserialize(in, version); + FinishAddToCMS finish = FinishAddToCMS.serializer.deserialize(in, version); int streamCandidatesSize = in.readInt(); List<InetAddressAndPort> streamCandidates = new ArrayList<>(); @@ -207,7 +229,7 @@ public class AddToCMS implements InProgressSequence<AddToCMS> { AddToCMS seq = (AddToCMS) t; long size = ProgressBarrier.serializer.serializedSize(t.barrier(), version); - size += FinishAddMember.serializer.serializedSize(seq.finishJoin, version); + size += FinishAddToCMS.serializer.serializedSize(seq.finishJoin, version); size += sizeof(seq.streamCandidates.size()); for (InetAddressAndPort ep : seq.streamCandidates) size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(ep, version); diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java index f8cebf22a1..9708e20656 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java @@ -39,6 +39,12 @@ public abstract class BaseMembershipTransformation implements Transformation this.replica = EntireRange.replica(endpoint); } + // TODO: to node id + public InetAddressAndPort getEndpoint() + { + return endpoint; + } + public static abstract class SerializerBase<T extends BaseMembershipTransformation> implements AsymmetricMetadataSerializer<Transformation, T> { public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddToCMS.java similarity index 86% rename from src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java rename to src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddToCMS.java index 2034105e09..de203d044d 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddToCMS.java @@ -32,17 +32,17 @@ import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; -public class FinishAddMember extends BaseMembershipTransformation +public class FinishAddToCMS extends BaseMembershipTransformation { - public static final AsymmetricMetadataSerializer<Transformation, FinishAddMember> serializer = new SerializerBase<FinishAddMember>() + public static final AsymmetricMetadataSerializer<Transformation, FinishAddToCMS> serializer = new SerializerBase<FinishAddToCMS>() { - public FinishAddMember createTransformation(InetAddressAndPort addr) + public FinishAddToCMS createTransformation(InetAddressAndPort addr) { - return new FinishAddMember(addr); + return new FinishAddToCMS(addr); } }; - public FinishAddMember(InetAddressAndPort addr) + public FinishAddToCMS(InetAddressAndPort addr) { super(addr); } @@ -77,4 +77,12 @@ public class FinishAddMember extends BaseMembershipTransformation .with(prev.inProgressSequences.without(targetNode)); return success(transformer, affectedRanges); } + + public String toString() + { + return "FinishAddMember{" + + "endpoint=" + endpoint + + ", replica=" + replica + + '}'; + } } diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveFromCMS.java similarity index 86% rename from src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java rename to src/java/org/apache/cassandra/tcm/transformations/cms/RemoveFromCMS.java index efc29bd53c..10f459f107 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveFromCMS.java @@ -27,17 +27,17 @@ import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; -public class RemoveMember extends BaseMembershipTransformation +public class RemoveFromCMS extends BaseMembershipTransformation { - public static final AsymmetricMetadataSerializer<Transformation, RemoveMember> serializer = new SerializerBase<RemoveMember>() + public static final AsymmetricMetadataSerializer<Transformation, RemoveFromCMS> serializer = new SerializerBase<RemoveFromCMS>() { - public RemoveMember createTransformation(InetAddressAndPort addr) + public RemoveFromCMS createTransformation(InetAddressAndPort addr) { - return new RemoveMember(addr); + return new RemoveFromCMS(addr); } }; - public RemoveMember(InetAddressAndPort addr) + public RemoveFromCMS(InetAddressAndPort addr) { super(addr); } diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddToCMS.java similarity index 86% rename from src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java rename to src/java/org/apache/cassandra/tcm/transformations/cms/StartAddToCMS.java index 4dcf2e52a5..0726e65c96 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddToCMS.java @@ -34,17 +34,17 @@ import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; -public class StartAddMember extends BaseMembershipTransformation +public class StartAddToCMS extends BaseMembershipTransformation { - public static final AsymmetricMetadataSerializer<Transformation, StartAddMember> serializer = new SerializerBase<StartAddMember>() + public static final AsymmetricMetadataSerializer<Transformation, StartAddToCMS> serializer = new SerializerBase<StartAddToCMS>() { - public StartAddMember createTransformation(InetAddressAndPort addr) + public StartAddToCMS createTransformation(InetAddressAndPort addr) { - return new StartAddMember(addr); + return new StartAddToCMS(addr); } }; - public StartAddMember(InetAddressAndPort addr) + public StartAddToCMS(InetAddressAndPort addr) { super(addr); } @@ -76,9 +76,17 @@ public class StartAddMember extends BaseMembershipTransformation } ProgressBarrier barrier = new ProgressBarrier(prev.nextEpoch(), affectedRanges.toPeers(prev.placements, prev.directory), false); - AddToCMS joinSequence = new AddToCMS(barrier, streamCandidates, new FinishAddMember(endpoint)); + AddToCMS joinSequence = new AddToCMS(barrier, streamCandidates, new FinishAddToCMS(endpoint)); return success(transformer.with(prev.inProgressSequences.with(prev.directory.peerId(replica.endpoint()), joinSequence)), affectedRanges); } + + public String toString() + { + return "StartAddMember{" + + "endpoint=" + endpoint + + ", replica=" + replica + + '}'; + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java index 877b6a0409..912d0a7aa5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java @@ -37,6 +37,7 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.MetadataSnapshots; +import org.apache.cassandra.tcm.Processor; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogStorage; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; @@ -82,7 +83,7 @@ public class CMSTestBase public final SchemaProvider schemaProvider; public final ReplicationFactor replication; - public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog, ClusterMetadataService.Processor> processorFactory, boolean addListeners, int rf) + public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog, Processor> processorFactory, boolean addListeners, int rf) { partitioner = Murmur3Partitioner.instance; replication = ReplicationFactor.fullOnly(rf); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java index ddf38ff419..2b6f647679 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java @@ -68,13 +68,7 @@ import org.apache.cassandra.gms.GossipDigestAck; import org.apache.cassandra.gms.GossipDigestSyn; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SimpleSeedProvider; -import org.apache.cassandra.tcm.AtomicLongBackedProcessor; -import org.apache.cassandra.tcm.Commit; -import org.apache.cassandra.tcm.Discovery; -import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.tcm.Transformation; -import org.apache.cassandra.tcm.MetadataSnapshots; -import org.apache.cassandra.tcm.Replay; +import org.apache.cassandra.tcm.*; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogState; @@ -86,12 +80,10 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.NoPayload; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.utils.AssertUtil; import org.apache.cassandra.utils.Closeable; import org.apache.cassandra.utils.FBUtilities; @@ -709,7 +701,6 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase // We would like all messages directed to the node under test to be delivered it. this.nodes.put(nodeUnderTestAddr, new RealSimulatedNode(this, 1, nodeUnderTestAddr.toString(), tokenSupplier.token(1)) { -// @Test // todo - having this here makes the build fail public boolean test(Message<?> message) { realCluster.get(1).receiveMessage(Instance.serializeMessage(message.from(), nodeUnderTestAddr, message)); @@ -723,7 +714,6 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase assert executor == null; LogStorage logStorage = new AtomicLongBackedProcessor.InMemoryStorage(); LocalLog log = LocalLog.sync(new ClusterMetadata(partitioner), logStorage, false); - AtomicLongBackedProcessor processor = new AtomicLongBackedProcessor(log); // Replicator only replicates to the node under test, as there are no other nodes in reality Commit.Replicator replicator = (result, source) -> { @@ -732,6 +722,8 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase Message.out(Verb.TCM_REPLICATION, result.success().replication))); }; + AtomicLongBackedProcessor processor = new AtomicLongBackedProcessor(log); + ClusterMetadataService service = new ClusterMetadataService(new UniformRangePlacement(), new AtomicLongBackedProcessor.InMemoryMetadataSnapshots(), log, @@ -804,7 +796,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase new ClusterMetadataService(new UniformRangePlacement(), MetadataSnapshots.NO_OP, log, - new ClusterMetadataService.Processor() + new Processor() { public Commit.Result commit(Entry.Id entryId, Transformation event, Epoch lastKnown) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java index dd4f1ec74c..ba0d5e4387 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java @@ -26,25 +26,25 @@ import java.util.function.BiFunction; import java.util.function.Predicate; import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Processor; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.utils.concurrent.WaitQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TestProcessor implements ClusterMetadataService.Processor +public class TestProcessor implements Processor { private static final Logger logger = LoggerFactory.getLogger(TestProcessor.class); private final AtomicBoolean isPaused = new AtomicBoolean(); private final WaitQueue waiters; private final List<Predicate<Transformation>> waitPredicates; private final List<BiFunction<Transformation, Commit.Result, Boolean>> commitPredicates; - private final ClusterMetadataService.Processor delegate; + private final Processor delegate; - public TestProcessor(ClusterMetadataService.Processor delegate) + public TestProcessor(Processor delegate) { this.waiters = WaitQueue.newWaitQueue(); this.waitPredicates = new ArrayList<>(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/CMSMembershipTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSMembershipTest.java new file mode 100644 index 0000000000..c3c6698544 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSMembershipTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.ring; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.test.log.FuzzTestBase; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.sequences.AddToCMS; +import org.apache.cassandra.tcm.transformations.CustomTransformation; +import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS; +import org.apache.cassandra.utils.FBUtilities; + +public class CMSMembershipTest extends FuzzTestBase +{ + @Test + public void joinCmsTest() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + for (int idx : new int[]{ 2, 3 }) + { + cluster.get(idx).runOnInstance(() -> { + AddToCMS.initiate(); + ClusterMetadataService.instance().commit(CustomTransformation.make(idx)); + + }); + } + } + } + + @Test + public void expandCmsTest() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + cluster.get(1).runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + for (NodeId nodeId : metadata.directory.peerIds()) + { + if (nodeId.equals(metadata.myNodeId())) + continue; + AddToCMS.initiate(nodeId, metadata.directory.getNodeAddresses(nodeId).broadcastAddress); + } + }); + + for (int idx : new int[]{ 1, 2, 3 }) + { + cluster.get(idx).runOnInstance(() -> { + ClusterMetadataService.instance().commit(CustomTransformation.make(idx)); + }); + } + + for (int idx : new int[]{ 1, 2, 3 }) + { + cluster.get(idx).runOnInstance(() -> { + ClusterMetadataService.instance().replayAndWait(); + ClusterMetadata metadata = ClusterMetadata.current(); + Assert.assertTrue(metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort())); + }); + } + } + } + + @Test + public void shrinkCmsTest() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + cluster.get(1).runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + for (NodeId nodeId : metadata.directory.peerIds()) + { + if (nodeId.equals(metadata.myNodeId())) + continue; + AddToCMS.initiate(nodeId, metadata.directory.getNodeAddresses(nodeId).broadcastAddress); + } + }); + + IMessageFilters.Filter filter = cluster.filters().allVerbs().to(3).drop(); + + cluster.get(1).runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + for (InetAddressAndPort addr : metadata.directory.allAddresses()) + { + if (addr.toString().contains("127.0.0.3")) + { + ClusterMetadataService.instance().commit(new RemoveFromCMS(addr)); + return; + } + + } + }); + + filter.off(); + + for (int idx : new int[]{ 1, 2, 3 }) + { + cluster.get(idx).runOnInstance(() -> { + ClusterMetadataService.instance().commit(CustomTransformation.make(idx)); + }); + } + + for (int idx : new int[]{ 1, 2, 3 }) + { + cluster.get(idx).runOnInstance(() -> { + ClusterMetadataService.instance().replayAndWait(); + ClusterMetadata metadata = ClusterMetadata.current(); + Assert.assertEquals(idx != 3, metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort())); + }); + } + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java index b248115195..8f11c02553 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java @@ -43,9 +43,7 @@ import org.apache.cassandra.distributed.test.log.FuzzTestBase; import org.apache.cassandra.distributed.test.log.TestProcessor; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.tcm.sequences.AddToCMS; import org.apache.cassandra.tcm.transformations.PrepareJoin; -import org.apache.cassandra.tcm.transformations.CustomTransformation; public class ConsistentBootstrapTest extends FuzzTestBase { @@ -129,29 +127,4 @@ public class ConsistentBootstrapTest extends FuzzTestBase model.validateAll(); } } - - @Test - public void joinCmsTest() throws Throwable - { - try (Cluster cluster = builder().withNodes(3) - .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4)) - .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0")) - .start()) - { - for (int idx : new int[]{ 2, 3 }) - { - cluster.get(idx).runOnInstance(() -> { - AddToCMS.initiate(); - try - { - ClusterMetadataService.instance().commit(CustomTransformation.make(idx)); - } - catch (Throwable e) - { - e.printStackTrace(); - } - }); - } - } - } } diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java index 489766174a..5e260c648e 100644 --- a/test/unit/org/apache/cassandra/ServerTestUtils.java +++ b/test/unit/org/apache/cassandra/ServerTestUtils.java @@ -52,6 +52,7 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.MetadataSnapshots; +import org.apache.cassandra.tcm.Processor; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogStorage; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; @@ -228,7 +229,7 @@ public final class ServerTestUtils public static void initCMS() { - Function<LocalLog, ClusterMetadataService.Processor> processorFactory = AtomicLongBackedProcessor::new; + Function<LocalLog, Processor> processorFactory = AtomicLongBackedProcessor::new; IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); boolean addListeners = true; ClusterMetadata initial = new ClusterMetadata(partitioner); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
