This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 45ad38f  On forced shutdown, terminate all repair sessions.
45ad38f is described below

commit 45ad38fb5aec76418589c07d88fd0ca27fb430f4
Author: Swen Fuhrmann <[email protected]>
AuthorDate: Wed Sep 9 14:19:54 2020 +0200

    On forced shutdown, terminate all repair sessions.
    
    Patch by Swen Fuhrmann, reviewed by Alexander Dejanovski and
    brandonwilliams for CASSANDRA-15902
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/repair/RepairSession.java | 15 +++++
 .../org/apache/cassandra/repair/RepairJobTest.java | 76 +++++++++++++++++++++-
 3 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index dfb4c4a..0f016d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.23:
+ * Fix OOM when terminating repair session (CASSANDRA-15902)
  * Avoid marking shutting down nodes as up after receiving gossip shutdown message (CASSANDRA-16094)
  * Check SSTables for latest version before dropping compact storage (CASSANDRA-16063)
  * Handle unexpected columns due to schema races (CASSANDRA-15899)
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index ac8e0a9..d210ca7 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -310,6 +310,21 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
     public void forceShutdown(Throwable reason)
     {
         setException(reason);
+
+        // Ensure that all outstanding futures are cancelled.
+        // Otherwise, when the task executor is shut down later in this method, the thread of the repair job will
+        // wait forever on the outstanding futures. If that happens, the repair thread never finishes and never releases its memory.
+
+        for (ValidationTask validationTask: validating.values())
+        {
+            validationTask.cancel(true);
+        }
+
+        for (RemoteSyncTask syncTask: syncingTasks.values())
+        {
+            syncTask.cancel(true);
+        }
+
         taskExecutor.shutdownNow();
         terminate();
     }
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 2f77a34..5269182 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -30,12 +30,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -64,6 +66,7 @@ import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class RepairJobTest extends SchemaLoader
@@ -81,26 +84,54 @@ public class RepairJobTest extends SchemaLoader
     private static InetAddress addr2;
     private static InetAddress addr3;
     private static InetAddress addr4;
-    private RepairSession session;
+    private MeasureableRepairSession session;
     private RepairJob job;
     private RepairJobDesc sessionJobDesc;
 
-    // So that threads actually get recycled and we can have accurate memory 
accounting while testing
-    // memory retention from CASSANDRA-14096
     private static class MeasureableRepairSession extends RepairSession
     {
+        private final CountDownLatch validationCompleteReached = new 
CountDownLatch(1);
+
+        private volatile boolean simulateValidationsOutstanding;
+
         public MeasureableRepairSession(UUID parentRepairSession, UUID id, 
Collection<Range<Token>> ranges, String keyspace,
                                         RepairParallelism parallelismDegree, 
Set<InetAddress> endpoints, long repairedAt, String... cfnames)
         {
             super(parentRepairSession, id, ranges, keyspace, 
parallelismDegree, endpoints, repairedAt, cfnames);
         }
 
+        // So that threads actually get recycled and we can have accurate memory accounting while testing
+        // memory retention from CASSANDRA-14096
         protected DebuggableThreadPoolExecutor createExecutor()
         {
             DebuggableThreadPoolExecutor executor = super.createExecutor();
             executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
             return executor;
         }
+
+        void simulateValidationsOutstanding()
+        {
+            simulateValidationsOutstanding = true;
+        }
+
+        @Override
+        public void validationComplete(RepairJobDesc desc, InetAddress 
endpoint, MerkleTrees trees)
+        {
+            validationCompleteReached.countDown();
+
+            // Do not delegate validationComplete to the parent, to simulate that the call is still outstanding
+            if (simulateValidationsOutstanding)
+            {
+                return;
+            }
+            super.validationComplete(desc, endpoint, trees);
+        }
+
+        void waitUntilReceivedFirstValidationComplete()
+        {
+            boolean isFirstValidationCompleteReceived = 
Uninterruptibles.awaitUninterruptibly(validationCompleteReached, 
TEST_TIMEOUT_S, TimeUnit.SECONDS);
+            assertTrue("First validation completed", 
isFirstValidationCompleteReceived);
+        }
     }
 
     @BeforeClass
@@ -246,6 +277,45 @@ public class RepairJobTest extends SchemaLoader
         assertEquals(2, numDifferent);
     }
 
+    /**
+     * CASSANDRA-15902: Verify that the repair job thread is released after a forced shutdown of the session
+     */
+    @Test
+    public void releaseThreadAfterSessionForceShutdown() throws Throwable
+    {
+        Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
+        mockTrees.put(FBUtilities.getBroadcastAddress(), 
createInitialTree(false));
+        mockTrees.put(addr2, createInitialTree(false));
+        mockTrees.put(addr3, createInitialTree(false));
+
+        List<MessageOut> observedMessages = new ArrayList<>();
+        interceptRepairMessages(mockTrees, observedMessages);
+
+        session.simulateValidationsOutstanding();
+
+        Thread jobThread = new Thread(() -> job.run());
+        jobThread.start();
+
+        session.waitUntilReceivedFirstValidationComplete();
+
+        session.forceShutdown(new Exception("force shutdown for testing"));
+
+        jobThread.join(TimeUnit.SECONDS.toMillis(TEST_TIMEOUT_S));
+        assertFalse("expect that the job thread has been finished and not 
waiting on the outstanding validations forever", jobThread.isAlive());
+
+        // RepairJob should send out 3 x SNAPSHOTS -> 1 x VALIDATION -> done
+        // Only one VALIDATION because we shut down the session after the first validation
+        List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+        for (int i = 0; i < 3; i++)
+            expectedTypes.add(RepairMessage.Type.SNAPSHOT);
+
+        expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST);
+
+        assertEquals(expectedTypes, observedMessages.stream()
+                                                    .map(k -> ((RepairMessage) 
k.payload).messageType)
+                                                    
.collect(Collectors.toList()));
+    }
+    
     private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, 
Integer ... differences)
     {
         List<Integer> expectedDifferences = new 
ArrayList<>(Arrays.asList(differences));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to