This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 468715bff3c Revert "HBASE-27152 Under compaction mark may leak (#4568)"
468715bff3c is described below
commit 468715bff3c66e42f6812d96239b39ca0081a976
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jul 21 09:04:17 2022 +0800
Revert "HBASE-27152 Under compaction mark may leak (#4568)"
This reverts commit 5c5ceb1ef2588b129a5928c22f70c9982eed4a19.
---
.../hadoop/hbase/regionserver/CompactSplit.java | 32 +++++++++++++++-------
1 file changed, 22 insertions(+), 10 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 8cbded31f54..8a6e806292a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -144,15 +145,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
final String n = Thread.currentThread().getName();
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
- // Since the StealJobQueue is unbounded, we need not to set the RejectedExecutionHandler for
- // the long and short compaction thread pool executors.
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
stealJobQueue,
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
+ this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads();
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
stealJobQueue.getStealFromQueue(),
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
+ this.shortCompactions.setRejectedExecutionHandler(new Rejection());
}
@Override
@@ -381,20 +382,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
// pool; we will do selection there, and move to large pool if necessary.
pool = shortCompactions;
}
-
- // A simple implementation for under compaction marks.
- // Since this method is always called in the synchronized methods, we do not need to use the
- // boolean result to make sure that exactly the one that added here will be removed
- // in the next steps.
- underCompactionStores.add(getStoreNameForUnderCompaction(store));
+ pool.execute(
+ new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
if (LOG.isDebugEnabled()) {
LOG.debug(
"Add compact mark for store {}, priority={}, current under compaction "
+ "store size is {}",
getStoreNameForUnderCompaction(store), priority,
underCompactionStores.size());
}
- pool.submit(
- new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
+ underCompactionStores.add(getStoreNameForUnderCompaction(store));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -723,6 +719,22 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
}
+ /**
+ * Cleanup class to use when rejecting a compaction request from the queue.
+ */
+ private static class Rejection implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+ if (runnable instanceof CompactionRunner) {
+ CompactionRunner runner = (CompactionRunner) runnable;
+ LOG.debug("Compaction Rejected: " + runner);
+ if (runner.compaction != null) {
+ runner.store.cancelRequestedCompaction(runner.compaction);
+ }
+ }
+ }
+ }
+
/**
* {@inheritDoc}
*/