This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 3ebb4360b9b HBASE-27152 Under compaction mark may leak (#4568)
3ebb4360b9b is described below
commit 3ebb4360b9bc17c8eed362671c1cba49c3a05697
Author: Xiaolin Ha <[email protected]>
AuthorDate: Wed Jul 20 08:49:10 2022 +0800
HBASE-27152 Under compaction mark may leak (#4568)
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit da27a67a1e3abfa0117b0cf1a59d01b7a2f8ab05)
---
.../hadoop/hbase/regionserver/CompactSplit.java | 32 +++++++---------------
1 file changed, 10 insertions(+), 22 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 8a6e806292a..8cbded31f54 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -145,15 +144,15 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
final String n = Thread.currentThread().getName();
StealJobQueue<Runnable> stealJobQueue = new
StealJobQueue<Runnable>(COMPARATOR);
+ // Since the StealJobQueue is unbounded, we need not to set the
RejectedExecutionHandler for
+ // the long and short compaction thread pool executors.
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
60, TimeUnit.SECONDS,
stealJobQueue,
new ThreadFactoryBuilder().setNameFormat(n +
"-longCompactions-%d").setDaemon(true).build());
- this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads();
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
60, TimeUnit.SECONDS,
stealJobQueue.getStealFromQueue(),
new ThreadFactoryBuilder().setNameFormat(n +
"-shortCompactions-%d").setDaemon(true).build());
- this.shortCompactions.setRejectedExecutionHandler(new Rejection());
}
@Override
@@ -382,15 +381,20 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
// pool; we will do selection there, and move to large pool if necessary.
pool = shortCompactions;
}
- pool.execute(
- new CompactionRunner(store, region, compaction, tracker,
completeTracker, pool, user));
+
+ // A simple implementation for under compaction marks.
+ // Since this method is always called in the synchronized methods, we do
not need to use the
+ // boolean result to make sure that exactly the one that added here will
be removed
+ // in the next steps.
+ underCompactionStores.add(getStoreNameForUnderCompaction(store));
if (LOG.isDebugEnabled()) {
LOG.debug(
"Add compact mark for store {}, priority={}, current under compaction "
+ "store size is {}",
getStoreNameForUnderCompaction(store), priority,
underCompactionStores.size());
}
- underCompactionStores.add(getStoreNameForUnderCompaction(store));
+ pool.submit(
+ new CompactionRunner(store, region, compaction, tracker,
completeTracker, pool, user));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -719,22 +723,6 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
}
}
- /**
- * Cleanup class to use when rejecting a compaction request from the queue.
- */
- private static class Rejection implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
- if (runnable instanceof CompactionRunner) {
- CompactionRunner runner = (CompactionRunner) runnable;
- LOG.debug("Compaction Rejected: " + runner);
- if (runner.compaction != null) {
- runner.store.cancelRequestedCompaction(runner.compaction);
- }
- }
- }
- }
-
/**
* {@inheritDoc}
*/