This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 3ebb4360b9b HBASE-27152 Under compaction mark may leak (#4568)
3ebb4360b9b is described below

commit 3ebb4360b9bc17c8eed362671c1cba49c3a05697
Author: Xiaolin Ha <[email protected]>
AuthorDate: Wed Jul 20 08:49:10 2022 +0800

    HBASE-27152 Under compaction mark may leak (#4568)
    
    Signed-off-by: Duo Zhang <[email protected]>
    (cherry picked from commit da27a67a1e3abfa0117b0cf1a59d01b7a2f8ab05)
---
 .../hadoop/hbase/regionserver/CompactSplit.java    | 32 +++++++---------------
 1 file changed, 10 insertions(+), 22 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 8a6e806292a..8cbded31f54 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -145,15 +144,15 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
     final String n = Thread.currentThread().getName();
 
     StealJobQueue<Runnable> stealJobQueue = new 
StealJobQueue<Runnable>(COMPARATOR);
+    // Since the StealJobQueue is unbounded, we need not to set the 
RejectedExecutionHandler for
+    // the long and short compaction thread pool executors.
     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 
60, TimeUnit.SECONDS,
       stealJobQueue,
       new ThreadFactoryBuilder().setNameFormat(n + 
"-longCompactions-%d").setDaemon(true).build());
-    this.longCompactions.setRejectedExecutionHandler(new Rejection());
     this.longCompactions.prestartAllCoreThreads();
     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 
60, TimeUnit.SECONDS,
       stealJobQueue.getStealFromQueue(),
       new ThreadFactoryBuilder().setNameFormat(n + 
"-shortCompactions-%d").setDaemon(true).build());
-    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
   }
 
   @Override
@@ -382,15 +381,20 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       // pool; we will do selection there, and move to large pool if necessary.
       pool = shortCompactions;
     }
-    pool.execute(
-      new CompactionRunner(store, region, compaction, tracker, 
completeTracker, pool, user));
+
+    // A simple implementation for under compaction marks.
+    // Since this method is always called in the synchronized methods, we do 
not need to use the
+    // boolean result to make sure that exactly the one that added here will 
be removed
+    // in the next steps.
+    underCompactionStores.add(getStoreNameForUnderCompaction(store));
     if (LOG.isDebugEnabled()) {
       LOG.debug(
         "Add compact mark for store {}, priority={}, current under compaction "
           + "store size is {}",
         getStoreNameForUnderCompaction(store), priority, 
underCompactionStores.size());
     }
-    underCompactionStores.add(getStoreNameForUnderCompaction(store));
+    pool.submit(
+      new CompactionRunner(store, region, compaction, tracker, 
completeTracker, pool, user));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -719,22 +723,6 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
     }
   }
 
-  /**
-   * Cleanup class to use when rejecting a compaction request from the queue.
-   */
-  private static class Rejection implements RejectedExecutionHandler {
-    @Override
-    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
-      if (runnable instanceof CompactionRunner) {
-        CompactionRunner runner = (CompactionRunner) runnable;
-        LOG.debug("Compaction Rejected: " + runner);
-        if (runner.compaction != null) {
-          runner.store.cancelRequestedCompaction(runner.compaction);
-        }
-      }
-    }
-  }
-
   /**
    * {@inheritDoc}
    */

Reply via email to