This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 45ad38f On forced shutdown, terminate all repair sessions.
45ad38f is described below
commit 45ad38fb5aec76418589c07d88fd0ca27fb430f4
Author: Swen Fuhrmann <[email protected]>
AuthorDate: Wed Sep 9 14:19:54 2020 +0200
On forced shutdown, terminate all repair sessions.
Patch by Swen Fuhrmann, reviewed by Alexander Dejanovski and
brandonwilliams for CASSANDRA-15902
---
CHANGES.txt | 1 +
.../org/apache/cassandra/repair/RepairSession.java | 15 +++++
.../org/apache/cassandra/repair/RepairJobTest.java | 76 +++++++++++++++++++++-
3 files changed, 89 insertions(+), 3 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index dfb4c4a..0f016d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.23:
+ * Fix OOM when terminating repair session (CASSANDRA-15902)
* Avoid marking shutting down nodes as up after receiving gossip shutdown
message (CASSANDRA-16094)
* Check SSTables for latest version before dropping compact storage
(CASSANDRA-16063)
* Handle unexpected columns due to schema races (CASSANDRA-15899)
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java
b/src/java/org/apache/cassandra/repair/RepairSession.java
index ac8e0a9..d210ca7 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -310,6 +310,21 @@ public class RepairSession extends
AbstractFuture<RepairSessionResult> implement
public void forceShutdown(Throwable reason)
{
setException(reason);
+
+ // Ensure that all outstanding futures are cancelled.
+ // Otherwise, when the task executor is shut down later in this
method, the thread of the repair job will
+ // wait forever on the outstanding futures. If that happens, the repair
thread won't be finished and won't release the memory.
+
+ for (ValidationTask validationTask: validating.values())
+ {
+ validationTask.cancel(true);
+ }
+
+ for (RemoteSyncTask syncTask: syncingTasks.values())
+ {
+ syncTask.cancel(true);
+ }
+
taskExecutor.shutdownNow();
terminate();
}
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 2f77a34..5269182 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -30,12 +30,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -64,6 +66,7 @@ import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.UUIDGen;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class RepairJobTest extends SchemaLoader
@@ -81,26 +84,54 @@ public class RepairJobTest extends SchemaLoader
private static InetAddress addr2;
private static InetAddress addr3;
private static InetAddress addr4;
- private RepairSession session;
+ private MeasureableRepairSession session;
private RepairJob job;
private RepairJobDesc sessionJobDesc;
- // So that threads actually get recycled and we can have accurate memory
accounting while testing
- // memory retention from CASSANDRA-14096
private static class MeasureableRepairSession extends RepairSession
{
+ private final CountDownLatch validationCompleteReached = new
CountDownLatch(1);
+
+ private volatile boolean simulateValidationsOutstanding;
+
public MeasureableRepairSession(UUID parentRepairSession, UUID id,
Collection<Range<Token>> ranges, String keyspace,
RepairParallelism parallelismDegree,
Set<InetAddress> endpoints, long repairedAt, String... cfnames)
{
super(parentRepairSession, id, ranges, keyspace,
parallelismDegree, endpoints, repairedAt, cfnames);
}
+ // So that threads actually get recycled and we can have accurate
memory accounting while testing
+ // memory retention from CASSANDRA-14096
protected DebuggableThreadPoolExecutor createExecutor()
{
DebuggableThreadPoolExecutor executor = super.createExecutor();
executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS);
return executor;
}
+
+ void simulateValidationsOutstanding()
+ {
+ simulateValidationsOutstanding = true;
+ }
+
+ @Override
+ public void validationComplete(RepairJobDesc desc, InetAddress
endpoint, MerkleTrees trees)
+ {
+ validationCompleteReached.countDown();
+
+ // Do not delegate validationComplete to the parent session, to simulate
that the call is still outstanding
+ if (simulateValidationsOutstanding)
+ {
+ return;
+ }
+ super.validationComplete(desc, endpoint, trees);
+ }
+
+ void waitUntilReceivedFirstValidationComplete()
+ {
+ boolean isFirstValidationCompleteReceived =
Uninterruptibles.awaitUninterruptibly(validationCompleteReached,
TEST_TIMEOUT_S, TimeUnit.SECONDS);
+ assertTrue("First validation completed",
isFirstValidationCompleteReceived);
+ }
}
@BeforeClass
@@ -246,6 +277,45 @@ public class RepairJobTest extends SchemaLoader
assertEquals(2, numDifferent);
}
+ /**
+ * CASSANDRA-15902: Verify that the repair job thread terminates after a
forced shutdown of the session, instead of waiting forever on outstanding validations
+ */
+ @Test
+ public void releaseThreadAfterSessionForceShutdown() throws Throwable
+ {
+ Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
+ mockTrees.put(FBUtilities.getBroadcastAddress(),
createInitialTree(false));
+ mockTrees.put(addr2, createInitialTree(false));
+ mockTrees.put(addr3, createInitialTree(false));
+
+ List<MessageOut> observedMessages = new ArrayList<>();
+ interceptRepairMessages(mockTrees, observedMessages);
+
+ session.simulateValidationsOutstanding();
+
+ Thread jobThread = new Thread(() -> job.run());
+ jobThread.start();
+
+ session.waitUntilReceivedFirstValidationComplete();
+
+ session.forceShutdown(new Exception("force shutdown for testing"));
+
+ jobThread.join(TimeUnit.SECONDS.toMillis(TEST_TIMEOUT_S));
+ assertFalse("expect that the job thread has been finished and not
waiting on the outstanding validations forever", jobThread.isAlive());
+
+ // RepairJob should send out 3 x SNAPSHOT -> 1 x VALIDATION_REQUEST -> done
+ // Only one VALIDATION_REQUEST because we shut down the session after the first
validation
+ List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+ for (int i = 0; i < 3; i++)
+ expectedTypes.add(RepairMessage.Type.SNAPSHOT);
+
+ expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST);
+
+ assertEquals(expectedTypes, observedMessages.stream()
+ .map(k -> ((RepairMessage)
k.payload).messageType)
+
.collect(Collectors.toList()));
+ }
+
private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks,
Integer ... differences)
{
List<Integer> expectedDifferences = new
ArrayList<>(Arrays.asList(differences));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]