This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 468715bff3c Revert "HBASE-27152 Under compaction mark may leak (#4568)"
468715bff3c is described below

commit 468715bff3c66e42f6812d96239b39ca0081a976
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jul 21 09:04:17 2022 +0800

    Revert "HBASE-27152 Under compaction mark may leak (#4568)"
    
    This reverts commit 5c5ceb1ef2588b129a5928c22f70c9982eed4a19.
---
 .../hadoop/hbase/regionserver/CompactSplit.java    | 32 +++++++++++++++-------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 8cbded31f54..8a6e806292a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -144,15 +145,15 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
     final String n = Thread.currentThread().getName();
 
     StealJobQueue<Runnable> stealJobQueue = new 
StealJobQueue<Runnable>(COMPARATOR);
-    // Since the StealJobQueue is unbounded, we need not to set the 
RejectedExecutionHandler for
-    // the long and short compaction thread pool executors.
     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 
60, TimeUnit.SECONDS,
       stealJobQueue,
       new ThreadFactoryBuilder().setNameFormat(n + 
"-longCompactions-%d").setDaemon(true).build());
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
     this.longCompactions.prestartAllCoreThreads();
     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 
60, TimeUnit.SECONDS,
       stealJobQueue.getStealFromQueue(),
       new ThreadFactoryBuilder().setNameFormat(n + 
"-shortCompactions-%d").setDaemon(true).build());
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
   }
 
   @Override
@@ -381,20 +382,15 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       // pool; we will do selection there, and move to large pool if necessary.
       pool = shortCompactions;
     }
-
-    // A simple implementation for under compaction marks.
-    // Since this method is always called in the synchronized methods, we do 
not need to use the
-    // boolean result to make sure that exactly the one that added here will 
be removed
-    // in the next steps.
-    underCompactionStores.add(getStoreNameForUnderCompaction(store));
+    pool.execute(
+      new CompactionRunner(store, region, compaction, tracker, 
completeTracker, pool, user));
     if (LOG.isDebugEnabled()) {
       LOG.debug(
         "Add compact mark for store {}, priority={}, current under compaction "
           + "store size is {}",
         getStoreNameForUnderCompaction(store), priority, 
underCompactionStores.size());
     }
-    pool.submit(
-      new CompactionRunner(store, region, compaction, tracker, 
completeTracker, pool, user));
+    underCompactionStores.add(getStoreNameForUnderCompaction(store));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -723,6 +719,22 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
     }
   }
 
+  /**
+   * Cleanup class to use when rejecting a compaction request from the queue.
+   */
+  private static class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      if (runnable instanceof CompactionRunner) {
+        CompactionRunner runner = (CompactionRunner) runnable;
+        LOG.debug("Compaction Rejected: " + runner);
+        if (runner.compaction != null) {
+          runner.store.cancelRequestedCompaction(runner.compaction);
+        }
+      }
+    }
+  }
+
   /**
    * {@inheritDoc}
    */

Reply via email to