This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new bde4fa0013 Waiting indefinitely on ReceivedMessage response in StreamSession#receive() can cause deadlock
bde4fa0013 is described below

commit bde4fa0013eb8cec5b1d88b21ca4463bc07272bb
Author: Jon Meredith <[email protected]>
AuthorDate: Mon Aug 28 16:10:41 2023 -0600

    Waiting indefinitely on ReceivedMessage response in StreamSession#receive() can cause deadlock
    
    patch by Jon Meredith; reviewed by Caleb Rackliffe, David Capwell for CASSANDRA-18733
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/streaming/StreamSession.java  |  64 ++++++-----
 .../async/StreamingMultiplexedChannel.java         |   8 ++
 .../StreamDisconnectedWhileReceivingTest.java      | 117 +++++++++++++++++++++
 4 files changed, 163 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 21ca2abec7..b52bcfd634 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1.4
+ * Waiting indefinitely on ReceivedMessage response in StreamSession#receive() 
can cause deadlock (CASSANDRA-18733)
  * Allow empty keystore_password in encryption_options (CASSANDRA-18778)
  * Skip ColumnFamilyStore#topPartitions initialization when client or tool 
mode (CASSANDRA-18697)
 Merged from 4.0:
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 811717f85d..7b07a0d179 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -189,6 +189,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     // receiving peer's CompleteMessage.
     private boolean maybeCompleted = false;
     private Future<?> closeFuture;
+    private final Object closeFutureLock = new Object();
 
     private final TimeUUID pendingRepair;
     private final PreviewKind previewKind;
@@ -496,35 +497,43 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         }
     }
 
