This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 086fbe30cd14866fe64ad8f3f3f6cbfa31de8718
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Mar 6 18:46:40 2023 +0100

    [CEP-21] Ensure that global log replication factor is maintained after 
decommission
    
    patch by Alex Petrov; reviewed by Marcus Eriksson and Sam Tunnicliffe
    for CASSANDRA-18416
---
 .../apache/cassandra/net/ResponseVerbHandler.java  |   1 +
 .../apache/cassandra/service/StorageService.java   |  36 ++++--
 .../cassandra/tcm/AbstractLocalProcessor.java      |  10 +-
 .../cassandra/tcm/ClusterMetadataService.java      |  61 ++++-----
 src/java/org/apache/cassandra/tcm/Commit.java      |  18 ++-
 .../apache/cassandra/tcm/PaxosBackedProcessor.java |  12 +-
 .../GossipProcessor.java => Processor.java}        |  32 +++--
 .../org/apache/cassandra/tcm/RemoteProcessor.java  |   2 +-
 src/java/org/apache/cassandra/tcm/Startup.java     |   6 +-
 .../org/apache/cassandra/tcm/Transformation.java   |  12 +-
 .../cassandra/tcm/migration/GossipProcessor.java   |   4 +-
 .../apache/cassandra/tcm/sequences/AddToCMS.java   |  64 ++++++----
 .../cms/BaseMembershipTransformation.java          |   6 +
 .../{FinishAddMember.java => FinishAddToCMS.java}  |  18 ++-
 .../cms/{RemoveMember.java => RemoveFromCMS.java}  |  10 +-
 .../{StartAddMember.java => StartAddToCMS.java}    |  20 ++-
 .../distributed/test/log/CMSTestBase.java          |   3 +-
 .../test/log/CoordinatorPathTestBase.java          |  16 +--
 .../distributed/test/log/TestProcessor.java        |   8 +-
 .../distributed/test/ring/CMSMembershipTest.java   | 136 +++++++++++++++++++++
 .../test/ring/ConsistentBootstrapTest.java         |  27 ----
 .../unit/org/apache/cassandra/ServerTestUtils.java |   3 +-
 22 files changed, 327 insertions(+), 178 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 3b4a1c69d4..2dff292627 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -48,6 +48,7 @@ class ResponseVerbHandler implements IVerbHandler
             // that executes something on the gossip stage as well.
             !Stage.GOSSIP.executor().inExecutor())
         {
+            logger.debug("Learned about next epoch {} from {} in {}", 
message.epoch(), message.from(), message.verb());
             ClusterMetadataService.instance().maybeCatchup(message.epoch());
         }
 
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 88fa90019b..791a8b564b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -68,13 +68,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
 
@@ -189,6 +183,7 @@ import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.InProgressSequence;
 import org.apache.cassandra.tcm.Transformation;
@@ -196,8 +191,10 @@ import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.migration.GossipCMSListener;
 import org.apache.cassandra.tcm.ownership.TokenMap;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
 import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.sequences.ProgressBarrier;
 import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
@@ -206,6 +203,7 @@ import 
org.apache.cassandra.tcm.transformations.PrepareReplace;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.UnsafeJoin;
+import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
 import org.apache.cassandra.transport.ClientResourceLimits;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.Clock;
@@ -249,7 +247,6 @@ import static 
org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING;
 import static org.apache.cassandra.tcm.membership.NodeState.JOINED;
 import static org.apache.cassandra.tcm.membership.NodeState.LEAVING;
 import static org.apache.cassandra.tcm.membership.NodeState.MOVING;
