This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e05cd4c8d Reuse native transport-driven futures in Debounce.
2e05cd4c8d is described below
commit 2e05cd4c8dd22e458eb1d2dad9cd34936b470266
Author: Alex Petrov <[email protected]>
AuthorDate: Tue May 28 16:55:58 2024 +0200
Reuse native transport-driven futures in Debounce.
Patch by Alex Petrov; reviewed by Sam Tunnicliffe for CASSANDRA-19158.
---
.../cassandra/auth/CassandraRoleManager.java | 28 ++++---
.../config/CassandraRelevantProperties.java | 1 +
.../apache/cassandra/gms/GossipVerbHandler.java | 2 +
.../apache/cassandra/service/StorageService.java | 11 ++-
.../apache/cassandra/tcm/EpochAwareDebounce.java | 98 ++++++++--------------
.../org/apache/cassandra/tcm/PeerLogFetcher.java | 48 +++++++----
.../org/apache/cassandra/tcm/RemoteProcessor.java | 49 ++++++-----
.../distributed/impl/AbstractCluster.java | 11 ++-
.../cassandra/distributed/impl/Instance.java | 6 +-
.../cassandra/distributed/test/GossipTest.java | 1 +
.../test/HintedHandoffAddRemoveNodesTest.java | 8 +-
.../distributed/test/log/CMSCatchupTest.java | 29 ++++---
.../test/log/FetchLogFromPeersTest.java | 24 +++++-
.../distributed/test/ring/DecommissionTest.java | 5 +-
14 files changed, 180 insertions(+), 141 deletions(-)
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 921a45dcb2..a046adc2a9 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -137,11 +137,11 @@ public class CassandraRoleManager implements IRoleManager
public CassandraRoleManager()
{
        supportedOptions = DatabaseDescriptor.getAuthenticator() instanceof PasswordAuthenticator
-                         ? ImmutableSet.of(Option.LOGIN, Option.SUPERUSER, Option.PASSWORD, Option.HASHED_PASSWORD)
-                         : ImmutableSet.of(Option.LOGIN, Option.SUPERUSER);
+                         ? ImmutableSet.of(Option.LOGIN, Option.SUPERUSER, Option.PASSWORD, Option.HASHED_PASSWORD)
+                         : ImmutableSet.of(Option.LOGIN, Option.SUPERUSER);
        alterableOptions = DatabaseDescriptor.getAuthenticator() instanceof PasswordAuthenticator
-                         ? ImmutableSet.of(Option.PASSWORD, Option.HASHED_PASSWORD)
-                         : ImmutableSet.<Option>of();
+                         ? ImmutableSet.of(Option.PASSWORD, Option.HASHED_PASSWORD)
+                         : ImmutableSet.<Option>of();
}
@Override
@@ -149,17 +149,23 @@ public class CassandraRoleManager implements IRoleManager
{
loadRoleStatement();
loadIdentityStatement();
- if (asyncRoleSetup)
+ if (!asyncRoleSetup)
{
- scheduleSetupTask(() -> {
+ try
+ {
+ // Try to set up synchronously
setupDefaultRole();
- return null;
- });
+ return;
+ }
+ catch (Throwable t)
+ {
+            // We tried to execute the task synchronously, but failed; fall back to asynchronous setup.
+ }
}
- else
- {
+ scheduleSetupTask(() -> {
setupDefaultRole();
- }
+ return null;
+ });
}
@Override
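
The setup() change above inverts the old flow: it now attempts the default-role setup synchronously first, and only falls back to a scheduled task if that throws. As a rough standalone sketch of that pattern (class, method, and scheduler names here are illustrative, not part of the patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SetupFallbackSketch
    {
        private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();

        // Attempt the task inline first; if it throws, fall back to scheduling it asynchronously.
        static void trySyncThenSchedule(Callable<Void> task)
        {
            try
            {
                task.call();
                return; // synchronous setup succeeded, nothing left to do
            }
            catch (Throwable t)
            {
                // fall through to the asynchronous path
            }
            SCHEDULER.schedule(() -> {
                try { task.call(); }
                catch (Exception e) { /* a real implementation would log and retry */ }
            }, 10, TimeUnit.SECONDS);
        }
    }
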
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 57f442d1d1..e1f6d28d86 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -474,6 +474,7 @@ public enum CassandraRelevantProperties
SET_SEP_THREAD_NAME("cassandra.set_sep_thread_name", "true"),
SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"),
SIZE_RECORDER_INTERVAL("cassandra.size_recorder_interval", "300"),
+ SKIP_AUTH_SETUP("cassandra.skip_auth_setup", "false"),
SKIP_GC_INSPECTOR("cassandra.skip_gc_inspector", "false"),
SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE("cassandra.skip_paxos_repair_on_topology_change"),
    /** If necessary for operational purposes, permit certain keyspaces to be ignored for paxos topology repairs. */
diff --git a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
index 02aeaf4467..bac0854d07 100644
--- a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.gms;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.tcm.ClusterMetadataService;
public class GossipVerbHandler<T> implements IVerbHandler<T>
{
public void doVerb(Message<T> message)
{
Gossiper.instance.setLastProcessedMessageAt(message.creationTimeMillis());
+        ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(), message.epoch());
}
}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index caf302fbd7..0578326b0e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -446,7 +446,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private boolean isSurveyMode = TEST_WRITE_SURVEY.getBoolean(false);
/* true if node is rebuilding and receiving data */
private volatile boolean initialized = false;
- private final AtomicBoolean authSetupCalled = new AtomicBoolean(false);
+    private final AtomicBoolean authSetupCalled = new AtomicBoolean(CassandraRelevantProperties.SKIP_AUTH_SETUP.getBoolean());
private volatile boolean authSetupComplete = false;
    /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
@@ -1093,12 +1093,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
InProgressSequences.finishInProgressSequences(id);
}
+ void doAuthSetup()
+ {
+ doAuthSetup(true);
+ }
+
@VisibleForTesting
- public void doAuthSetup()
+ public void doAuthSetup(boolean async)
{
if (!authSetupCalled.getAndSet(true))
{
- DatabaseDescriptor.getRoleManager().setup();
+ DatabaseDescriptor.getRoleManager().setup(async);
DatabaseDescriptor.getAuthenticator().setup();
DatabaseDescriptor.getAuthorizer().setup();
DatabaseDescriptor.getNetworkAuthorizer().setup();
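
Note how seeding authSetupCalled from the new SKIP_AUTH_SETUP property works: because doAuthSetup only proceeds when getAndSet(true) returns false, constructing the flag as true turns the whole method into a no-op. A minimal sketch of the gate (hypothetical class, not part of the patch):

    import java.util.concurrent.atomic.AtomicBoolean;

    final class AuthSetupGateSketch
    {
        private final AtomicBoolean called;

        AuthSetupGateSketch(boolean skipAuthSetup)
        {
            // Seeding with true means the gate below never opens.
            this.called = new AtomicBoolean(skipAuthSetup);
        }

        void doAuthSetup(Runnable setup)
        {
            if (!called.getAndSet(true)) // runs at most once, and never when seeded with true
                setup.run();
        }
    }
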
diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index f65c03d830..25456dbbce 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -18,23 +18,11 @@
package org.apache.cassandra.tcm;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
+import java.util.function.Supplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
-import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.utils.ExecutorUtils;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.Closeable;
import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.Promise;
/**
 * When debouncing from a replica we know exactly which epoch we need, so to avoid retries we
@@ -42,86 +30,66 @@ import org.apache.cassandra.utils.concurrent.Promise;
 * comes in, we create a new future. If a request for a newer epoch comes in, we simply
 * swap out the current future reference for a new one which is requesting the newer epoch.
*/
-public class EpochAwareDebounce
+public class EpochAwareDebounce implements Closeable
{
-    private static final Logger logger = LoggerFactory.getLogger(EpochAwareDebounce.class);
     public static final EpochAwareDebounce instance = new EpochAwareDebounce();
-    private final AtomicReference<EpochAwareAsyncPromise> currentFuture = new AtomicReference<>();
-    private final ExecutorPlus executor;
-    private final List<Promise<LogState>> inflightRequests = new CopyOnWriteArrayList<>();
+
+    private final AtomicReference<EpochAwareFuture> currentFuture = new AtomicReference<>();
private EpochAwareDebounce()
{
-        // 2 threads since we might start a new debounce for a newer epoch while the old one is executing
-        this.executor = ExecutorFactory.Global.executorFactory().pooled("debounce", 2);
}
    /**
     * Deduplicate requests to catch up log state based on the desired epoch. Callers supply a target epoch and
     * a function to obtain the ClusterMetadata that corresponds with it. It is expected that this function will make rpc
     * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
-     * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
-     * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code
-     * LogState}. These promises are memoized in order to cancel them when {@link #shutdownAndWait(long, TimeUnit)} is
-     * called. This causes the fetch function to stop waiting on any in flight {@code LogState} requests and prevents
-     * shutdown from being blocked.
+     * ClusterMetadata}.
     *
-     * @param fetchFunction executes the request for LogState. It's expected that this populates fetchResult with the
-     *                      successful result.
+     * @param fetchFunction supplies the future that, when dereferenced, will yield metadata for the desired epoch
     * @param epoch the desired epoch
     * @return a future that completes with the ClusterMetadata at or after the desired epoch
    */
-    public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, ClusterMetadata> fetchFunction,
-                                            Epoch epoch)
+    public Future<ClusterMetadata> getAsync(Supplier<Future<ClusterMetadata>> fetchFunction, Epoch epoch)
{
while (true)
{
-            EpochAwareAsyncPromise running = currentFuture.get();
-            if (running != null && !running.isDone() && running.epoch.isEqualOrAfter(epoch))
-                return running;
+            EpochAwareFuture running = currentFuture.get();
+            // Someone else is about to install a new future
+            if (running == SENTINEL)
+                continue;
-            Promise<LogState> fetchResult = new AsyncPromise<>();
+            if (running != null && !running.future.isDone() && running.epoch.isEqualOrAfter(epoch))
+                return running.future;
- EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
- if (currentFuture.compareAndSet(running, promise))
+ if (currentFuture.compareAndSet(running, SENTINEL))
{
-                fetchResult.addCallback((logState, error) -> {
-                    logger.debug("Removing future remotely requesting epoch {} from in flight list", epoch);
-                    inflightRequests.remove(fetchResult);
-                });
-                inflightRequests.add(fetchResult);
-
-                executor.submit(() -> {
-                    try
-                    {
-                        promise.setSuccess(fetchFunction.apply(fetchResult));
-                    }
-                    catch (Throwable t)
-                    {
-                        fetchResult.cancel(true);
-                        inflightRequests.remove(fetchResult);
-                        promise.setFailure(t);
-                    }
-                });
-                return promise;
+                EpochAwareFuture promise = new EpochAwareFuture(epoch, fetchFunction.get());
+                boolean res = currentFuture.compareAndSet(SENTINEL, promise);
+                assert res : "Should not have happened";
+                return promise.future;
}
}
}
-    private static class EpochAwareAsyncPromise extends AsyncPromise<ClusterMetadata>
+    private static final EpochAwareFuture SENTINEL = new EpochAwareFuture(Epoch.EMPTY, null);
+
+ @Override
+ public void close()
+ {
+ EpochAwareFuture future = currentFuture.get();
+ if (future != null && future != SENTINEL)
+ future.future.cancel(true);
+ }
+
+ private static class EpochAwareFuture
{
private final Epoch epoch;
- public EpochAwareAsyncPromise(Epoch epoch)
+ private final Future<ClusterMetadata> future;
+ public EpochAwareFuture(Epoch epoch, Future<ClusterMetadata> future)
{
this.epoch = epoch;
+ this.future = future;
}
}
-
-    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
-    {
-        logger.info("Cancelling {} in flight log fetch requests", inflightRequests.size());
- for (Promise<LogState> toCancel : inflightRequests)
- toCancel.cancel(true);
- ExecutorUtils.shutdownAndWait(timeout, unit, executor);
- }
}
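
To make the new concurrency scheme easier to follow, here is a condensed, self-contained sketch of the CAS-plus-sentinel debounce (it uses CompletableFuture and a long epoch so it compiles standalone; the real class uses Cassandra's Future and Epoch types):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    final class DebounceSketch<T>
    {
        private static final class Entry<T>
        {
            final long epoch;
            final CompletableFuture<T> future;
            Entry(long epoch, CompletableFuture<T> future) { this.epoch = epoch; this.future = future; }
        }

        private final Entry<T> sentinel = new Entry<>(-1, null);
        private final AtomicReference<Entry<T>> current = new AtomicReference<>();

        CompletableFuture<T> getAsync(Supplier<CompletableFuture<T>> fetch, long epoch)
        {
            while (true)
            {
                Entry<T> running = current.get();
                if (running == sentinel)
                    continue; // another thread is installing a future; retry
                if (running != null && !running.future.isDone() && running.epoch >= epoch)
                    return running.future; // piggyback on an in-flight, new-enough request
                if (current.compareAndSet(running, sentinel))
                {
                    // Only the winner of the CAS above reaches this point, so this CAS cannot fail.
                    Entry<T> installed = new Entry<>(epoch, fetch.get());
                    current.compareAndSet(sentinel, installed);
                    return installed.future;
                }
            }
        }
    }

The property mirrored from the patch: between winning the first compareAndSet and installing the real future, all other callers spin on the sentinel instead of starting a competing fetch.
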
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index 61cbc63eac..3564ab93f7 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +34,7 @@ import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Promise;
@@ -74,40 +74,54 @@ public class PeerLogFetcher
    public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast)
    {
-        Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
-        return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
+        return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
}
-    private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
+    private Future<ClusterMetadata> fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
{
Epoch before = ClusterMetadata.current().epoch;
if (before.isEqualOrAfter(awaitAtleast))
- return ClusterMetadata.current();
+ {
+ Promise<ClusterMetadata> res = new AsyncPromise<>();
+ res.setSuccess(ClusterMetadata.current());
+ return res;
+ }
+ Promise<LogState> fetchRes = new AsyncPromise<>();
logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
-
        try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
{
- RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+ RemoteProcessor.sendWithCallbackAsync(fetchRes,
Verb.TCM_FETCH_PEER_LOG_REQ,
new FetchPeerLog(before),
-                                              new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
+                                              new RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false),
                                               Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
                                                                    new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
- LogState logState = remoteRequest.awaitUninterruptibly().get();
- log.append(logState);
- ClusterMetadata fetched = log.waitForHighestConsecutive();
- if (fetched.epoch.isEqualOrAfter(awaitAtleast))
- {
-                TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
- return fetched;
- }
+
+ return fetchRes.map((logState) -> {
+ log.append(logState);
+ ClusterMetadata fetched = log.waitForHighestConsecutive();
+ if (fetched.epoch.isEqualOrAfter(awaitAtleast))
+ {
+                    TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
+ return fetched;
+ }
+ else
+ {
+                    throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", awaitAtleast));
+ }
+ });
+
}
catch (Throwable t)
{
+ fetchRes.cancel(true);
JVMStabilityInspector.inspectThrowable(t);
+
logger.warn("Unable to fetch log entries from " + remote, t);
+ Promise<ClusterMetadata> res = new AsyncPromise<>();
+            res.setFailure(new IllegalStateException("Unable to fetch log entries from " + remote, t));
+ return res;
}
- return ClusterMetadata.current();
}
}
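
The core shift in this file is from blocking on the LogState response to mapping over it: the transformation (append to the local log, wait for the highest consecutive epoch) now runs as a continuation when the reply arrives, so no debounce thread is parked on the network. A minimal illustration of the idiom, using CompletableFuture so it compiles standalone (the patch itself uses Cassandra's Future.map):

    import java.util.concurrent.CompletableFuture;

    final class MapInsteadOfBlockSketch
    {
        // Before: a pool thread called response.get() and blocked for the full round trip.
        static String fetchBlocking(CompletableFuture<String> response) throws Exception
        {
            return "applied " + response.get();
        }

        // After: the transformation is attached as a continuation and the caller
        // immediately receives a future; no thread waits on the network.
        static CompletableFuture<String> fetchAsync(CompletableFuture<String> response)
        {
            return response.thenApply(logState -> "applied " + logState);
        }
    }
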
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index a647d4d8b0..c267d140d4 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
@@ -126,17 +125,16 @@ public final class RemoteProcessor implements Processor
@Override
    public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy)
{
-        Function<Promise<LogState>, ClusterMetadata> fetchFunction =
-            promise -> fetchLogAndWaitInternal(promise,
-                                               new CandidateIterator(candidates(true), false),
-                                               log);
        // Synchronous, non-debounced call if we are waiting for the highest epoch (without knowing/caring what it is).
        // Should be used sparingly.
        if (waitFor == null)
-            return fetchFunction.apply(new AsyncPromise<>());
+            return fetchLogAndWait(new CandidateIterator(candidates(true), false), log);
try
{
+            Supplier<Future<ClusterMetadata>> fetchFunction = () -> fetchLogAndWaitInternal(new CandidateIterator(candidates(true), false),
+                                                                                            log);
+
            Future<ClusterMetadata> cmFuture = EpochAwareDebounce.instance.getAsync(fetchFunction, waitFor);
            return cmFuture.get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS);
}
@@ -152,34 +150,37 @@ public final class RemoteProcessor implements Processor
    public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log)
{
- Promise<LogState> remoteRequest = new AsyncPromise<>();
- return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log);
+ try
+ {
+            return fetchLogAndWaitInternal(candidateIterator, log).awaitUninterruptibly().get();
+ }
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
}
-    private static ClusterMetadata fetchLogAndWaitInternal(Promise<LogState> remoteRequest,
-                                                           CandidateIterator candidates,
-                                                           LocalLog log)
+    private static Future<ClusterMetadata> fetchLogAndWaitInternal(CandidateIterator candidates,
+                                                                   LocalLog log)
{
try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time())
{
+ Promise<LogState> remoteRequest = new AsyncPromise<>();
Epoch currentEpoch = log.metadata().epoch;
sendWithCallbackAsync(remoteRequest,
Verb.TCM_FETCH_CMS_LOG_REQ,
                                  new FetchCMSLog(currentEpoch, ClusterMetadataService.state() == REMOTE),
                                  candidates,
                                  new Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
- LogState replay = remoteRequest.awaitUninterruptibly().get();
- if (!replay.isEmpty())
- {
- logger.info("Replay request returned replay data: {}", replay);
- log.append(replay);
-                TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, replay.latestEpoch());
- }
- return log.waitForHighestConsecutive();
- }
- catch (InterruptedException | ExecutionException e)
- {
- throw new RuntimeException(e);
+ return remoteRequest.map((replay) -> {
+ if (!replay.isEmpty())
+ {
+                    logger.info("Replay request returned replay data: {}", replay);
+                    log.append(replay);
+                    TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, replay.latestEpoch());
+ }
+ return log.waitForHighestConsecutive();
+ });
}
}
@@ -206,6 +207,8 @@ public final class RemoteProcessor implements Processor
{
if (promise.isCancelled() || promise.isDone())
return;
+ if (Thread.currentThread().isInterrupted())
+ promise.setFailure(new InterruptedException());
if (!candidates.hasNext())
            promise.tryFailure(new IllegalStateException(String.format("Ran out of candidates while sending %s: %s", verb, candidates)));
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 4979f7f113..2c3e446d3c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1095,8 +1095,15 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
.filter(i -> !i.isShutdown())
.map(IInstance::shutdown)
.collect(Collectors.toList());
- FBUtilities.waitOnFutures(futures,1L, TimeUnit.MINUTES);
-
+ try
+ {
+ FBUtilities.waitOnFutures(futures, 1L, TimeUnit.MINUTES);
+ }
+ catch (Throwable t)
+ {
+ checkForThreadLeaks();
+ throw t;
+ }
instances.clear();
instanceMap.clear();
PathUtils.setDeletionListener(ignore -> {});
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e53f7c724a..ed0497180d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -849,7 +849,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public void postStartup()
{
sync(() ->
- StorageService.instance.doAuthSetup()
+ StorageService.instance.doAuthSetup(false)
).run();
}
@@ -940,8 +940,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                () -> SSTableReader.shutdownBlocking(1L, MINUTES),
                                () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
                                () -> ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES),
-                               () -> SnapshotManager.shutdownAndWait(1L, MINUTES),
-                               () -> EpochAwareDebounce.instance.shutdownAndWait(1L, MINUTES)
+                               () -> EpochAwareDebounce.instance.close(),
+                               () -> SnapshotManager.shutdownAndWait(1L, MINUTES)
);
internodeMessagingStarted = false;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index bdcf3fad5a..e66a85a3f2 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -262,6 +262,7 @@ public class GossipTest extends TestBaseImpl
stopUnchecked(toRemove);
replaceHostAndStart(cluster, toRemove);
            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); // wait a few gossip rounds
+ ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 4);
            cluster.get(2).runOnInstance(() -> assertFalse(Gossiper.instance.endpointStateMap.containsKey(InetAddressAndPort.getByNameUnchecked(node4))));
            cluster.get(3).runOnInstance(() -> assertFalse(Gossiper.instance.endpointStateMap.containsKey(InetAddressAndPort.getByNameUnchecked(node4))));
cluster.get(3).nodetoolResult("disablegossip").asserts().success();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
index 3ebed55ef4..45f028929e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
@@ -56,14 +56,17 @@ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
@Test
public void shouldAvoidHintTransferOnDecommission() throws Exception
{
- try (Cluster cluster = init(builder().withNodes(3)
+        // This test was written with the expectation that the auth table would be empty. Since auth setup became
+        // synchronous for tests, we need to skip it now, since otherwise streaming will fail.
+        try (WithProperties properties = new WithProperties().set(CassandraRelevantProperties.SKIP_AUTH_SETUP, "true");
+             Cluster cluster = init(builder().withNodes(3)
                                              .withConfig(config -> config.set("transfer_hints_on_decommission", false)
                                                                          .set("progress_barrier_timeout", "1000ms")
                                                                          .set("progress_barrier_backoff", "100ms") // Just to make the test pass faster
                                                                          .set("progress_barrier_min_consistency_level", NODE_LOCAL)
                                                                          .set("progress_barrier_default_consistency_level", NODE_LOCAL)
-                                                                        .with(GOSSIP))
+                                                                        .with(GOSSIP, NETWORK))
.start()))
{
            cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_no_hints_test (key int PRIMARY KEY, value int)"));
@@ -83,6 +86,7 @@ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
assertThat(hintsAfterShutdown).isEqualTo(1);
            cluster.get(2).runOnInstance(() -> setProgressBarrierMinConsistencyLevel(org.apache.cassandra.db.ConsistencyLevel.ONE));
+            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 3);
            cluster.get(2).nodetoolResult("decommission", "--force").asserts().success();
            long hintsDeliveredByDecom = countHintsDelivered(cluster.get(2));
            String mode = cluster.get(2).callOnInstance(() -> StorageService.instance.getOperationMode());
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
index 3e8cb9c297..466dab1a8d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.distributed.test.log;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
import org.junit.Test;
@@ -29,8 +31,6 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tcm.sequences.AddToCMS;
-import static org.junit.Assert.assertTrue;
-
public class CMSCatchupTest extends TestBaseImpl
{
@Test
@@ -47,24 +47,31 @@ public class CMSCatchupTest extends TestBaseImpl
cluster.filters().inbound().from(1).to(2).drop();
cluster.filters().inbound().from(3).to(2).drop();
AtomicInteger fetchedFromPeer = new AtomicInteger();
+            cluster.filters().inbound().from(2).to(4).verbs(Verb.GOSSIP_DIGEST_ACK.id, Verb.GOSSIP_DIGEST_SYN.id, Verb.GOSSIP_DIGEST_ACK2.id).drop();
            cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> {
if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
fetchedFromPeer.getAndIncrement();
return false;
- }).drop().on();
+ }).drop();
- long mark = cluster.get(4).logs().mark();
+ int before = fetchedFromPeer.get();
            cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE);
- cluster.get(4).logs().watchFor(mark, "AlterOptions");
-
- mark = cluster.get(2).logs().mark();
cluster.get(1).shutdown().get();
-            cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown");
+
            // node2, a CMS member, is now behind and node1 is shut down.
-            // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4
-            int before = fetchedFromPeer.get();
+            // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4, if it has not by now.
            cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM);
- assertTrue(fetchedFromPeer.get() > before);
+
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
+ BooleanSupplier condition = () -> fetchedFromPeer.get() > before;
+ while (true)
+ {
+ if (System.nanoTime() > deadline)
+                    throw new AssertionError("Condition did not trigger before the deadline");
+
+ if (condition.getAsBoolean())
+ return;
+ }
}
}
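
The bounded busy-wait the test now uses could be factored into a small reusable helper; a sketch (hypothetical, not part of the patch):

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    final class AwaitSketch
    {
        static void until(BooleanSupplier condition, long timeout, TimeUnit unit)
        {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (!condition.getAsBoolean())
            {
                if (System.nanoTime() > deadline)
                    throw new AssertionError("Condition did not trigger before the deadline");
                Thread.onSpinWait(); // hint to the runtime; a real helper might sleep briefly instead
            }
        }
    }

The call site in the test would then collapse to AwaitSketch.until(() -> fetchedFromPeer.get() > before, 30, TimeUnit.SECONDS).
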
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
index 6996f0c190..70f31181bb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
@@ -21,8 +21,10 @@ package org.apache.cassandra.distributed.test.log;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
@@ -79,6 +81,7 @@ public class FetchLogFromPeersTest extends TestBaseImpl
cluster.schemaChange(withKeyspace("alter keyspace %s with
replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
cluster.schemaChange(withKeyspace("create table %s.tbl (id int
primary key)"));
+ // Create divergence between node 1 and 2
cluster.filters().inbound().from(1).to(2).drop();
IInstanceConfig config = cluster.newInstanceConfig();
@@ -87,14 +90,31 @@ public class FetchLogFromPeersTest extends TestBaseImpl
cluster.get(1).shutdown().get();
+ ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(3), 1, 2);
+            Assert.assertEquals(2,
+                                cluster.stream().filter(i -> i.config().num() != 1).map(i -> {
+                                    return i.callOnInstance(() -> {
+                                        return ClusterMetadata.current().epoch.getEpoch();
+                                    });
+                                }).collect(Collectors.toSet()).size());
+
            // node2 is behind, writing to it will cause a failure, but it will then catch up
try
{
                cluster.coordinator(2).execute(withKeyspace("insert into %s.tbl (id) values (3)"), ConsistencyLevel.QUORUM);
fail("writing should fail");
}
- catch (Exception ignored) {}
-
+ catch (Exception writeTimeout)
+ {
+                Assert.assertTrue(writeTimeout.getMessage().contains("Operation timed out"));
+ }
+ ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(3), 1);
+            Assert.assertEquals(1,
+                                cluster.stream().filter(i -> i.config().num() != 1).map(i -> {
+                                    return i.callOnInstance(() -> {
+                                        return ClusterMetadata.current().epoch.getEpoch();
+                                    });
+                                }).collect(Collectors.toSet()).size());
            cluster.coordinator(2).execute(withKeyspace("insert into %s.tbl (id) values (3)"), ConsistencyLevel.QUORUM);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
index 5ca7bd9f5d..9826d013d5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.tcm.ClusterMetadata;
@@ -133,7 +134,7 @@ public class DecommissionTest extends TestBaseImpl
@Test
public void testMixedVersionBlockDecom() throws IOException {
try (Cluster cluster = builder().withNodes(3)
-                                        .withConfig(config -> config.with(GOSSIP))
+                                        .withConfig(config -> config.with(GOSSIP, NETWORK))
.start())
{
            cluster.get(3).nodetoolResult("decommission", "--force").asserts().success();
@@ -157,7 +158,7 @@ public class DecommissionTest extends TestBaseImpl
                                                             new NodeVersion(new CassandraVersion("6.0.0"), NodeVersion.CURRENT_METADATA_VERSION)));
});
-
+ ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 3);
            NodeToolResult res = cluster.get(2).nodetoolResult("decommission", "--force");
res.asserts().failure();
assertTrue(res.getStdout().contains("Upgrade in progress"));