Shutdown compaction in drain to prevent leak

patch by yukim; reviewed by marcuse for CASSANDRA-10079
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/17082d4b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/17082d4b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/17082d4b

Branch: refs/heads/trunk
Commit: 17082d4b54c89fd34f81400e0002fff67c30f150
Parents: 34b8d8f
Author: Yuki Morishita <[email protected]>
Authored: Wed Sep 2 19:36:37 2015 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Mon Oct 26 12:08:35 2015 -0500

----------------------------------------------------------------------
 .../db/compaction/CompactionManager.java | 40 ++++++++++++++++----
 .../cassandra/service/StorageService.java | 3 ++
 2 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17082d4b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e60675a..b85eb51 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -29,13 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
@@ -198,6 +192,38 @@ public class CompactionManager implements CompactionManagerMBean
         return false;
     }
 
+    /**
+     * Shutdowns both compaction and validation executors, cancels running compaction / validation,
+     * and waits for tasks to complete if tasks were not cancelable.
+     */
+    public void forceShutdown()
+    {
+        // shutdown executors to prevent further submission
+        executor.shutdown();
+        validationExecutor.shutdown();
+
+        // interrupt compactions and validations
+        for (Holder compactionHolder : CompactionMetrics.getCompactions())
+        {
+            compactionHolder.stop();
+        }
+
+        // wait for tasks to terminate
+        // compaction tasks are interrupted above, so it shuold be fairy quick
+        // until not interrupted tasks to complete.
+        for (ExecutorService exec : Arrays.asList(executor, validationExecutor))
+        {
+            try
+            {
+                exec.awaitTermination(1, TimeUnit.MINUTES);
+            }
+            catch (InterruptedException e)
+            {
+                logger.error("Interrupted while waiting for tasks to be terminated", e);
+            }
+        }
+    }
+
     public void finishCompactionsAndShutdown(long timeout, TimeUnit unit) throws InterruptedException
     {
         executor.shutdown();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17082d4b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d5730d5..7e5b67b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3891,6 +3891,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         BatchlogManager.shutdown();
 
+        // Interrupt on going compaction and shutdown to prevent further compaction
+        CompactionManager.instance.forceShutdown();
+
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
         // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
         CommitLog.instance.forceRecycleAllSegments();
