This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e05cd4c8d Reuse native transport-driven futures in Debounce.
2e05cd4c8d is described below

commit 2e05cd4c8dd22e458eb1d2dad9cd34936b470266
Author: Alex Petrov <[email protected]>
AuthorDate: Tue May 28 16:55:58 2024 +0200

    Reuse native transport-driven futures in Debounce.
    
    Patch by Alex Petrov; reviewed by Sam Tunnicliffe for CASSANDRA-19158.
---
 .../cassandra/auth/CassandraRoleManager.java       | 28 ++++---
 .../config/CassandraRelevantProperties.java        |  1 +
 .../apache/cassandra/gms/GossipVerbHandler.java    |  2 +
 .../apache/cassandra/service/StorageService.java   | 11 ++-
 .../apache/cassandra/tcm/EpochAwareDebounce.java   | 98 ++++++++--------------
 .../org/apache/cassandra/tcm/PeerLogFetcher.java   | 48 +++++++----
 .../org/apache/cassandra/tcm/RemoteProcessor.java  | 49 ++++++-----
 .../distributed/impl/AbstractCluster.java          | 11 ++-
 .../cassandra/distributed/impl/Instance.java       |  6 +-
 .../cassandra/distributed/test/GossipTest.java     |  1 +
 .../test/HintedHandoffAddRemoveNodesTest.java      |  8 +-
 .../distributed/test/log/CMSCatchupTest.java       | 29 ++++---
 .../test/log/FetchLogFromPeersTest.java            | 24 +++++-
 .../distributed/test/ring/DecommissionTest.java    |  5 +-
 14 files changed, 180 insertions(+), 141 deletions(-)

diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 921a45dcb2..a046adc2a9 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -137,11 +137,11 @@ public class CassandraRoleManager implements IRoleManager
     public CassandraRoleManager()
     {
         supportedOptions = DatabaseDescriptor.getAuthenticator() instanceof PasswordAuthenticator
-                         ? ImmutableSet.of(Option.LOGIN, Option.SUPERUSER, Option.PASSWORD, Option.HASHED_PASSWORD)
-                         : ImmutableSet.of(Option.LOGIN, Option.SUPERUSER);
+                           ? ImmutableSet.of(Option.LOGIN, Option.SUPERUSER, Option.PASSWORD, Option.HASHED_PASSWORD)
+                           : ImmutableSet.of(Option.LOGIN, Option.SUPERUSER);
         alterableOptions = DatabaseDescriptor.getAuthenticator() instanceof PasswordAuthenticator
-                         ? ImmutableSet.of(Option.PASSWORD, Option.HASHED_PASSWORD)
-                         : ImmutableSet.<Option>of();
+                           ? ImmutableSet.of(Option.PASSWORD, Option.HASHED_PASSWORD)
+                           : ImmutableSet.<Option>of();
     }
 
     @Override
@@ -149,17 +149,23 @@ public class CassandraRoleManager implements IRoleManager
     {
         loadRoleStatement();
         loadIdentityStatement();
-        if (asyncRoleSetup)
+        if (!asyncRoleSetup)
         {
-            scheduleSetupTask(() -> {
+            try
+            {
+                // Try to set up synchronously
                 setupDefaultRole();
-                return null;
-            });
+                return;
+            }
+            catch (Throwable t)
+            {
+                // We tried to execute the task synchronously but failed; fall back to asynchronous setup.
+            }
         }
-        else
-        {
+        scheduleSetupTask(() -> {
             setupDefaultRole();
-        }
+            return null;
+        });
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 57f442d1d1..e1f6d28d86 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -474,6 +474,7 @@ public enum CassandraRelevantProperties
     SET_SEP_THREAD_NAME("cassandra.set_sep_thread_name", "true"),
     SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"),
     SIZE_RECORDER_INTERVAL("cassandra.size_recorder_interval", "300"),
+    SKIP_AUTH_SETUP("cassandra.skip_auth_setup", "false"),
     SKIP_GC_INSPECTOR("cassandra.skip_gc_inspector", "false"),
     SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE("cassandra.skip_paxos_repair_on_topology_change"),
     /** If necessary for operational purposes, permit certain keyspaces to be ignored for paxos topology repairs. */
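
The new SKIP_AUTH_SETUP entry follows the usual CassandraRelevantProperties pattern and defaults to "false". A minimal sketch of toggling it from test code (the System.setProperty call is illustrative; operationally the flag would be passed as -Dcassandra.skip_auth_setup=true on startup):

    // Illustrative only: enable the flag before node startup so that
    // StorageService treats auth setup as already performed (see below).
    System.setProperty("cassandra.skip_auth_setup", "true");
    boolean skip = CassandraRelevantProperties.SKIP_AUTH_SETUP.getBoolean(); // now true
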
diff --git a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
index 02aeaf4467..bac0854d07 100644
--- a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.gms;
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.tcm.ClusterMetadataService;
 
 public class GossipVerbHandler<T> implements IVerbHandler<T>
 {
     public void doVerb(Message<T> message)
     {
         Gossiper.instance.setLastProcessedMessageAt(message.creationTimeMillis());
+        ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(), message.epoch());
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index caf302fbd7..0578326b0e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -446,7 +446,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private boolean isSurveyMode = TEST_WRITE_SURVEY.getBoolean(false);
     /* true if node is rebuilding and receiving data */
     private volatile boolean initialized = false;
-    private final AtomicBoolean authSetupCalled = new AtomicBoolean(false);
+    private final AtomicBoolean authSetupCalled = new AtomicBoolean(CassandraRelevantProperties.SKIP_AUTH_SETUP.getBoolean());
     private volatile boolean authSetupComplete = false;
 
     /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
@@ -1093,12 +1093,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         InProgressSequences.finishInProgressSequences(id);
     }
 
+    void doAuthSetup()
+    {
+        doAuthSetup(true);
+    }
+
     @VisibleForTesting
-    public void doAuthSetup()
+    public void doAuthSetup(boolean async)
     {
         if (!authSetupCalled.getAndSet(true))
         {
-            DatabaseDescriptor.getRoleManager().setup();
+            DatabaseDescriptor.getRoleManager().setup(async);
             DatabaseDescriptor.getAuthenticator().setup();
             DatabaseDescriptor.getAuthorizer().setup();
             DatabaseDescriptor.getNetworkAuthorizer().setup();
diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index f65c03d830..25456dbbce 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -18,23 +18,11 @@
 
 package org.apache.cassandra.tcm;
 
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
+import java.util.function.Supplier;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
-import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.utils.ExecutorUtils;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.Closeable;
 import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.Promise;
 
 /**
  * When debouncing from a replica we know exactly which epoch we need, so to avoid retries we
@@ -42,86 +30,66 @@ import org.apache.cassandra.utils.concurrent.Promise;
  * comes in, we create a new future. If a request for a newer epoch comes in, we simply
  * swap out the current future reference for a new one which is requesting the newer epoch.
  */
-public class EpochAwareDebounce
+public class EpochAwareDebounce implements Closeable
 {
-    private static final Logger logger = LoggerFactory.getLogger(EpochAwareDebounce.class);
     public static final EpochAwareDebounce instance = new EpochAwareDebounce();
-    private final AtomicReference<EpochAwareAsyncPromise> currentFuture = new AtomicReference<>();
-    private final ExecutorPlus executor;
-    private final List<Promise<LogState>> inflightRequests = new CopyOnWriteArrayList<>();
+
+    private final AtomicReference<EpochAwareFuture> currentFuture = new AtomicReference<>();
 
     private EpochAwareDebounce()
     {
-        // 2 threads since we might start a new debounce for a newer epoch while the old one is executing
-        this.executor = ExecutorFactory.Global.executorFactory().pooled("debounce", 2);
     }
 
     /**
      * Deduplicate requests to catch up log state based on the desired epoch. Callers supply a target epoch and
      * a function to obtain the ClusterMetadata that corresponds with it. It is expected that this function will make rpc
      * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
-     * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
-     * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code
-     * LogState}. These promises are memoized in order to cancel them when {@link #shutdownAndWait(long, TimeUnit)} is
-     * called. This causes the fetch function to stop waiting on any in flight {@code LogState} requests and prevents
-     * shutdown from being blocked.
+     * ClusterMetadata}.
      *
-     * @param  fetchFunction executes the request for LogState. It's expected that this populates fetchResult with the
-     *                       successful result.
+     * @param fetchFunction supplies the future that, when dereferenced, will yield metadata for the desired epoch
      * @param epoch the desired epoch
      * @return
      */
-    public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, ClusterMetadata> fetchFunction,
-                                            Epoch epoch)
+    public Future<ClusterMetadata> getAsync(Supplier<Future<ClusterMetadata>> fetchFunction, Epoch epoch)
     {
         while (true)
         {
-            EpochAwareAsyncPromise running = currentFuture.get();
-            if (running != null && !running.isDone() && running.epoch.isEqualOrAfter(epoch))
-                return running;
+            EpochAwareFuture running = currentFuture.get();
+            // Someone else is about to install a new future
+            if (running == SENTINEL)
+                continue;
 
-            Promise<LogState> fetchResult = new AsyncPromise<>();
+            if (running != null && !running.future.isDone() && running.epoch.isEqualOrAfter(epoch))
+                return running.future;
 
-            EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
-            if (currentFuture.compareAndSet(running, promise))
+            if (currentFuture.compareAndSet(running, SENTINEL))
             {
-                fetchResult.addCallback((logState, error) -> {
-                    logger.debug("Removing future remotely requesting epoch {} from in flight list", epoch);
-                    inflightRequests.remove(fetchResult);
-                });
-                inflightRequests.add(fetchResult);
-
-                executor.submit(() -> {
-                    try
-                    {
-                        promise.setSuccess(fetchFunction.apply(fetchResult));
-                    }
-                    catch (Throwable t)
-                    {
-                        fetchResult.cancel(true);
-                        inflightRequests.remove(fetchResult);
-                        promise.setFailure(t);
-                    }
-                });
-                return promise;
+                EpochAwareFuture promise = new EpochAwareFuture(epoch, fetchFunction.get());
+                boolean res = currentFuture.compareAndSet(SENTINEL, promise);
+                assert res : "Should not have happened";
+                return promise.future;
             }
         }
     }
 
-    private static class EpochAwareAsyncPromise extends AsyncPromise<ClusterMetadata>
+    private static final EpochAwareFuture SENTINEL = new EpochAwareFuture(Epoch.EMPTY, null);
+
+    @Override
+    public void close()
+    {
+        EpochAwareFuture future = currentFuture.get();
+        if (future != null && future != SENTINEL)
+            future.future.cancel(true);
+    }
+
+    private static class EpochAwareFuture
     {
         private final Epoch epoch;
-        public EpochAwareAsyncPromise(Epoch epoch)
+        private final Future<ClusterMetadata> future;
+        public EpochAwareFuture(Epoch epoch, Future<ClusterMetadata> future)
         {
             this.epoch = epoch;
+            this.future = future;
         }
     }
-
-    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
-    {
-        logger.info("Cancelling {} in flight log fetch requests", inflightRequests.size());
-        for (Promise<LogState> toCancel : inflightRequests)
-            toCancel.cancel(true);
-        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
-    }
 }
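
The heart of the new EpochAwareDebounce is the compare-and-swap loop above. For readers following along, here is a self-contained sketch of the same CAS-with-sentinel pattern using only JDK types; CompletableFuture and a long stand in for Cassandra's Future<ClusterMetadata> and Epoch, and all names are illustrative rather than part of the patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    final class DebounceSketch
    {
        private static final class Entry
        {
            final long epoch;
            final CompletableFuture<String> future;
            Entry(long epoch, CompletableFuture<String> future) { this.epoch = epoch; this.future = future; }
        }

        // Parks the slot while a winner runs its supplier, so no thread ever
        // observes a half-installed entry.
        private static final Entry SENTINEL = new Entry(-1, null);
        private final AtomicReference<Entry> current = new AtomicReference<>();

        CompletableFuture<String> getAsync(Supplier<CompletableFuture<String>> fetch, long epoch)
        {
            while (true)
            {
                Entry running = current.get();
                if (running == SENTINEL)
                    continue; // another thread is mid-installation; retry
                if (running != null && !running.future.isDone() && running.epoch >= epoch)
                    return running.future; // piggyback on the in-flight fetch
                if (current.compareAndSet(running, SENTINEL))
                {
                    Entry installed = new Entry(epoch, fetch.get());
                    current.set(installed); // only the CAS winner can replace the sentinel
                    return installed.future;
                }
            }
        }
    }

Compared to the old implementation, no dedicated executor or in-flight request list is needed: the supplier returns a future directly, and close() only has to cancel whatever single future is currently installed.
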
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index 61cbc63eac..3564ab93f7 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +34,7 @@ import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
 
@@ -74,40 +74,54 @@ public class PeerLogFetcher
 
     public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast)
     {
-        Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
-        return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
+        return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
     }
 
-    private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
+    private Future<ClusterMetadata> fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
     {
         Epoch before = ClusterMetadata.current().epoch;
         if (before.isEqualOrAfter(awaitAtleast))
-            return ClusterMetadata.current();
+        {
+            Promise<ClusterMetadata> res = new AsyncPromise<>();
+            res.setSuccess(ClusterMetadata.current());
+            return res;
+        }
 
+        Promise<LogState> fetchRes = new AsyncPromise<>();
         logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
-
         try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
         {
-            RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+            RemoteProcessor.sendWithCallbackAsync(fetchRes,
                                                   Verb.TCM_FETCH_PEER_LOG_REQ,
                                                   new FetchPeerLog(before),
-                                                  new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
+                                                  new RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false),
                                                   Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
                                                                        new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
-            LogState logState = remoteRequest.awaitUninterruptibly().get();
-            log.append(logState);
-            ClusterMetadata fetched = log.waitForHighestConsecutive();
-            if (fetched.epoch.isEqualOrAfter(awaitAtleast))
-            {
-                TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
-                return fetched;
-            }
+
+            return fetchRes.map((logState) -> {
+                log.append(logState);
+                ClusterMetadata fetched = log.waitForHighestConsecutive();
+                if (fetched.epoch.isEqualOrAfter(awaitAtleast))
+                {
+                    TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
+                    return fetched;
+                }
+                else
+                {
+                    throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", awaitAtleast));
+                }
+            });
+
         }
         catch (Throwable t)
         {
+            fetchRes.cancel(true);
             JVMStabilityInspector.inspectThrowable(t);
+
             logger.warn("Unable to fetch log entries from " + remote, t);
+            Promise<ClusterMetadata> res = new AsyncPromise<>();
+            res.setFailure(new IllegalStateException("Unable to fetch log entries from " + remote, t));
+            return res;
         }
-        return ClusterMetadata.current();
     }
 }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index a647d4d8b0..c267d140d4 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
@@ -126,17 +125,16 @@ public final class RemoteProcessor implements Processor
     @Override
     public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy)
     {
-        Function<Promise<LogState>, ClusterMetadata> fetchFunction =
-            promise -> fetchLogAndWaitInternal(promise,
-                                               new CandidateIterator(candidates(true), false),
-                                               log);
         // Synchronous, non-debounced call if we are waiting for the highest epoch (without knowing/caring what it is).
         // Should be used sparingly.
         if (waitFor == null)
-            return fetchFunction.apply(new AsyncPromise<>());
+            return fetchLogAndWait(new CandidateIterator(candidates(true), false), log);
 
         try
         {
+            Supplier<Future<ClusterMetadata>> fetchFunction = () -> fetchLogAndWaitInternal(new CandidateIterator(candidates(true), false),
+                                                                                            log);
+
             Future<ClusterMetadata> cmFuture = EpochAwareDebounce.instance.getAsync(fetchFunction, waitFor);
             return cmFuture.get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS);
         }
@@ -152,34 +150,37 @@ public final class RemoteProcessor implements Processor
 
     public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log)
     {
-        Promise<LogState> remoteRequest = new AsyncPromise<>();
-        return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log);
+        try
+        {
+            return fetchLogAndWaitInternal(candidateIterator, log).awaitUninterruptibly().get();
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
-    private static ClusterMetadata fetchLogAndWaitInternal(Promise<LogState> remoteRequest,
-                                                           CandidateIterator candidates,
-                                                           LocalLog log)
+    private static Future<ClusterMetadata> fetchLogAndWaitInternal(CandidateIterator candidates,
+                                                                   LocalLog log)
     {
         try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time())
         {
+            Promise<LogState> remoteRequest = new AsyncPromise<>();
             Epoch currentEpoch = log.metadata().epoch;
             sendWithCallbackAsync(remoteRequest,
                                   Verb.TCM_FETCH_CMS_LOG_REQ,
                                   new FetchCMSLog(currentEpoch, 
ClusterMetadataService.state() == REMOTE),
                                   candidates,
                                   new Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
-            LogState replay = remoteRequest.awaitUninterruptibly().get();
-            if (!replay.isEmpty())
-            {
-                logger.info("Replay request returned replay data: {}", replay);
-                log.append(replay);
-                TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, replay.latestEpoch());
-            }
-            return log.waitForHighestConsecutive();
-        }
-        catch (InterruptedException | ExecutionException e)
-        {
-            throw new RuntimeException(e);
+            return remoteRequest.map((replay) -> {
+                if (!replay.isEmpty())
+                {
+                    logger.info("Replay request returned replay data: {}", replay);
+                    log.append(replay);
+                    TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, replay.latestEpoch());
+                }
+                return log.waitForHighestConsecutive();
+            });
         }
     }
 
@@ -206,6 +207,8 @@ public final class RemoteProcessor implements Processor
             {
                 if (promise.isCancelled() || promise.isDone())
                     return;
+                if (Thread.currentThread().isInterrupted())
+                    promise.setFailure(new InterruptedException());
                 if (!candidates.hasNext())
                     promise.tryFailure(new IllegalStateException(String.format("Ran out of candidates while sending %s: %s", verb, candidates)));
 
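
Both call sites above replace a blocking awaitUninterruptibly().get() on the LogState promise with Future.map, returning a composed future to the caller (PeerLogFetcher does the same with fetchRes.map). As a rough JDK-only analogy, CompletableFuture.thenApply plays the role of Cassandra's Future.map here; the types and values are placeholders:

    import java.util.concurrent.CompletableFuture;

    public class MapSketch
    {
        public static void main(String[] args)
        {
            // Stands in for Promise<LogState>: completed later by a messaging callback.
            CompletableFuture<String> remoteRequest = new CompletableFuture<>();
            // Attach the "append replay and compute metadata" continuation instead of blocking.
            CompletableFuture<Integer> metadata = remoteRequest.thenApply(logState -> logState.length());
            remoteRequest.complete("replayed-log-state");
            System.out.println(metadata.join()); // the caller decides where, or whether, to block
        }
    }
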
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 4979f7f113..2c3e446d3c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1095,8 +1095,15 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                            .filter(i -> !i.isShutdown())
                            .map(IInstance::shutdown)
                            .collect(Collectors.toList());
-        FBUtilities.waitOnFutures(futures,1L, TimeUnit.MINUTES);
-
+        try
+        {
+            FBUtilities.waitOnFutures(futures, 1L, TimeUnit.MINUTES);
+        }
+        catch (Throwable t)
+        {
+            checkForThreadLeaks();
+            throw t;
+        }
         instances.clear();
         instanceMap.clear();
         PathUtils.setDeletionListener(ignore -> {});
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e53f7c724a..ed0497180d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -849,7 +849,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     public void postStartup()
     {
         sync(() ->
-            StorageService.instance.doAuthSetup()
+            StorageService.instance.doAuthSetup(false)
         ).run();
     }
 
@@ -940,8 +940,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> SSTableReader.shutdownBlocking(1L, MINUTES),
                                 () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
                                 () -> ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES),
-                                () -> SnapshotManager.shutdownAndWait(1L, MINUTES),
-                                () -> EpochAwareDebounce.instance.shutdownAndWait(1L, MINUTES)
+                                () -> EpochAwareDebounce.instance.close(),
+                                () -> SnapshotManager.shutdownAndWait(1L, MINUTES)
             );
 
             internodeMessagingStarted = false;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index bdcf3fad5a..e66a85a3f2 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -262,6 +262,7 @@ public class GossipTest extends TestBaseImpl
             stopUnchecked(toRemove);
             replaceHostAndStart(cluster, toRemove);
             Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); // wait a few gossip rounds
+            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 4);
             cluster.get(2).runOnInstance(() -> assertFalse(Gossiper.instance.endpointStateMap.containsKey(InetAddressAndPort.getByNameUnchecked(node4))));
             cluster.get(3).runOnInstance(() -> assertFalse(Gossiper.instance.endpointStateMap.containsKey(InetAddressAndPort.getByNameUnchecked(node4))));
             cluster.get(3).nodetoolResult("disablegossip").asserts().success();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
index 3ebed55ef4..45f028929e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
@@ -56,14 +56,17 @@ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
     @Test
     public void shouldAvoidHintTransferOnDecommission() throws Exception
     {
-        try (Cluster cluster = init(builder().withNodes(3)
+        // This test was written with the expectation that the auth table is going to be empty. Since auth setup became synchronous for tests,
+        // we need to skip it now, since otherwise streaming will fail.
+        try (WithProperties properties = new WithProperties().set(CassandraRelevantProperties.SKIP_AUTH_SETUP, "true");
+             Cluster cluster = init(builder().withNodes(3)
                                              .withConfig(config -> config.set("transfer_hints_on_decommission", false)
                                                                          .set("progress_barrier_timeout", "1000ms")
                                                                          .set("progress_barrier_backoff", "100ms")
                                                                          // Just to make test pass faster
                                                                          .set("progress_barrier_min_consistency_level", NODE_LOCAL)
                                                                          .set("progress_barrier_default_consistency_level", NODE_LOCAL)
-                                                                         .with(GOSSIP))
+                                                                         .with(GOSSIP, NETWORK))
                                              .start()))
         {
             cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_no_hints_test (key int PRIMARY KEY, value int)"));
@@ -83,6 +86,7 @@ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
             assertThat(hintsAfterShutdown).isEqualTo(1);
 
             cluster.get(2).runOnInstance(() -> setProgressBarrierMinConsistencyLevel(org.apache.cassandra.db.ConsistencyLevel.ONE));
+            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 3);
             cluster.get(2).nodetoolResult("decommission", "--force").asserts().success();
             long hintsDeliveredByDecom = countHintsDelivered(cluster.get(2));
             String mode = cluster.get(2).callOnInstance(() -> StorageService.instance.getOperationMode());
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
index 3e8cb9c297..466dab1a8d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.distributed.test.log;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
 
 import org.junit.Test;
 
@@ -29,8 +31,6 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.tcm.sequences.AddToCMS;
 
-import static org.junit.Assert.assertTrue;
-
 public class CMSCatchupTest extends TestBaseImpl
 {
     @Test
@@ -47,24 +47,31 @@ public class CMSCatchupTest extends TestBaseImpl
             cluster.filters().inbound().from(1).to(2).drop();
             cluster.filters().inbound().from(3).to(2).drop();
             AtomicInteger fetchedFromPeer = new AtomicInteger();
+            cluster.filters().inbound().from(2).to(4).verbs(Verb.GOSSIP_DIGEST_ACK.id, Verb.GOSSIP_DIGEST_SYN.id, Verb.GOSSIP_DIGEST_ACK2.id).drop();
             cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> {
                 if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
                     fetchedFromPeer.getAndIncrement();
                 return false;
-            }).drop().on();
+            }).drop();
 
-            long mark = cluster.get(4).logs().mark();
+            int before = fetchedFromPeer.get();
             cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE);
-            cluster.get(4).logs().watchFor(mark, "AlterOptions");
-
-            mark = cluster.get(2).logs().mark();
             cluster.get(1).shutdown().get();
-            cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown");
+
             // node2, a CMS member, is now behind and node1 is shut down.
-            // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4
-            int before = fetchedFromPeer.get();
+            // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4, if it has not by now.
             cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM);
-            assertTrue(fetchedFromPeer.get() > before);
+
+            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
+            BooleanSupplier condition = () -> fetchedFromPeer.get() > before;
+            while (true)
+            {
+                if (System.nanoTime() > deadline)
+                    throw new AssertionError("Condition did not trigger before the deadline");
+
+                if (condition.getAsBoolean())
+                    return;
+            }
         }
     }
 
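
The replacement wait loop above polls fetchedFromPeer until a 30-second deadline. Factored into a reusable helper, the same deadline-bounded wait looks like the sketch below; the short sleep between checks is an addition of this sketch (the committed loop spins without sleeping), everything else mirrors the test:

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    public final class AwaitSketch
    {
        public static void await(BooleanSupplier condition, long timeout, TimeUnit unit) throws InterruptedException
        {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (!condition.getAsBoolean())
            {
                if (System.nanoTime() > deadline)
                    throw new AssertionError("Condition did not trigger before the deadline");
                Thread.sleep(10); // avoid a hot spin while polling
            }
        }
    }
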
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
index 6996f0c190..70f31181bb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
@@ -21,8 +21,10 @@ package org.apache.cassandra.distributed.test.log;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
@@ -79,6 +81,7 @@ public class FetchLogFromPeersTest extends TestBaseImpl
             cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
             cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
 
+            // Create divergence between nodes 1 and 2
             cluster.filters().inbound().from(1).to(2).drop();
 
             IInstanceConfig config = cluster.newInstanceConfig();
@@ -87,14 +90,31 @@ public class FetchLogFromPeersTest extends TestBaseImpl
 
             cluster.get(1).shutdown().get();
 
+            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(3), 1, 2);
+            Assert.assertEquals(2,
+                                cluster.stream().filter(i -> i.config().num() != 1).map(i -> {
+                                    return i.callOnInstance(() -> {
+                                        return ClusterMetadata.current().epoch.getEpoch();
+                                    });
+                                }).collect(Collectors.toSet()).size());
+
             // node2 is behind, writing to it will cause a failure, but it will then catch up
             try
             {
                 cluster.coordinator(2).execute(withKeyspace("insert into %s.tbl (id) values (3)"), ConsistencyLevel.QUORUM);
                 fail("writing should fail");
             }
-            catch (Exception ignored) {}
-
+            catch (Exception writeTimeout)
+            {
+                Assert.assertTrue(writeTimeout.getMessage().contains("Operation timed out"));
+            }
+            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(3), 1);
+            Assert.assertEquals(1,
+                                cluster.stream().filter(i -> i.config().num() != 1).map(i -> {
+                                    return i.callOnInstance(() -> {
+                                        return ClusterMetadata.current().epoch.getEpoch();
+                                    });
+                                }).collect(Collectors.toSet()).size());
             cluster.coordinator(2).execute(withKeyspace("insert into %s.tbl (id) values (3)"), ConsistencyLevel.QUORUM);
         }
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
index 5ca7bd9f5d..9826d013d5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.tcm.ClusterMetadata;
@@ -133,7 +134,7 @@ public class DecommissionTest extends TestBaseImpl
     @Test
     public void testMixedVersionBlockDecom() throws IOException {
         try (Cluster cluster = builder().withNodes(3)
-                                        .withConfig(config -> config.with(GOSSIP))
+                                        .withConfig(config -> config.with(GOSSIP, NETWORK))
                                         .start())
         {
             cluster.get(3).nodetoolResult("decommission", "--force").asserts().success();
@@ -157,7 +158,7 @@ public class DecommissionTest extends TestBaseImpl
                                                                      new NodeVersion(new CassandraVersion("6.0.0"),
                                                                                      NodeVersion.CURRENT_METADATA_VERSION)));
             });
-
+            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 3);
             NodeToolResult res = cluster.get(2).nodetoolResult("decommission", "--force");
             res.asserts().failure();
             assertTrue(res.getStdout().contains("Upgrade in progress"));