-import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 import static org.apache.cassandra.utils.FBUtilities.now;
@@ -3483,6 +3480,29 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             throw new IllegalStateException("This cluster is migrating to 
cluster metadata, can't decommission until that is done.");
         ClusterMetadata metadata = ClusterMetadata.current();
         NodeId self = metadata.myNodeId();
+
+        Set<InetAddressAndPort> cmsMembers = 
metadata.placements.get(ReplicationParams.meta()).reads.byEndpoint().keySet();
+        if (cmsMembers.contains(FBUtilities.getBroadcastAddressAndPort()))
+        {
+            if (metadata.directory.peerIds().size() == cmsMembers.size())
+                throw new IllegalStateException("Can not decomission the node 
as it will decrease the replication factor of CMS keyspace");
+
+            NodeId replacement = null;
+            for (Entry<NodeId, NodeAddresses> e : 
metadata.directory.addresses.entrySet())
+            {
+                if (!cmsMembers.contains(e.getValue().broadcastAddress))
+                {
+                    logger.info("Nominating an alternative CMS node ({}) 
before decommission.", e.getValue().broadcastAddress);
+                    AddToCMS.initiate(e.getKey(), 
e.getValue().broadcastAddress);
+                    replacement = e.getKey();
+                    break;
+                }
+            }
+
+            Epoch epoch = ClusterMetadataService.instance().commit(new 
RemoveFromCMS(FBUtilities.getBroadcastAddressAndPort())).epoch;
+            new ProgressBarrier(epoch, ImmutableSet.of(replacement), 
false).await();
+        }
+
         logger.info("starting decom with {} {}", metadata.epoch, self);
         if 
(InProgressSequences.isLeave(metadata.inProgressSequences.get(self)))
         {
diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java 
b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
index 679adcbe5f..07950ce6e9 100644
--- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
@@ -23,11 +23,12 @@ import java.util.concurrent.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.tcm.log.Replication;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
+import org.apache.cassandra.tcm.log.Replication;
+import org.apache.cassandra.utils.FBUtilities;
 
-public abstract class AbstractLocalProcessor implements 
ClusterMetadataService.Processor
+public abstract class AbstractLocalProcessor implements Processor
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PaxosBackedProcessor.class);
 
@@ -57,10 +58,11 @@ public abstract class AbstractLocalProcessor implements 
ClusterMetadataService.P
         {
             Replication replication;
             if (lastKnown == null || 
lastKnown.isDirectlyBefore(result.success().metadata.epoch))
+            {
                 replication = Replication.of(new Entry(entryId, 
result.success().metadata.epoch, transform));
+            }
             else
             {
-                // TODO: catch up at most to this epoch
                 replication = log.getCommittedEntries(lastKnown);
             }
 
@@ -84,6 +86,8 @@ public abstract class AbstractLocalProcessor implements 
ClusterMetadataService.P
         while (true)
         {
             ClusterMetadata previous = log.waitForHighestConsecutive();
+            if 
(!previous.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort()))
+                throw new IllegalStateException("Node is not a member of CMS 
anymore");
             Transformation.Result result = transform.execute(previous);
             // if we're rejected, just try to catch up to the latest 
distributed state
             if (result.isRejected())
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index be827cc9d6..82eecd461f 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -50,6 +50,7 @@ import 
org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.sequences.AddToCMS;
 import org.apache.cassandra.tcm.transformations.SealPeriod;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static java.util.stream.Collectors.toSet;
 import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
@@ -86,28 +87,9 @@ public class ClusterMetadataService
         return instance;
     }
 
-    public interface Processor
-    {
-        // TODO: remove entry id from the interface?
-        /**
-         * Method is _only_ responsible to commit the transformation to the 
cluster metadata. Implementers _have to ensure_
-         * local visibility and enactment of the metadata!
-         */
-        Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch 
lastKnown);
-        // TODO: add a debounce to requestReplay. Right now, because of 
ResponseVerbHandler, it is possible to send
-        // a barage of these requests.
-        /**
-         * Replays to the highest known epoch.
-         *
-         * Upon replay, all items _at least_ up to returned epoch will be 
visible.
-         */
-        ClusterMetadata replayAndWait();
-    }
-
     private final PlacementProvider placementProvider;
     private final Processor processor;
     private final LocalLog log;
-    private final Commit.Replicator replicator;
     private final MetadataSnapshots snapshots;
 
     private final Replication.ReplicationHandler replicationHandler;
@@ -146,15 +128,15 @@ public class ClusterMetadataService
         Processor localProcessor = wrapProcessor.apply(new 
PaxosBackedProcessor(log));
         RemoteProcessor remoteProcessor = new RemoteProcessor(log, 
Discovery.instance::discoveredNodes);
         GossipProcessor gossipProcessor = new GossipProcessor();
-        replicator = new Commit.DefaultReplicator(() -> 
log.metadata().directory);
+        Commit.Replicator replicator = new Commit.DefaultReplicator(() -> 
log.metadata().directory);
         currentEpochHandler = new CurrentEpochRequestHandler();
         replayRequestHandler = new SwitchableHandler<>(new Replay.Handler(), 
cmsStateSupplier);
         commitRequestHandler = new SwitchableHandler<>(new 
Commit.Handler(localProcessor, replicator), cmsStateSupplier);
         processor = new SwitchableProcessor(localProcessor,
                                             remoteProcessor,
                                             gossipProcessor,
+                                            replicator,
                                             cmsStateSupplier);
-
         replicationHandler = new Replication.ReplicationHandler(log);
         logNotifyHandler = new Replication.LogNotifyHandler(log);
     }
@@ -169,8 +151,7 @@ public class ClusterMetadataService
     {
         this.placementProvider = placementProvider;
         this.log = log;
-        this.processor = processor;
-        this.replicator = replicator;
+        this.processor = new SwitchableProcessor(processor, null, null, 
replicator, () -> State.LOCAL);
         this.snapshots = snapshots;
 
         replicationHandler = new Replication.ReplicationHandler(log);
@@ -185,7 +166,6 @@ public class ClusterMetadataService
                                    MetadataSnapshots snapshots,
                                    LocalLog log,
                                    Processor processor,
-                                   Commit.Replicator replicator,
                                    Replication.ReplicationHandler 
replicationHandler,
                                    Replication.LogNotifyHandler 
logNotifyHandler,
                                    CurrentEpochRequestHandler 
currentEpochHandler,
@@ -196,7 +176,6 @@ public class ClusterMetadataService
         this.snapshots = snapshots;
         this.log = log;
         this.processor = processor;
-        this.replicator = replicator;
         this.replicationHandler = replicationHandler;
         this.logNotifyHandler = logNotifyHandler;
         this.currentEpochHandler = currentEpochHandler;
@@ -217,7 +196,6 @@ public class ClusterMetadataService
                                                                 
MetadataSnapshots.NO_OP,
                                                                 log,
                                                                 new 
AtomicLongBackedProcessor(log),
-                                                                
Commit.Replicator.NO_OP,
                                                                 new 
Replication.ReplicationHandler(log),
                                                                 new 
Replication.LogNotifyHandler(log),
                                                                 new 
CurrentEpochRequestHandler(),
@@ -343,11 +321,6 @@ public class ClusterMetadataService
 
             if (result.isSuccess())
             {
-                // TODO: we could actually move replicator to processor, if we 
have a source node attached to the transformation.
-                // In other words, we simply know who was a submitter, so we 
wouldn't replicate to the submitter, since the
-                // submitter is going to learn about the result of execution 
by other means, namely the response.
-                if (state() == LOCAL) replicator.send(result, null);
-
                 while (!backoff.reachedMax())
                 {
                     try
@@ -542,34 +515,50 @@ public class ClusterMetadataService
         private final RemoteProcessor remote;
         private final GossipProcessor gossip;
         private final Supplier<State> cmsStateSupplier;
+        private final Commit.Replicator replicator;
 
-        SwitchableProcessor(Processor local, RemoteProcessor remote, 
GossipProcessor gossip, Supplier<State> cmsStateSupplier)
+        SwitchableProcessor(Processor local,
+                            RemoteProcessor remote,
+                            GossipProcessor gossip,
+                            Commit.Replicator replicator,
+                            Supplier<State> cmsStateSupplier)
         {
             this.local = local;
             this.remote = remote;
             this.gossip = gossip;
+            this.replicator = replicator;
             this.cmsStateSupplier = cmsStateSupplier;
         }
 
         @VisibleForTesting
         public Processor delegate()
+        {
+            return delegateInternal().right;
+        }
+
+        private Pair<State, Processor> delegateInternal()
         {
             State state = cmsStateSupplier.get();
             switch (state)
             {
                 case LOCAL:
-                    return local;
+                    return Pair.create(state, local);
                 case REMOTE:
-                    return remote;
+                    return Pair.create(state, remote);
                 case GOSSIP:
-                    return gossip;
+                    return Pair.create(state, gossip);
             }
             throw new IllegalStateException("Bad CMS state: " + state);
         }
 
+        @Override
         public Commit.Result commit(Entry.Id entryId, Transformation 
transform, Epoch lastKnown)
         {
-            return delegate().commit(entryId, transform, lastKnown);
+            Pair<State, Processor> delegate = delegateInternal();
+            Commit.Result result = delegate.right.commit(entryId, transform, 
lastKnown);
+            if (delegate.left == State.LOCAL)
+                replicator.send(result, null);
+            return result;
         }
 
         public ClusterMetadata replayAndWait()
diff --git a/src/java/org/apache/cassandra/tcm/Commit.java 
b/src/java/org/apache/cassandra/tcm/Commit.java
index e4548085ee..a68e2368b6 100644
--- a/src/java/org/apache/cassandra/tcm/Commit.java
+++ b/src/java/org/apache/cassandra/tcm/Commit.java
@@ -40,8 +40,6 @@ import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
-import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
-
 public class Commit
 {
     private static final Logger logger = LoggerFactory.getLogger(Commit.class);
@@ -239,26 +237,26 @@ public class Commit
     }
 
     @VisibleForTesting
-    public static IVerbHandler<Commit> 
handlerForTests(ClusterMetadataService.Processor processor, Replicator 
replicator, BiConsumer<Message<?>, InetAddressAndPort> messagingService)
+    public static IVerbHandler<Commit> handlerForTests(Processor processor, 
Replicator replicator, BiConsumer<Message<?>, InetAddressAndPort> 
messagingService)
     {
         return new Handler(processor, replicator, messagingService);
     }
 
     static class Handler implements IVerbHandler<Commit>
     {
-        private final ClusterMetadataService.Processor processor;
-        private final Replicator replicate;
+        private final Processor processor;
+        private final Replicator replicator;
         private final BiConsumer<Message<?>, InetAddressAndPort> 
messagingService;
 
-        Handler(ClusterMetadataService.Processor processor, Replicator 
replicator)
+        Handler(Processor processor, Replicator replicator)
         {
             this(processor, replicator, MessagingService.instance()::send);
         }
 
-        Handler(ClusterMetadataService.Processor processor, Replicator 
replicator, BiConsumer<Message<?>, InetAddressAndPort> messagingService)
+        Handler(Processor processor, Replicator replicator, 
BiConsumer<Message<?>, InetAddressAndPort> messagingService)
         {
             this.processor = processor;
-            this.replicate = replicator;
+            this.replicator = replicator;
             this.messagingService = messagingService;
         }
 
@@ -269,7 +267,7 @@ public class Commit
             if (result.isSuccess())
             {
                 Result.Success success = result.success();
-                replicate.send(success, message.from());
+                replicator.send(success, message.from());
                 logger.info("Responding with full result {} to sender {}", 
result, message.from());
                 // TODO: this response message can get lost; how do we 
re-discover this on the other side?
                 // TODO: what if we have holes after replaying?
@@ -300,7 +298,7 @@ public class Commit
 
         public void send(Result result, InetAddressAndPort source)
         {
-            if (!result.isSuccess() || ClusterMetadataService.state() != LOCAL)
+            if (!result.isSuccess())
                 return;
 
             Result.Success success = result.success();
diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java 
b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
index d4ccb20425..2ab2f921e9 100644
--- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.tcm.log.LogStorage;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 
 import static 
org.apache.cassandra.schema.DistributedMetadataLogKeyspace.tryCommit;
@@ -46,12 +46,6 @@ public class PaxosBackedProcessor extends 
AbstractLocalProcessor
         super(log);
     }
 
-    @Override
-    public Commit.Result commit(Entry.Id entryId, Transformation transform, 
Epoch lastKnown)
-    {
-        return super.commit(entryId, transform, lastKnown);
-    }
-
     @Override
     protected boolean tryCommitOne(Entry.Id entryId, Transformation transform,
                                    Epoch previousEpoch, Epoch nextEpoch,
@@ -77,7 +71,7 @@ public class PaxosBackedProcessor extends 
AbstractLocalProcessor
             // TODO: test applying LogStates from multiple responses
             if (replica.isSelf())
             {
-                
log.append(DistributedMetadataLogKeyspace.getLogState(metadata.epoch));
+                
log.append(LogStorage.SystemKeyspace.getLogState(metadata.epoch));
                 latch.decrement();
             }
             else
@@ -97,6 +91,4 @@ public class PaxosBackedProcessor extends 
AbstractLocalProcessor
         latch.awaitUninterruptibly(10, TimeUnit.SECONDS);
         return log.waitForHighestConsecutive();
     }
-
-
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java 
b/src/java/org/apache/cassandra/tcm/Processor.java
similarity index 53%
copy from src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
copy to src/java/org/apache/cassandra/tcm/Processor.java
index 71bb0424d8..22fa4d14a4 100644
--- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/Processor.java
@@ -16,26 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.tcm.migration;
+package org.apache.cassandra.tcm;
 
-import org.apache.cassandra.tcm.ClusterMetadataService;
-import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.log.Entry;
-import org.apache.cassandra.tcm.Epoch;
-import org.apache.cassandra.tcm.Transformation;
-import org.apache.cassandra.tcm.ClusterMetadata;
 
-public class GossipProcessor implements ClusterMetadataService.Processor
+public interface Processor
 {
-    @Override
-    public Commit.Result commit(Entry.Id entryId, Transformation transform, 
Epoch lastKnown)
-    {
-        throw new IllegalStateException("Can't commit transformations when 
running in gossip mode. Enable the ClusterMetadataService with `nodetool 
addtocms`.");
-    }
+    /**
+     * Method is _only_ responsible to commit the transformation to the 
cluster metadata. Implementers _have to ensure_
+     * local visibility and enactment of the metadata!
+     */
+    Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch 
lastKnown);
+    // TODO: add a debounce to requestReplay. Right now, because of 
ResponseVerbHandler, it is possible to send
+    // a barrage of these requests.
 
-    @Override
-    public ClusterMetadata replayAndWait()
-    {
-        return ClusterMetadata.current();
-    }
+    /**
+     * Replays to the highest known epoch.
+     * <p>
+     * Upon replay, all items _at least_ up to returned epoch will be visible.
+     */
+    ClusterMetadata replayAndWait();
 }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java 
b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 4761e8bb49..ca6c56654f 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -52,7 +52,7 @@ import org.apache.cassandra.utils.concurrent.Promise;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.tcm.ClusterMetadataService.State.REMOTE;
 
-public final class RemoteProcessor implements ClusterMetadataService.Processor
+public final class RemoteProcessor implements Processor
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteProcessor.class);
     private final Supplier<Collection<InetAddressAndPort>> discoveryNodes;
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index 488df2e00f..f5b93d2619 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -62,7 +62,7 @@ package org.apache.cassandra.tcm;
      }
 
      public static void initialize(Set<InetAddressAndPort> seeds,
-                                   Function<ClusterMetadataService.Processor, 
ClusterMetadataService.Processor> wrapProcessor,
+                                   Function<Processor, Processor> 
wrapProcessor,
                                    Runnable initMessaging) throws 
InterruptedException, ExecutionException
      {
          switch (StartupMode.get(seeds))
@@ -108,7 +108,7 @@ package org.apache.cassandra.tcm;
          ClusterMetadataService.instance().commit(initialize);
      }
 
-     public static void 
initializeAsNonCmsNode(Function<ClusterMetadataService.Processor, 
ClusterMetadataService.Processor> wrapProcessor)
+     public static void initializeAsNonCmsNode(Function<Processor, Processor> 
wrapProcessor)
      {
          ClusterMetadata initial = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
          initial.schema.initializeKeyspaceInstances(DistributedSchema.empty());
@@ -163,7 +163,7 @@ package org.apache.cassandra.tcm;
      /**
       * This should only be called during startup.
       */
-     public static void 
initializeFromGossip(Function<ClusterMetadataService.Processor, 
ClusterMetadataService.Processor> wrapProcessor, Runnable initMessaging)
+     public static void initializeFromGossip(Function<Processor, Processor> 
wrapProcessor, Runnable initMessaging)
      {
          ClusterMetadata emptyFromSystemTables = 
emptyWithSchemaFromSystemTables();
          
emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty());
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index 953d3d174c..389b90f0a7 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -46,11 +46,11 @@ import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.SealPeriod;
 import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.UnsafeJoin;
-import org.apache.cassandra.tcm.transformations.cms.FinishAddMember;
+import org.apache.cassandra.tcm.transformations.cms.FinishAddToCMS;
 import org.apache.cassandra.tcm.transformations.cms.Initialize;
 import org.apache.cassandra.tcm.transformations.cms.PreInitialize;
-import org.apache.cassandra.tcm.transformations.cms.RemoveMember;
-import org.apache.cassandra.tcm.transformations.cms.StartAddMember;
+import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
+import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS;
 
 public interface Transformation
 {
@@ -178,9 +178,9 @@ public interface Transformation
 
         CANCEL_SEQUENCE(() -> CancelInProgressSequence.serializer),
 
-        START_ADD_TO_CMS(() -> StartAddMember.serializer),
-        FINISH_ADD_TO_CMS(() -> FinishAddMember.serializer),
-        REMOVE_FROM_CMS(() -> RemoveMember.serializer),
+        START_ADD_TO_CMS(() -> StartAddToCMS.serializer),
+        FINISH_ADD_TO_CMS(() -> FinishAddToCMS.serializer),
+        REMOVE_FROM_CMS(() -> RemoveFromCMS.serializer),
 
         STARTUP(() -> Startup.serializer),
 
diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java 
b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
index 71bb0424d8..d0c340608b 100644
--- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
@@ -18,14 +18,14 @@
 
 package org.apache.cassandra.tcm.migration;
 
-import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Commit;
+import org.apache.cassandra.tcm.Processor;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.ClusterMetadata;
 
-public class GossipProcessor implements ClusterMetadataService.Processor
+public class GossipProcessor implements Processor
 {
     @Override
     public Commit.Result commit(Entry.Id entryId, Transformation transform, 
Epoch lastKnown)
diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java 
b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
index 8026689e5f..121b5122d2 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
@@ -24,11 +24,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -47,14 +49,17 @@ import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
-import org.apache.cassandra.tcm.transformations.cms.FinishAddMember;
-import org.apache.cassandra.tcm.transformations.cms.StartAddMember;
+import org.apache.cassandra.tcm.transformations.cms.FinishAddToCMS;
+import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static 
org.apache.cassandra.tcm.sequences.InProgressSequences.Kind.JOIN_OWNERSHIP_GROUP;
 import static org.apache.cassandra.tcm.transformations.cms.EntireRange.*;
 
+/**
+ * Add this or another node as a member of CMS.
+ */
 public class AddToCMS implements InProgressSequence<AddToCMS>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AddToCMS.class);
@@ -62,15 +67,19 @@ public class AddToCMS implements 
InProgressSequence<AddToCMS>
 
     private final ProgressBarrier barrier;
     private final List<InetAddressAndPort> streamCandidates;
-    private final FinishAddMember finishJoin;
+    private final FinishAddToCMS finishJoin;
 
     public static void initiate()
     {
-        NodeId self = ClusterMetadata.current().myNodeId();
+        initiate(ClusterMetadata.current().myNodeId(), 
FBUtilities.getBroadcastAddressAndPort());
+    }
+
+    public static void initiate(NodeId nodeId, InetAddressAndPort addr)
+    {
         InProgressSequence<?> continuation = ClusterMetadataService.instance()
-                                                                   .commit(new 
StartAddMember(FBUtilities.getBroadcastAddressAndPort()),
-                                                                           
(metadata) -> !metadata.inProgressSequences.contains(self),
-                                                                           
(metadata) -> metadata.inProgressSequences.get(self),
+                                                                   .commit(new 
StartAddToCMS(addr),
+                                                                           
(metadata) -> !metadata.inProgressSequences.contains(nodeId),
+                                                                           
(metadata) -> metadata.inProgressSequences.get(nodeId),
                                                                            
(metadata, reason) -> {
                                                                                
throw new IllegalStateException("Can't join ownership group: " + reason);
                                                                            });
@@ -81,7 +90,7 @@ public class AddToCMS implements InProgressSequence<AddToCMS>
         continuation.executeNext();
     }
 
-    public AddToCMS(ProgressBarrier barrier, List<InetAddressAndPort> 
streamCandidates, FinishAddMember join)
+    public AddToCMS(ProgressBarrier barrier, List<InetAddressAndPort> 
streamCandidates, FinishAddToCMS join)
     {
         this.barrier = barrier;
         this.streamCandidates = streamCandidates;
@@ -101,13 +110,26 @@ public class AddToCMS implements 
InProgressSequence<AddToCMS>
 
     private void streamRanges() throws ExecutionException, InterruptedException
     {
-        // TODO: iterate over stream candidates
         StreamPlan streamPlan = new StreamPlan(StreamOperation.BOOTSTRAP, 1, 
true, null, PreviewKind.NONE);
-        streamPlan.requestRanges(streamCandidates.get(0),
-                                 SchemaConstants.METADATA_KEYSPACE_NAME,
-                                 new 
RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).add(finishJoin.replicaForStreaming()).build(),
-                                 new 
RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).build(),
-                                 DistributedMetadataLogKeyspace.TABLE_NAME);
+        // Current node is the streaming target. We can pick any other live 
CMS node as a streaming source
+        if 
(finishJoin.getEndpoint().equals(FBUtilities.getBroadcastAddressAndPort()))
+        {
+            Optional<InetAddressAndPort> streamingSource = 
streamCandidates.stream().filter(FailureDetector.instance::isAlive).findFirst();
+            if (!streamingSource.isPresent())
+                throw new IllegalStateException(String.format("Can not start 
range streaming as all candidates (%s) are down", streamCandidates));
+            streamPlan.requestRanges(streamingSource.get(),
+                                     SchemaConstants.METADATA_KEYSPACE_NAME,
+                                     new 
RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).add(finishJoin.replicaForStreaming()).build(),
+                                     new 
RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).build(),
+                                     
DistributedMetadataLogKeyspace.TABLE_NAME);
+        }
+        else
+        {
+            streamPlan.transferRanges(finishJoin.getEndpoint(),
+                                      SchemaConstants.METADATA_KEYSPACE_NAME,
+                                      new 
RangesAtEndpoint.Builder(finishJoin.replicaForStreaming().endpoint()).add(finishJoin.replicaForStreaming()).build(),
+                                      
DistributedMetadataLogKeyspace.TABLE_NAME);
+        }
         streamPlan.execute().get();
     }
 
@@ -164,10 +186,10 @@ public class AddToCMS implements 
InProgressSequence<AddToCMS>
     {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        AddToCMS addToCMS = (AddToCMS) o;
-        return Objects.equals(barrier, addToCMS.barrier) &&
-               Objects.equals(streamCandidates, addToCMS.streamCandidates) &&
-               Objects.equals(finishJoin, addToCMS.finishJoin);
+        AddToCMS addMember = (AddToCMS) o;
+        return Objects.equals(barrier, addMember.barrier) &&
+               Objects.equals(streamCandidates, addMember.streamCandidates) &&
+               Objects.equals(finishJoin, addMember.finishJoin);
     }
 
     @Override
@@ -183,7 +205,7 @@ public class AddToCMS implements 
InProgressSequence<AddToCMS>
         {
             AddToCMS seq = (AddToCMS) t;
             ProgressBarrier.serializer.serialize(t.barrier(), out, version);
-            FinishAddMember.serializer.serialize(seq.finishJoin, out, version);
+            FinishAddToCMS.serializer.serialize(seq.finishJoin, out, version);
             out.writeInt(seq.streamCandidates.size());
             for (InetAddressAndPort ep : seq.streamCandidates)
                 InetAddressAndPort.MetadataSerializer.serializer.serialize(ep, 
out, version);
@@ -193,7 +215,7 @@ public class AddToCMS implements 
InProgressSequence<AddToCMS>
         public AddToCMS deserialize(DataInputPlus in, Version version) throws 
IOException
         {
             ProgressBarrier barrier = 
ProgressBarrier.serializer.deserialize(in, version);
-            FinishAddMember finish = 
FinishAddMember.serializer.deserialize(in, version);
+            FinishAddToCMS finish = FinishAddToCMS.serializer.deserialize(in, 
version);
             int streamCandidatesSize = in.readInt();
             List<InetAddressAndPort> streamCandidates = new ArrayList<>();
 
@@ -207,7 +229,7 @@ public class AddToCMS implements 
InProgressSequence<AddToCMS>
         {
             AddToCMS seq = (AddToCMS) t;
             long size = ProgressBarrier.serializer.serializedSize(t.barrier(), 
version);
-            size += FinishAddMember.serializer.serializedSize(seq.finishJoin, 
version);
+            size += FinishAddToCMS.serializer.serializedSize(seq.finishJoin, 
version);
             size += sizeof(seq.streamCandidates.size());
             for (InetAddressAndPort ep : seq.streamCandidates)
                 size += 
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(ep, version);
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java
 
b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java
index f8cebf22a1..9708e20656 100644
--- 
a/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java
+++ 
b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java
@@ -39,6 +39,12 @@ public abstract class BaseMembershipTransformation 
implements Transformation
         this.replica = EntireRange.replica(endpoint);
     }
 
+    // TODO: switch from endpoint to node id
+    public InetAddressAndPort getEndpoint()
+    {
+        return endpoint;
+    }
+
     public static abstract class SerializerBase<T extends 
BaseMembershipTransformation> implements 
AsymmetricMetadataSerializer<Transformation, T>
     {
         public void serialize(Transformation t, DataOutputPlus out, Version 
version) throws IOException
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddToCMS.java
similarity index 86%
rename from 
src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java
rename to 
src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddToCMS.java
index 2034105e09..de203d044d 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddToCMS.java
@@ -32,17 +32,17 @@ import 
org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 
 import static 
org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
 
-public class FinishAddMember extends BaseMembershipTransformation
+public class FinishAddToCMS extends BaseMembershipTransformation
 {
-    public static final AsymmetricMetadataSerializer<Transformation, 
FinishAddMember> serializer = new SerializerBase<FinishAddMember>()
+    public static final AsymmetricMetadataSerializer<Transformation, 
FinishAddToCMS> serializer = new SerializerBase<FinishAddToCMS>()
     {
-        public FinishAddMember createTransformation(InetAddressAndPort addr)
+        public FinishAddToCMS createTransformation(InetAddressAndPort addr)
         {
-            return new FinishAddMember(addr);
+            return new FinishAddToCMS(addr);
         }
     };
 
-    public FinishAddMember(InetAddressAndPort addr)
+    public FinishAddToCMS(InetAddressAndPort addr)
     {
         super(addr);
     }
@@ -77,4 +77,12 @@ public class FinishAddMember extends 
BaseMembershipTransformation
                                  
.with(prev.inProgressSequences.without(targetNode));
         return success(transformer, affectedRanges);
     }
+
+    public String toString()
+    {
+        return "FinishAddToCMS{" + // label must match the class name
+               "endpoint=" + endpoint +
+               ", replica=" + replica +
+               '}';
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveFromCMS.java
similarity index 86%
rename from 
src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java
rename to 
src/java/org/apache/cassandra/tcm/transformations/cms/RemoveFromCMS.java
index efc29bd53c..10f459f107 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveFromCMS.java
@@ -27,17 +27,17 @@ import 
org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 
 import static 
org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
 
-public class RemoveMember extends BaseMembershipTransformation
+public class RemoveFromCMS extends BaseMembershipTransformation
 {
-    public static final AsymmetricMetadataSerializer<Transformation, 
RemoveMember> serializer = new SerializerBase<RemoveMember>()
+    public static final AsymmetricMetadataSerializer<Transformation, 
RemoveFromCMS> serializer = new SerializerBase<RemoveFromCMS>()
     {
-        public RemoveMember createTransformation(InetAddressAndPort addr)
+        public RemoveFromCMS createTransformation(InetAddressAndPort addr)
         {
-            return new RemoveMember(addr);
+            return new RemoveFromCMS(addr);
         }
     };
 
-    public RemoveMember(InetAddressAndPort addr)
+    public RemoveFromCMS(InetAddressAndPort addr)
     {
         super(addr);
     }
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddToCMS.java
similarity index 86%
rename from 
src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java
rename to 
src/java/org/apache/cassandra/tcm/transformations/cms/StartAddToCMS.java
index 4dcf2e52a5..0726e65c96 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddToCMS.java
@@ -34,17 +34,17 @@ import 
org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 
 import static 
org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
 
-public class StartAddMember extends BaseMembershipTransformation
+public class StartAddToCMS extends BaseMembershipTransformation
 {
-    public static final AsymmetricMetadataSerializer<Transformation, 
StartAddMember> serializer = new SerializerBase<StartAddMember>()
+    public static final AsymmetricMetadataSerializer<Transformation, 
StartAddToCMS> serializer = new SerializerBase<StartAddToCMS>()
     {
-        public StartAddMember createTransformation(InetAddressAndPort addr)
+        public StartAddToCMS createTransformation(InetAddressAndPort addr)
         {
-            return new StartAddMember(addr);
+            return new StartAddToCMS(addr);
         }
     };
 
-    public StartAddMember(InetAddressAndPort addr)
+    public StartAddToCMS(InetAddressAndPort addr)
     {
         super(addr);
     }
@@ -76,9 +76,17 @@ public class StartAddMember extends 
BaseMembershipTransformation
         }
 
         ProgressBarrier barrier = new ProgressBarrier(prev.nextEpoch(), 
affectedRanges.toPeers(prev.placements, prev.directory), false);
-        AddToCMS joinSequence = new AddToCMS(barrier, streamCandidates, new 
FinishAddMember(endpoint));
+        AddToCMS joinSequence = new AddToCMS(barrier, streamCandidates, new 
FinishAddToCMS(endpoint));
 
         return 
success(transformer.with(prev.inProgressSequences.with(prev.directory.peerId(replica.endpoint()),
 joinSequence)),
                        affectedRanges);
     }
+
+    public String toString()
+    {
+        return "StartAddToCMS{" + // label must match the class name
+               "endpoint=" + endpoint +
+               ", replica=" + replica +
+               '}';
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
index 877b6a0409..912d0a7aa5 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.Processor;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogStorage;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
@@ -82,7 +83,7 @@ public class CMSTestBase
         public final SchemaProvider schemaProvider;
         public final ReplicationFactor replication;
 
-        public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog, 
ClusterMetadataService.Processor> processorFactory, boolean addListeners, int 
rf)
+        public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog, 
Processor> processorFactory, boolean addListeners, int rf)
         {
             partitioner = Murmur3Partitioner.instance;
             replication = ReplicationFactor.fullOnly(rf);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
index ddf38ff419..2b6f647679 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
@@ -68,13 +68,7 @@ import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestSyn;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SimpleSeedProvider;
-import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
-import org.apache.cassandra.tcm.Commit;
-import org.apache.cassandra.tcm.Discovery;
-import org.apache.cassandra.tcm.Epoch;
-import org.apache.cassandra.tcm.Transformation;
-import org.apache.cassandra.tcm.MetadataSnapshots;
-import org.apache.cassandra.tcm.Replay;
+import org.apache.cassandra.tcm.*;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
@@ -86,12 +80,10 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.membership.Location;
 import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
-import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.utils.AssertUtil;
 import org.apache.cassandra.utils.Closeable;
 import org.apache.cassandra.utils.FBUtilities;
@@ -709,7 +701,6 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
 
             // We would like all messages directed to the node under test to 
be delivered it.
             this.nodes.put(nodeUnderTestAddr, new RealSimulatedNode(this, 1, 
nodeUnderTestAddr.toString(), tokenSupplier.token(1)) {
-//                @Test // todo - having this here makes the build fail
                 public boolean test(Message<?> message)
                 {
                     
realCluster.get(1).receiveMessage(Instance.serializeMessage(message.from(), 
nodeUnderTestAddr, message));
@@ -723,7 +714,6 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
             assert executor == null;
             LogStorage logStorage = new 
AtomicLongBackedProcessor.InMemoryStorage();
             LocalLog log = LocalLog.sync(new ClusterMetadata(partitioner), 
logStorage, false);
-            AtomicLongBackedProcessor processor = new 
AtomicLongBackedProcessor(log);
 
             // Replicator only replicates to the node under test, as there are 
no other nodes in reality
             Commit.Replicator replicator = (result, source) -> {
@@ -732,6 +722,8 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
                                                                      
Message.out(Verb.TCM_REPLICATION, result.success().replication)));
             };
 
+            AtomicLongBackedProcessor processor = new 
AtomicLongBackedProcessor(log);
+
             ClusterMetadataService service = new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                         new 
AtomicLongBackedProcessor.InMemoryMetadataSnapshots(),
                                                                         log,
@@ -804,7 +796,7 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
             new ClusterMetadataService(new UniformRangePlacement(),
                                        MetadataSnapshots.NO_OP,
                                        log,
-                                       new ClusterMetadataService.Processor()
+                                       new Processor()
                                        {
                                            public Commit.Result 
commit(Entry.Id entryId, Transformation event, Epoch lastKnown)
                                            {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
index dd4f1ec74c..ba0d5e4387 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
@@ -26,25 +26,25 @@ import java.util.function.BiFunction;
 import java.util.function.Predicate;
 
 import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Processor;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestProcessor implements ClusterMetadataService.Processor
+public class TestProcessor implements Processor
 {
     private static final Logger logger = 
LoggerFactory.getLogger(TestProcessor.class);
     private final AtomicBoolean isPaused = new AtomicBoolean();
     private final WaitQueue waiters;
     private final List<Predicate<Transformation>> waitPredicates;
     private final List<BiFunction<Transformation, Commit.Result, Boolean>> 
commitPredicates;
-    private final ClusterMetadataService.Processor delegate;
+    private final Processor delegate;
 
-    public TestProcessor(ClusterMetadataService.Processor delegate)
+    public TestProcessor(Processor delegate)
     {
         this.waiters = WaitQueue.newWaitQueue();
         this.waitPredicates = new ArrayList<>();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/CMSMembershipTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSMembershipTest.java
new file mode 100644
index 0000000000..c3c6698544
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSMembershipTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.test.log.FuzzTestBase;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CMSMembershipTest extends FuzzTestBase
+{
+    @Test
+    public void joinCmsTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            for (int idx : new int[]{ 2, 3 })
+            {
+                cluster.get(idx).runOnInstance(() -> {
+                    AddToCMS.initiate();
+                    
ClusterMetadataService.instance().commit(CustomTransformation.make(idx));
+
+                });
+            }
+        }
+    }
+
+    @Test
+    public void expandCmsTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            cluster.get(1).runOnInstance(() -> {
+                ClusterMetadata metadata = ClusterMetadata.current();
+                for (NodeId nodeId : metadata.directory.peerIds())
+                {
+                    if (nodeId.equals(metadata.myNodeId()))
+                        continue;
+                    AddToCMS.initiate(nodeId, 
metadata.directory.getNodeAddresses(nodeId).broadcastAddress);
+                }
+            });
+
+            for (int idx : new int[]{ 1, 2, 3 })
+            {
+                cluster.get(idx).runOnInstance(() -> {
+                    
ClusterMetadataService.instance().commit(CustomTransformation.make(idx));
+                });
+            }
+
+            for (int idx : new int[]{ 1, 2, 3 })
+            {
+                cluster.get(idx).runOnInstance(() -> {
+                    ClusterMetadataService.instance().replayAndWait();
+                    ClusterMetadata metadata = ClusterMetadata.current();
+                    
Assert.assertTrue(metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort()));
+                });
+            }
+        }
+    }
+
+    @Test
+    public void shrinkCmsTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            cluster.get(1).runOnInstance(() -> {
+                ClusterMetadata metadata = ClusterMetadata.current();
+                for (NodeId nodeId : metadata.directory.peerIds())
+                {
+                    if (nodeId.equals(metadata.myNodeId()))
+                        continue;
+                    AddToCMS.initiate(nodeId, 
metadata.directory.getNodeAddresses(nodeId).broadcastAddress);
+                }
+            });
+
+            IMessageFilters.Filter filter = 
cluster.filters().allVerbs().to(3).drop();
+
+            cluster.get(1).runOnInstance(() -> {
+                ClusterMetadata metadata = ClusterMetadata.current();
+                for (InetAddressAndPort addr : 
metadata.directory.allAddresses())
+                {
+                    if (addr.toString().contains("127.0.0.3"))
+                    {
+                        ClusterMetadataService.instance().commit(new 
RemoveFromCMS(addr));
+                        return;
+                    }
+
+                }
+            });
+
+            filter.off();
+
+            for (int idx : new int[]{ 1, 2, 3 })
+            {
+                cluster.get(idx).runOnInstance(() -> {
+                    
ClusterMetadataService.instance().commit(CustomTransformation.make(idx));
+                });
+            }
+
+            for (int idx : new int[]{ 1, 2, 3 })
+            {
+                cluster.get(idx).runOnInstance(() -> {
+                    ClusterMetadataService.instance().replayAndWait();
+                    ClusterMetadata metadata = ClusterMetadata.current();
+                    Assert.assertEquals(idx != 3, 
metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort()));
+                });
+            }
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
index b248115195..8f11c02553 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
@@ -43,9 +43,7 @@ import org.apache.cassandra.distributed.test.log.FuzzTestBase;
 import org.apache.cassandra.distributed.test.log.TestProcessor;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
-import org.apache.cassandra.tcm.sequences.AddToCMS;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
-import org.apache.cassandra.tcm.transformations.CustomTransformation;
 
 public class ConsistentBootstrapTest extends FuzzTestBase
 {
@@ -129,29 +127,4 @@ public class ConsistentBootstrapTest extends FuzzTestBase
             model.validateAll();
         }
     }
-
-    @Test
-    public void joinCmsTest() throws Throwable
-    {
-        try (Cluster cluster = builder().withNodes(3)
-                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
-                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
-                                        .start())
-        {
-            for (int idx : new int[]{ 2, 3 })
-            {
-                cluster.get(idx).runOnInstance(() -> {
-                    AddToCMS.initiate();
-                    try
-                    {
-                        
ClusterMetadataService.instance().commit(CustomTransformation.make(idx));
-                    }
-                    catch (Throwable e)
-                    {
-                        e.printStackTrace();
-                    }
-                });
-            }
-        }
-    }
 }
diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java 
b/test/unit/org/apache/cassandra/ServerTestUtils.java
index 489766174a..5e260c648e 100644
--- a/test/unit/org/apache/cassandra/ServerTestUtils.java
+++ b/test/unit/org/apache/cassandra/ServerTestUtils.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.Processor;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogStorage;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
@@ -228,7 +229,7 @@ public final class ServerTestUtils
 
     public static void initCMS()
     {
-        Function<LocalLog, ClusterMetadataService.Processor> processorFactory 
= AtomicLongBackedProcessor::new;
+        Function<LocalLog, Processor> processorFactory = 
AtomicLongBackedProcessor::new;
         IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
         boolean addListeners = true;
         ClusterMetadata initial = new ClusterMetadata(partitioner);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to