-    private synchronized Future<?> closeSession(State finalState)
+    private Future<?> closeSession(State finalState)
     {
-        // it's session is already closed
-        if (closeFuture != null)
-            return closeFuture;
-
-        state(finalState);
-
-        List<Future<?>> futures = new ArrayList<>();
-
-        // ensure aborting the tasks do not happen on the network IO thread 
(read: netty event loop)
-        // as we don't want any blocking disk IO to stop the network thread
-        if (finalState == State.FAILED || finalState == State.ABORTED)
-            
futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
-
-        // Channels should only be closed by the initiator; but, if this 
session closed
-        // due to failure, channels should be always closed regardless, even 
if this is not the initator.
-        if (!isFollower || state != State.COMPLETE)
+        // Keep a separate lock on the closeFuture so that we create it once 
and only once.
+        // Cannot use the StreamSession monitor here as 
StreamDeserializingTask/StreamSession.messageReceived
+        // holds it while calling syncUninterruptibly on sendMessage which can 
trigger a closeSession in
+        // the Netty event loop on error and cause a deadlock.
+        synchronized (closeFutureLock)
         {
-            logger.debug("[Stream #{}] Will close attached inbound {} and 
outbound {} channels", planId(), inbound, outbound);
-            inbound.values().forEach(channel -> futures.add(channel.close()));
-            outbound.values().forEach(channel -> futures.add(channel.close()));
-        }
-
-        sink.onClose(peer);
-        streamResult.handleSessionComplete(this);
-        closeFuture = FutureCombiner.allOf(futures);
+            if (closeFuture != null)
+                return closeFuture;
+
+            closeFuture = ScheduledExecutors.nonPeriodicTasks.submit(() -> {
+                synchronized (this) {
+                    state(finalState);
+
+                    sink.onClose(peer);
+                    streamResult.handleSessionComplete(this);
+                }}).flatMap(ignore -> {
+                    List<Future<?>> futures = new ArrayList<>();
+                    // ensure aborting the tasks do not happen on the network 
IO thread (read: netty event loop)
+                    // as we don't want any blocking disk IO to stop the 
network thread
+                    if (finalState == State.FAILED || finalState == 
State.ABORTED)
+                        
futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
+
+                    // Channels should only be closed by the initiator; but, 
if this session closed
+                    // due to failure, channels should be always closed 
regardless, even if this is not the initator.
+                    if (!isFollower || state != State.COMPLETE)
+                    {
+                        logger.debug("[Stream #{}] Will close attached inbound 
{} and outbound {} channels", planId(), inbound, outbound);
+                        inbound.values().forEach(channel -> 
futures.add(channel.close()));
+                        outbound.values().forEach(channel -> 
futures.add(channel.close()));
+                    }
+                    return FutureCombiner.allOf(futures);
+                });
 
-        return closeFuture;
+            return closeFuture;
+        }
     }
 
     private void abortTasks()
@@ -982,7 +991,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     {
         try
         {
-            state(State.WAIT_COMPLETE);
+            if (state != State.COMPLETE) // mark as waiting to complete while 
closeSession futures run.
+                state(State.WAIT_COMPLETE);
             closeSession(State.COMPLETE);
         }
         finally
diff --git 
a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
 
b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index 560fee9ad2..99e613ee2d 100644
--- 
a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ 
b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -518,4 +519,11 @@ public class StreamingMultiplexedChannel
         threadToChannelMap.clear();
         fileTransferExecutor.shutdownNow();
     }
+
+    @VisibleForTesting // For testing only -- close the control handle for 
testing streaming exception handling.
+    public void unsafeCloseControlChannel()
+    {
+        logger.warn("Unsafe close of control channel");
+        controlChannel.close().awaitUninterruptibly();
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamDisconnectedWhileReceivingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamDisconnectedWhileReceivingTest.java
new file mode 100644
index 0000000000..c2c1611729
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamDisconnectedWhileReceivingTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static 
org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM;
+import static org.junit.Assert.assertFalse;
+
+/** Demonstrate deadlock if the netty event loop thread calls 
StreamSession.closeSession while
+ *  the object monitor is being held by another thread.
+*/
+public class StreamDisconnectedWhileReceivingTest extends TestBaseImpl
+{
+    static final Logger logger = 
LoggerFactory.getLogger(StreamDisconnectedWhileReceivingTest.class);
+
+    @Test
+    public void zeroCopy() throws IOException, InterruptedException
+    {
+        disconnectControlChannel(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException, InterruptedException
+    {
+        disconnectControlChannel(false);
+    }
+
+    private void disconnectControlChannel(boolean zeroCopyStreaming) throws 
IOException, InterruptedException
+    {
+        try (Cluster cluster = Cluster.build(2)
+                                      
.withInstanceInitializer(BBHelper::install)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        
.set("stream_entire_sstables", zeroCopyStreaming)
+                                                        
.set("autocompaction_on_startup_enabled", false))
+                                      .start())
+        {
+            init(cluster);
+
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int 
PRIMARY KEY)"));
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+
+            for (int i = 1; i <= 100; i++)
+                node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) 
VALUES (?)"), i);
+            node1.flush(KEYSPACE);
+            Thread nodetoolRepair = new Thread(() -> {
+                node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl");
+            });
+            nodetoolRepair.start();
+
+            nodetoolRepair.join(15000);
+            assertFalse("Repair did not complete - assuming deadlock", 
nodetoolRepair.isAlive());
+
+            // if deadlock occurs, instance shutdown will hit timeout.
+        }
+    }
+
+    public static class BBHelper
+    {
+        static Logger logger = LoggerFactory.getLogger(BBHelper.class);
+
+        public static void receive(IncomingStreamMessage message, @SuperCall 
Callable<Void> zuper) throws Throwable
+        {
+            logger.info("receive message {}", message.type);
+            if (message.type == STREAM)
+                
message.stream.session().getChannel().unsafeCloseControlChannel();
+            zuper.call();
+        }
+
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            if (num != 2) // only target the second instance
+                return;
+
+            new ByteBuddy().rebase(StreamSession.class)
+                           .method(named("receive").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to