This is an automated email from the ASF dual-hosted git repository.
jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new bde4fa0013 Waiting indefinitely on ReceivedMessage response in StreamSession#receive() can cause deadlock
bde4fa0013 is described below
commit bde4fa0013eb8cec5b1d88b21ca4463bc07272bb
Author: Jon Meredith <[email protected]>
AuthorDate: Mon Aug 28 16:10:41 2023 -0600
Waiting indefinitely on ReceivedMessage response in StreamSession#receive() can cause deadlock
patch by Jon Meredith; reviewed by Caleb Rackliffe, David Capwell for CASSANDRA-18733
---
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamSession.java | 64 ++++++-----
.../async/StreamingMultiplexedChannel.java | 8 ++
.../StreamDisconnectedWhileReceivingTest.java | 117 +++++++++++++++++++++
4 files changed, 163 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 21ca2abec7..b52bcfd634 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1.4
+ * Waiting indefinitely on ReceivedMessage response in StreamSession#receive() can cause deadlock (CASSANDRA-18733)
* Allow empty keystore_password in encryption_options (CASSANDRA-18778)
* Skip ColumnFamilyStore#topPartitions initialization when client or tool mode (CASSANDRA-18697)
Merged from 4.0:
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 811717f85d..7b07a0d179 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -189,6 +189,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// receiving peer's CompleteMessage.
private boolean maybeCompleted = false;
private Future<?> closeFuture;
+ private final Object closeFutureLock = new Object();
private final TimeUUID pendingRepair;
private final PreviewKind previewKind;
@@ -496,35 +497,43 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
}
- private synchronized Future<?> closeSession(State finalState)
+ private Future<?> closeSession(State finalState)
{
- // it's session is already closed
- if (closeFuture != null)
- return closeFuture;
-
- state(finalState);
-
- List<Future<?>> futures = new ArrayList<>();
-
- // ensure aborting the tasks do not happen on the network IO thread
(read: netty event loop)
- // as we don't want any blocking disk IO to stop the network thread
- if (finalState == State.FAILED || finalState == State.ABORTED)
-
futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
-
- // Channels should only be closed by the initiator; but, if this
session closed
- // due to failure, channels should be always closed regardless, even
if this is not the initator.
- if (!isFollower || state != State.COMPLETE)
+ // Keep a separate lock on the closeFuture so that we create it once and only once.
+ // Cannot use the StreamSession monitor here as StreamDeserializingTask/StreamSession.messageReceived
+ // holds it while calling syncUninterruptibly on sendMessage which can trigger a closeSession in
+ // the Netty event loop on error and cause a deadlock.
+ synchronized (closeFutureLock)
{
- logger.debug("[Stream #{}] Will close attached inbound {} and
outbound {} channels", planId(), inbound, outbound);
- inbound.values().forEach(channel -> futures.add(channel.close()));
- outbound.values().forEach(channel -> futures.add(channel.close()));
- }
-
- sink.onClose(peer);
- streamResult.handleSessionComplete(this);
- closeFuture = FutureCombiner.allOf(futures);
+ if (closeFuture != null)
+ return closeFuture;
+
+ closeFuture = ScheduledExecutors.nonPeriodicTasks.submit(() -> {
+ synchronized (this) {
+ state(finalState);
+
+ sink.onClose(peer);
+ streamResult.handleSessionComplete(this);
+ }}).flatMap(ignore -> {
+ List<Future<?>> futures = new ArrayList<>();
+ // ensure aborting the tasks does not happen on the network IO thread (read: netty event loop)
+ // as we don't want any blocking disk IO to stop the network thread
+ if (finalState == State.FAILED || finalState ==
State.ABORTED)
+
futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
+
+ // Channels should only be closed by the initiator; but, if this session closed
+ // due to failure, channels should always be closed regardless, even if this is not the initiator.
+ if (!isFollower || state != State.COMPLETE)
+ {
+ logger.debug("[Stream #{}] Will close attached inbound
{} and outbound {} channels", planId(), inbound, outbound);
+ inbound.values().forEach(channel ->
futures.add(channel.close()));
+ outbound.values().forEach(channel ->
futures.add(channel.close()));
+ }
+ return FutureCombiner.allOf(futures);
+ });
- return closeFuture;
+ return closeFuture;
+ }
}
private void abortTasks()
@@ -982,7 +991,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
try
{
- state(State.WAIT_COMPLETE);
+ if (state != State.COMPLETE) // mark as waiting to complete while
closeSession futures run.
+ state(State.WAIT_COMPLETE);
closeSession(State.COMPLETE);
}
finally
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index 560fee9ad2..99e613ee2d 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -518,4 +519,11 @@ public class StreamingMultiplexedChannel
threadToChannelMap.clear();
fileTransferExecutor.shutdownNow();
}
+
+ @VisibleForTesting // For testing only -- close the control handle for
testing streaming exception handling.
+ public void unsafeCloseControlChannel()
+ {
+ logger.warn("Unsafe close of control channel");
+ controlChannel.close().awaitUninterruptibly();
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamDisconnectedWhileReceivingTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamDisconnectedWhileReceivingTest.java
new file mode 100644
index 0000000000..c2c1611729
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamDisconnectedWhileReceivingTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM;
+import static org.junit.Assert.assertFalse;
+
+/** Regression test for CASSANDRA-18733: the Netty event loop thread calling StreamSession.closeSession while
+ * the StreamSession object monitor is held by another thread used to deadlock; repair must now complete.
+*/
+public class StreamDisconnectedWhileReceivingTest extends TestBaseImpl
+{
+ static final Logger logger =
LoggerFactory.getLogger(StreamDisconnectedWhileReceivingTest.class);
+
+ @Test
+ public void zeroCopy() throws IOException, InterruptedException
+ {
+ disconnectControlChannel(true);
+ }
+
+ @Test
+ public void notZeroCopy() throws IOException, InterruptedException
+ {
+ disconnectControlChannel(false);
+ }
+
+ private void disconnectControlChannel(boolean zeroCopyStreaming) throws
IOException, InterruptedException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", zeroCopyStreaming)
+
.set("autocompaction_on_startup_enabled", false))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+
+ for (int i = 1; i <= 100; i++)
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk)
VALUES (?)"), i);
+ node1.flush(KEYSPACE);
+ Thread nodetoolRepair = new Thread(() -> {
+ node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl");
+ });
+ nodetoolRepair.start();
+
+ nodetoolRepair.join(15000);
+ assertFalse("Repair did not complete - assuming deadlock",
nodetoolRepair.isAlive());
+
+ // if deadlock occurs, instance shutdown will hit timeout.
+ }
+ }
+
+ public static class BBHelper
+ {
+ static Logger logger = LoggerFactory.getLogger(BBHelper.class);
+
+ public static void receive(IncomingStreamMessage message, @SuperCall
Callable<Void> zuper) throws Throwable
+ {
+ logger.info("receive message {}", message.type);
+ if (message.type == STREAM)
+
message.stream.session().getChannel().unsafeCloseControlChannel();
+ zuper.call();
+ }
+
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ if (num != 2) // only target the second instance
+ return;
+
+ new ByteBuddy().rebase(StreamSession.class)
+ .method(named("receive").and(takesArguments(1)))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]