Repository: hbase
Updated Branches:
  refs/heads/master 81133f89f -> 37b29e909


HBASE-19069 Do not wrap the original CompactionLifeCycleTracker when calling CP hooks


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/37b29e90
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/37b29e90
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/37b29e90

Branch: refs/heads/master
Commit: 37b29e909defecdc580112ce6cd306710d13e9e2
Parents: 81133f8
Author: zhangduo <zhang...@apache.org>
Authored: Mon Oct 23 21:10:44 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Oct 24 10:56:14 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/CompactSplit.java | 135 ++++++++++---------
 .../TestCompactionLifeCycleTracker.java         |   9 +-
 2 files changed, 80 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/37b29e90/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index b82b346..0749f85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -237,80 +237,73 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
   }
 
-  // A compaction life cycle tracker to trace the execution of all the 
compactions triggered by one
-  // request and delegate to the source CompactionLifeCycleTracker. It will 
call completed method if
-  // all the compactions are finished.
-  private static final class AggregatingCompactionLifeCycleTracker
-      implements CompactionLifeCycleTracker {
+  private interface CompactionCompleteTracker {
+
+    default void completed(Store store) {
+    }
+  }
+
+  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
+      new CompactionCompleteTracker() {
+      };
+
+  private static final class AggregatingCompleteTracker implements 
CompactionCompleteTracker {
 
     private final CompactionLifeCycleTracker tracker;
 
     private final AtomicInteger remaining;
 
-    public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker 
tracker,
-        int numberOfStores) {
+    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int 
numberOfStores) {
       this.tracker = tracker;
       this.remaining = new AtomicInteger(numberOfStores);
     }
 
-    private void tryCompleted() {
+    @Override
+    public void completed(Store store) {
       if (remaining.decrementAndGet() == 0) {
         tracker.completed();
       }
     }
-
-    @Override
-    public void notExecuted(Store store, String reason) {
-      tracker.notExecuted(store, reason);
-      tryCompleted();
-    }
-
-    @Override
-    public void beforeExecution(Store store) {
-      tracker.beforeExecution(store);
-    }
-
-    @Override
-    public void afterExecution(Store store) {
-      tracker.afterExecution(store);
-      tryCompleted();
-    }
   }
 
-  private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker,
+  private CompactionCompleteTracker 
getCompleteTracker(CompactionLifeCycleTracker tracker,
       IntSupplier numberOfStores) {
     if (tracker == CompactionLifeCycleTracker.DUMMY) {
       // a simple optimization to avoid creating unnecessary objects as 
usually we do not care about
       // the life cycle of a compaction.
-      return tracker;
+      return DUMMY_COMPLETE_TRACKER;
     } else {
-      return new AggregatingCompactionLifeCycleTracker(tracker, 
numberOfStores.getAsInt());
+      return new AggregatingCompleteTracker(tracker, 
numberOfStores.getAsInt());
     }
   }
 
   @Override
   public synchronized void requestCompaction(HRegion region, String why, int 
priority,
       CompactionLifeCycleTracker tracker, User user) throws IOException {
-    requestCompactionInternal(region, why, priority, true,
-      wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), 
user);
+    requestCompactionInternal(region, why, priority, true, tracker,
+      getCompleteTracker(tracker, () -> 
region.getTableDescriptor().getColumnFamilyCount()), user);
   }
 
   @Override
   public synchronized void requestCompaction(HRegion region, HStore store, 
String why, int priority,
       CompactionLifeCycleTracker tracker, User user) throws IOException {
-    requestCompactionInternal(region, store, why, priority, true, 
wrap(tracker, () -> 1), user);
+    requestCompactionInternal(region, store, why, priority, true, tracker,
+      getCompleteTracker(tracker, () -> 1), user);
   }
 
   private void requestCompactionInternal(HRegion region, String why, int 
priority,
-      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws 
IOException {
+      boolean selectNow, CompactionLifeCycleTracker tracker,
+      CompactionCompleteTracker completeTracker, User user) throws IOException 
{
     // request compaction on all stores
     for (HStore store : region.stores.values()) {
-      requestCompactionInternal(region, store, why, priority, selectNow, 
tracker, user);
+      requestCompactionInternal(region, store, why, priority, selectNow, 
tracker, completeTracker,
+        user);
     }
   }
 
   private void requestCompactionInternal(HRegion region, HStore store, String 
why, int priority,
-      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws 
IOException {
+      boolean selectNow, CompactionLifeCycleTracker tracker,
+      CompactionCompleteTracker completeTracker, User user) throws IOException 
{
     if (this.server.isStopped() || (region.getTableDescriptor() != null &&
         !region.getTableDescriptor().isCompactionEnabled())) {
       return;
@@ -322,33 +315,36 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       String reason = "Ignoring compaction request for " + region +
           " as an active space quota violation " + " policy disallows 
compactions.";
       tracker.notExecuted(store, reason);
+      completeTracker.completed(store);
       LOG.debug(reason);
       return;
     }
 
-    Optional<CompactionContext> compaction;
+    CompactionContext compaction;
     if (selectNow) {
-      compaction = selectCompaction(region, store, priority, tracker, user);
-      if (!compaction.isPresent()) {
+      Optional<CompactionContext> c = selectCompaction(region, store, 
priority, tracker, completeTracker, user);
+      if (!c.isPresent()) {
         // message logged inside
         return;
       }
+      compaction = c.get();
     } else {
-      compaction = Optional.empty();
+      compaction = null;
     }
 
     ThreadPoolExecutor pool;
     if (selectNow) {
       // compaction.get is safe as we will just return if selectNow is true 
but no compaction is
       // selected
-      pool = store.throttleCompaction(compaction.get().getRequest().getSize()) 
? longCompactions
+      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? 
longCompactions
           : shortCompactions;
     } else {
       // We assume that most compactions are small. So, put system compactions 
into small
       // pool; we will do selection there, and move to large pool if necessary.
       pool = shortCompactions;
     }
-    pool.execute(new CompactionRunner(store, region, compaction, pool, user));
+    pool.execute(
+      new CompactionRunner(store, region, compaction, tracker, 
completeTracker, pool, user));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -358,23 +354,25 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
   }
 
   public synchronized void requestSystemCompaction(HRegion region, String why) 
throws IOException {
-    requestCompactionInternal(region, why, NO_PRIORITY, false,
-      CompactionLifeCycleTracker.DUMMY, null);
+    requestCompactionInternal(region, why, NO_PRIORITY, false, 
CompactionLifeCycleTracker.DUMMY,
+      DUMMY_COMPLETE_TRACKER, null);
   }
 
   public synchronized void requestSystemCompaction(HRegion region, HStore 
store, String why)
       throws IOException {
     requestCompactionInternal(region, store, why, NO_PRIORITY, false,
-      CompactionLifeCycleTracker.DUMMY, null);
+      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
   }
 
   private Optional<CompactionContext> selectCompaction(HRegion region, HStore 
store, int priority,
-      CompactionLifeCycleTracker tracker, User user) throws IOException {
+      CompactionLifeCycleTracker tracker, CompactionCompleteTracker 
completeTracker, User user)
+      throws IOException {
     Optional<CompactionContext> compaction = store.requestCompaction(priority, 
tracker, user);
     if (!compaction.isPresent() && region.getRegionInfo() != null) {
       String reason = "Not compacting " + 
region.getRegionInfo().getRegionNameAsString() +
           " because compaction request was cancelled";
       tracker.notExecuted(store, reason);
+      completeTracker.completed(store);
       LOG.debug(reason);
     }
     return compaction;
@@ -491,12 +489,12 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       if (cmp != 0) {
         return cmp;
       }
-      Optional<CompactionContext> c1 = o1.compaction;
-      Optional<CompactionContext> c2 = o2.compaction;
-      if (c1.isPresent()) {
-        return c2.isPresent() ? compare(c1.get().getRequest(), 
c2.get().getRequest()) : -1;
+      CompactionContext c1 = o1.compaction;
+      CompactionContext c2 = o2.compaction;
+      if (c1 != null) {
+        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
       } else {
-        return c2.isPresent() ? 1 : 0;
+        return c2 != null ? 1 : 0;
       }
     }
   };
@@ -504,19 +502,24 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
   private final class CompactionRunner implements Runnable {
     private final HStore store;
     private final HRegion region;
-    private final Optional<CompactionContext> compaction;
+    private final CompactionContext compaction;
+    private final CompactionLifeCycleTracker tracker;
+    private final CompactionCompleteTracker completeTracker;
     private int queuedPriority;
     private ThreadPoolExecutor parent;
     private User user;
     private long time;
 
-    public CompactionRunner(HStore store, HRegion region, 
Optional<CompactionContext> compaction,
+    public CompactionRunner(HStore store, HRegion region, CompactionContext 
compaction,
+        CompactionLifeCycleTracker tracker, CompactionCompleteTracker 
completeTracker,
         ThreadPoolExecutor parent, User user) {
       this.store = store;
       this.region = region;
       this.compaction = compaction;
-      this.queuedPriority = compaction.isPresent() ? 
compaction.get().getRequest().getPriority()
-          : store.getCompactPriority();
+      this.tracker = tracker;
+      this.completeTracker = completeTracker;
+      this.queuedPriority =
+          compaction != null ? compaction.getRequest().getPriority() : 
store.getCompactPriority();
       this.parent = parent;
       this.user = user;
       this.time = EnvironmentEdgeManager.currentTime();
@@ -524,15 +527,18 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
 
     @Override
     public String toString() {
-      return compaction.map(c -> "Request = " + c.getRequest())
-          .orElse("regionName = " + region.toString() + ", storeName = " + 
store.toString() +
-              ", priority = " + queuedPriority + ", time = " + time);
+      if (compaction != null) {
+        return "Request = " + compaction.getRequest();
+      } else {
+        return "regionName = " + region.toString() + ", storeName = " + 
store.toString() +
+            ", priority = " + queuedPriority + ", time = " + time;
+      }
     }
 
     private void doCompaction(User user) {
       CompactionContext c;
       // Common case - system compaction without a file selection. Select now.
-      if (!compaction.isPresent()) {
+      if (compaction == null) {
         int oldPriority = this.queuedPriority;
         this.queuedPriority = this.store.getCompactPriority();
         if (this.queuedPriority > oldPriority) {
@@ -543,8 +549,8 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
         }
         Optional<CompactionContext> selected;
         try {
-          selected = selectCompaction(this.region, this.store, queuedPriority,
-            CompactionLifeCycleTracker.DUMMY, user);
+          selected = selectCompaction(this.region, this.store, queuedPriority, 
tracker,
+            completeTracker, user);
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
@@ -572,12 +578,12 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
           return;
         }
       } else {
-        c = compaction.get();
+        c = compaction;
       }
       // Finally we can compact something.
       assert c != null;
 
-      c.getRequest().getTracker().beforeExecution(store);
+      tracker.beforeExecution(store);
       try {
         // Note: please don't put single-compaction logic here;
         //       put it into region/store/etc. This is CST logic.
@@ -610,7 +616,8 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
         region.reportCompactionRequestFailure();
         server.checkFileSystem();
       } finally {
-        c.getRequest().getTracker().afterExecution(store);
+        tracker.afterExecution(store);
+        completeTracker.completed(store);
         region.decrementCompactionsQueuedCount();
         LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
       }
@@ -645,7 +652,9 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       if (runnable instanceof CompactionRunner) {
         CompactionRunner runner = (CompactionRunner) runnable;
         LOG.debug("Compaction Rejected: " + runner);
-        runner.compaction.ifPresent(c -> 
runner.store.cancelRequestedCompaction(c));
+        if (runner.compaction != null) {
+          runner.store.cancelRequestedCompaction(runner.compaction);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/37b29e90/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
index 70d3463..4eb43b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
@@ -76,7 +78,12 @@ public class TestCompactionLifeCycleTracker {
   private static CompactionLifeCycleTracker TRACKER = null;
 
   // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
-  public static final class CompactionObserver implements RegionObserver {
+  public static final class CompactionObserver implements RegionObserver, 
RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
 
     @Override
     public void 
preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,

Reply via email to