This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc45bb5876 Allow threads waiting for the log follower to be interrupted
dc45bb5876 is described below
commit dc45bb5876aafa2ce7dcfe6a3b7de0f6a9a35fda
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Jul 11 19:40:55 2024 +0100
Allow threads waiting for the log follower to be interrupted
Patch by Sam Tunnicliffe and David Capwell; reviewed by Alex Petrov for
CASSANDRA-19761
---
CHANGES.txt | 1 +
.../org/apache/cassandra/tcm/RemoteProcessor.java | 4 +-
.../org/apache/cassandra/tcm/log/LocalLog.java | 22 +++--
.../cassandra/utils/JVMStabilityInspector.java | 3 +
.../distributed/test/tcm/CMSShutdownTest.java | 104 +++++++++++++++++++++
5 files changed, 125 insertions(+), 9 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 72d6e8f1e9..7c6c10cd50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Allow threads waiting for the metadata log follower to be interrupted (CASSANDRA-19761)
* Support dictionary lookup for CassandraPasswordValidator (CASSANDRA-19762)
* Disallow denylisting keys in system_cluster_metadata (CASSANDRA-19713)
* Fix gossip status after replacement (CASSANDRA-19712)
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index c267d140d4..79f0b7cf7d 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -152,7 +152,7 @@ public final class RemoteProcessor implements Processor
{
try
{
- return fetchLogAndWaitInternal(candidateIterator,
log).awaitUninterruptibly().get();
+ return fetchLogAndWaitInternal(candidateIterator,
log).await().get();
}
catch (InterruptedException | ExecutionException e)
{
@@ -191,7 +191,7 @@ public final class RemoteProcessor implements Processor
{
Promise<RSP> promise = new AsyncPromise<>();
sendWithCallbackAsync(promise, verb, request, candidates,
retryPolicy);
- return promise.awaitUninterruptibly().get();
+ return promise.await().get();
}
catch (InterruptedException | ExecutionException e)
{
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 0307e49048..a5b744fc17 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -370,6 +370,8 @@ public abstract class LocalLog implements Closeable
*/
public void append(LogState logState)
{
+ if (logState.isEmpty())
+ return;
logger.debug("Appending log state with snapshot to the pending buffer:
{}", logState);
// If we receive a base state (snapshot), we need to construct a
synthetic ForceSnapshot transformation that will serve as
// a base for application of the rest of the entries. If the log state
contains any additional transformations that follow
@@ -403,13 +405,14 @@ public abstract class LocalLog implements Closeable
{
runOnce(null);
}
- catch (InterruptedException | TimeoutException e)
+ catch (TimeoutException e)
{
- throw new RuntimeException("Should not have happened, since we
await uninterruptibly", e);
+ // This should not happen as no duration was specified in the call
to runOnce
+ throw new RuntimeException("Timed out waiting for log follower to
run", e);
}
}
- abstract void runOnce(DurationSpec durationSpec) throws
InterruptedException, TimeoutException;
+ abstract void runOnce(DurationSpec durationSpec) throws TimeoutException;
abstract void processPending();
private Entry peek()
@@ -664,8 +667,11 @@ public abstract class LocalLog implements Closeable
}
@Override
- public void runOnce(DurationSpec duration) throws
InterruptedException, TimeoutException
+ public void runOnce(DurationSpec duration) throws TimeoutException
{
+ if (executor.isTerminated())
+ throw new IllegalStateException("Global log follower has
shutdown");
+
Condition ours = Condition.newOneTimeCondition();
for (int i = 0; i < 2; i++)
{
@@ -678,9 +684,10 @@ public abstract class LocalLog implements Closeable
{
if (duration == null)
{
- current.awaitUninterruptibly();
+
+ current.awaitThrowUncheckedOnInterrupt();
}
- else if
(!current.await(duration.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))
+ else if
(!current.awaitThrowUncheckedOnInterrupt(duration.to(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS))
{
throw new TimeoutException(String.format("Timed out
waiting for follower to run at least once. " +
"Pending is
%s and current is now at epoch %s.",
@@ -707,7 +714,7 @@ public abstract class LocalLog implements Closeable
if (runnable.subscriber.compareAndSet(null, ours))
{
runnable.logNotifier.signalAll();
- ours.awaitUninterruptibly();
+ ours.awaitThrowUncheckedOnInterrupt();
return;
}
}
@@ -727,6 +734,7 @@ public abstract class LocalLog implements Closeable
Condition condition = runnable.subscriber.get();
if (condition != null)
condition.signalAll();
+
runnable.logNotifier.signalAll();
try
{
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index a396ef9947..983e75f252 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -141,6 +141,9 @@ public final class JVMStabilityInspector
if (t instanceof InterruptedException)
throw new UncheckedInterruptedException((InterruptedException) t);
+ if (t instanceof UncheckedInterruptedException)
+ throw (UncheckedInterruptedException)t;
+
if (DatabaseDescriptor.getDiskFailurePolicy() ==
Config.DiskFailurePolicy.die)
if (t instanceof FSError || t instanceof CorruptSSTableException)
isUnstable = true;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java
new file mode 100644
index 0000000000..736f8335e9
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.PaxosBackedProcessor;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
+import org.apache.cassandra.utils.Shared;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+/**
+ * Distributed test for CASSANDRA-19761: a CMS node must be able to shut down while one of its
+ * threads keeps attempting, and failing, to commit an entry to the cluster metadata log.
+ */
+public class CMSShutdownTest extends TestBaseImpl
+{
+    /**
+     * Starts a 2-node cluster with every {@link Feature} enabled and a ByteBuddy hook installed on
+     * node 1. It then starts a background commit loop on node 1 and waits until that loop has begun.
+     * Finally it lets try-with-resources close the cluster while the loop is still running.
+     * NOTE(review): there are no explicit assertions. The test presumably fails by hanging (or by an
+     * error surfacing from close()) if shutdown cannot interrupt the blocked committer — confirm.
+     */
+    @Test
+    public void shutdownCMSCoincidingWithUnsuccessfulCommit() throws Exception
+    {
+        // This test simulates a CMS node attempting to commit an entry to the log but being unable
+        // to obtain consensus from other CMS members while it is also shutting down itself.
+        try (Cluster cluster = Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.values()))
+                                      .withInstanceInitializer(BBHelper::install)
+                                      .start())
+        {
+            cluster.get(1).runOnInstance(CommitHelper::scheduleCommits);
+            // Wait until the commit loop on node 1 has started. From then on, BBHelper.tryCommitOne
+            // makes every commit attempt fail, and leaving this block closes the cluster.
+            // NOTE(review): this wait is unbounded, so the test hangs if the loop never starts.
+            State.latch.await();
+        }
+    }
+
+    /**
+     * Code that runs inside node 1 (submitted via runOnInstance) to drive the failing commits.
+     */
+    private static class CommitHelper
+    {
+        /**
+         * Counts down the shared latch, then commits {@link TriggerSnapshot} in an endless loop.
+         * NOTE(review): the loop has no exit condition of its own. It presumably ends only when
+         * commit() throws once the node is shutting down (e.g. on interrupt) — confirm.
+         */
+        public static void commitTransformations()
+        {
+            // Counting down the latch first does two things. It releases the waiting test thread,
+            // and it switches BBHelper.tryCommitOne to fail every subsequent commit, as if unable
+            // to obtain consensus from other CMS members. The loop then keeps attempting to commit.
+            State.latch.countDown();
+            for (; ;)
+                ClusterMetadataService.instance().commit(TriggerSnapshot.instance);
+        }
+
+        /**
+         * Hands {@link #commitTransformations()} to the MISC stage. The endless loop therefore runs
+         * on a stage thread rather than on the thread that invoked runOnInstance.
+         */
+        public static void scheduleCommits()
+        {
+            Stage.MISC.execute(CommitHelper::commitTransformations);
+        }
+    }
+
+    /**
+     * State used both by the test thread and by code running inside node 1. Marked {@code @Shared}
+     * so that, presumably, the in-jvm dtest framework loads this class once and both sides see the
+     * same latch instance.
+     */
+    @Shared
+    public static class State
+    {
+        // Counted down once by the commit loop. Read by BBHelper.tryCommitOne to decide whether a
+        // commit should proceed normally.
+        public static final CountDownLatch latch = new CountDownLatch(1);
+    }
+
+    /**
+     * ByteBuddy hook that makes commits on node 1 fail once the commit loop has started.
+     */
+    public static class BBHelper
+    {
+        /**
+         * Instance initializer that acts on node 1 only. It rebases PaxosBackedProcessor so that its
+         * tryCommitOne method delegates to {@link #tryCommitOne}. The rewritten class is then
+         * injected into the instance's class loader.
+         */
+        static void install(ClassLoader cl, int node)
+        {
+            if (node != 1) return;
+            new ByteBuddy().rebase(PaxosBackedProcessor.class)
+                           .method(named("tryCommitOne"))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        /**
+         * Replacement for PaxosBackedProcessor.tryCommitOne. Until the latch is counted down, it
+         * invokes the original implementation through {@code call}. Afterwards it returns false
+         * without calling the original, simulating a commit that could not obtain consensus.
+         */
+        public static boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch, @SuperCall Callable<Boolean> call) throws Exception
+        {
+            if (State.latch.getCount() == 1)
+                return call.call();
+            return false;
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]