Repository: hbase
Updated Branches:
  refs/heads/master 4add40ca2 -> c9fdbec77


HBASE-18989 Polish the compaction related CP hooks


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9fdbec7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9fdbec7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9fdbec7

Branch: refs/heads/master
Commit: c9fdbec772fe7dea06644d86e2854b98047ac9da
Parents: 4add40c
Author: zhangduo <zhang...@apache.org>
Authored: Mon Oct 23 16:44:54 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Oct 23 16:44:54 2017 +0800

----------------------------------------------------------------------
 .../hbase/coprocessor/RegionObserver.java       |  23 +-
 .../hadoop/hbase/regionserver/CompactSplit.java | 101 +++++--
 .../hadoop/hbase/regionserver/HRegion.java      |  34 ++-
 .../hbase/regionserver/HRegionServer.java       |  10 +-
 .../hadoop/hbase/regionserver/HStore.java       |   5 +-
 .../hbase/regionserver/RSRpcServices.java       |  47 +---
 .../hadoop/hbase/regionserver/Region.java       |  19 +-
 .../regionserver/RegionServerServices.java      |  10 +-
 .../apache/hadoop/hbase/regionserver/Store.java |   7 -
 .../compactions/CompactionLifeCycleTracker.java |  19 +-
 .../compactions/CompactionRequester.java        |  46 ++++
 .../hadoop/hbase/MockRegionServerServices.java  |   6 +
 .../hadoop/hbase/master/MockRegionServer.java   |  36 +--
 .../hbase/regionserver/TestCompaction.java      |   2 +-
 .../TestCompactionLifeCycleTracker.java         | 267 +++++++++++++++++++
 15 files changed, 487 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 94550df..ba96a5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -158,7 +158,7 @@ public interface RegionObserver {
   /**
    * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
    * available candidates. To alter the files used for compaction, you may mutate the passed in list
-   * of candidates.
+   * of candidates. If you remove all the candidates then the compaction will be canceled.
    * @param c the environment provided by the region server
    * @param store the store where compaction is being requested
    * @param candidates the store files currently available for compaction
@@ -183,18 +183,12 @@ public interface RegionObserver {
 
   /**
    * Called prior to writing the {@link StoreFile}s selected for compaction into a new
-   * {@code StoreFile}. To override or modify the compaction process, implementing classes have two
-   * options:
-   * <ul>
-   * <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
-   * from this method. The custom scanner can then inspect
-   *  {@link org.apache.hadoop.hbase.KeyValue}s from the wrapped scanner, applying its own
-   *   policy to what gets written.</li>
-   * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
-   * custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
-   * bypassing core compaction using this approach must write out new store files themselves or the
-   * existing data will no longer be available after compaction.</strong></li>
-   * </ul>
+   * {@code StoreFile}.
+   * <p>
+   * To override or modify the compaction process, implementing classes can wrap the provided
+   * {@link InternalScanner} with a custom implementation that is returned from this method. The
+   * custom scanner can then inspect {@link org.apache.hadoop.hbase.Cell}s from the wrapped scanner,
+   * applying its own policy to what gets written.
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param scanner the scanner over existing data used in the store file rewriting
@@ -206,8 +200,7 @@ public interface RegionObserver {
    */
   default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
-      CompactionRequest request)
-      throws IOException {
+      CompactionRequest request) throws IOException {
     return scanner;
   }
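
As a rough illustration of the two hooks documented above, a coprocessor can cancel a
compaction by emptying the candidate list in preCompactSelection, and can wrap (or simply
pass through) the scanner handed to preCompact. This is only a sketch: the observer class
name and the "archive" family name are made up for illustration and are not part of this
commit. Note that the option of bypassing core compaction via ObserverContext#bypass() has
been dropped from the javadoc above, so wrapping the scanner is the supported way to
influence what gets written.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class SkipArchiveCompactionObserver implements RegionObserver {

  @Override
  public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
      throws IOException {
    // Removing every candidate cancels the compaction; the tracker is then notified
    // through notExecuted instead of beforeExecution/afterExecution.
    if ("archive".equals(store.getColumnFamilyName())) { // illustrative family name
      candidates.clear();
    }
  }

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    // Wrap the scanner here to control what is written to the new store file;
    // returning it unchanged keeps the default compaction output.
    return scanner;
  }
}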
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index f37e49e..b82b346 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -34,6 +34,8 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntSupplier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +46,7 @@ import 
org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import 
org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
@@ -60,7 +63,7 @@ import 
org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
  * Compact region on request and then run split if appropriate
  */
 @InterfaceAudience.Private
-public class CompactSplit implements PropagatingConfigurationObserver {
+public class CompactSplit implements CompactionRequester, 
PropagatingConfigurationObserver {
   private static final Log LOG = LogFactory.getLog(CompactSplit.class);
 
   // Configuration key for the large compaction threads.
@@ -99,7 +102,6 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
 
   /** @param server */
   CompactSplit(HRegionServer server) {
-    super();
     this.server = server;
     this.conf = server.getConfiguration();
     this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
@@ -235,14 +237,68 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
     }
   }
 
+  // A compaction life cycle tracker to trace the execution of all the compactions triggered by one
+  // request and delegate to the source CompactionLifeCycleTracker. It will call completed method if
+  // all the compactions are finished.
+  private static final class AggregatingCompactionLifeCycleTracker
+      implements CompactionLifeCycleTracker {
+
+    private final CompactionLifeCycleTracker tracker;
+
+    private final AtomicInteger remaining;
+
+    public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker 
tracker,
+        int numberOfStores) {
+      this.tracker = tracker;
+      this.remaining = new AtomicInteger(numberOfStores);
+    }
+
+    private void tryCompleted() {
+      if (remaining.decrementAndGet() == 0) {
+        tracker.completed();
+      }
+    }
+
+    @Override
+    public void notExecuted(Store store, String reason) {
+      tracker.notExecuted(store, reason);
+      tryCompleted();
+    }
+
+    @Override
+    public void beforeExecution(Store store) {
+      tracker.beforeExecution(store);
+    }
+
+    @Override
+    public void afterExecution(Store store) {
+      tracker.afterExecution(store);
+      tryCompleted();
+    }
+  }
+
+  private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker,
+      IntSupplier numberOfStores) {
+    if (tracker == CompactionLifeCycleTracker.DUMMY) {
+      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
+      // the life cycle of a compaction.
+      return tracker;
+    } else {
+      return new AggregatingCompactionLifeCycleTracker(tracker, 
numberOfStores.getAsInt());
+    }
+  }
+
+  @Override
   public synchronized void requestCompaction(HRegion region, String why, int 
priority,
       CompactionLifeCycleTracker tracker, User user) throws IOException {
-    requestCompactionInternal(region, why, priority, true, tracker, user);
+    requestCompactionInternal(region, why, priority, true,
+      wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), 
user);
   }
 
+  @Override
   public synchronized void requestCompaction(HRegion region, HStore store, 
String why, int priority,
       CompactionLifeCycleTracker tracker, User user) throws IOException {
-    requestCompactionInternal(region, store, why, priority, true, tracker, 
user);
+    requestCompactionInternal(region, store, why, priority, true, 
wrap(tracker, () -> 1), user);
   }
 
   private void requestCompactionInternal(HRegion region, String why, int 
priority,
@@ -259,6 +315,17 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
         !region.getTableDescriptor().isCompactionEnabled())) {
       return;
     }
+    RegionServerSpaceQuotaManager spaceQuotaManager =
+        this.server.getRegionServerSpaceQuotaManager();
+    if (spaceQuotaManager != null &&
+        
spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName()))
 {
+      String reason = "Ignoring compaction request for " + region +
+          " as an active space quota violation " + " policy disallows 
compactions.";
+      tracker.notExecuted(store, reason);
+      LOG.debug(reason);
+      return;
+    }
+
     Optional<CompactionContext> compaction;
     if (selectNow) {
       compaction = selectCompaction(region, store, priority, tracker, user);
@@ -270,17 +337,6 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
       compaction = Optional.empty();
     }
 
-    RegionServerSpaceQuotaManager spaceQuotaManager =
-        this.server.getRegionServerSpaceQuotaManager();
-    if (spaceQuotaManager != null &&
-        
spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName()))
 {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring compaction request for " + region +
-            " as an active space quota violation " + " policy disallows 
compactions.");
-      }
-      return;
-    }
-
     ThreadPoolExecutor pool;
     if (selectNow) {
       // compaction.get is safe as we will just return if selectNow is true 
but no compaction is
@@ -315,9 +371,11 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
   private Optional<CompactionContext> selectCompaction(HRegion region, HStore 
store, int priority,
       CompactionLifeCycleTracker tracker, User user) throws IOException {
     Optional<CompactionContext> compaction = store.requestCompaction(priority, 
tracker, user);
-    if (!compaction.isPresent() && LOG.isDebugEnabled() && 
region.getRegionInfo() != null) {
-      LOG.debug("Not compacting " + 
region.getRegionInfo().getRegionNameAsString() +
-          " because compaction request was cancelled");
+    if (!compaction.isPresent() && region.getRegionInfo() != null) {
+      String reason = "Not compacting " + 
region.getRegionInfo().getRegionNameAsString() +
+          " because compaction request was cancelled";
+      tracker.notExecuted(store, reason);
+      LOG.debug(reason);
     }
     return compaction;
   }
@@ -454,7 +512,6 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
 
     public CompactionRunner(HStore store, HRegion region, 
Optional<CompactionContext> compaction,
         ThreadPoolExecutor parent, User user) {
-      super();
       this.store = store;
       this.region = region;
       this.compaction = compaction;
@@ -462,7 +519,7 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
           : store.getCompactPriority();
       this.parent = parent;
       this.user = user;
-      this.time = System.currentTimeMillis();
+      this.time = EnvironmentEdgeManager.currentTime();
     }
 
     @Override
@@ -520,7 +577,7 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
       // Finally we can compact something.
       assert c != null;
 
-      c.getRequest().getTracker().beforeExecute(store);
+      c.getRequest().getTracker().beforeExecution(store);
       try {
         // Note: please don't put single-compaction logic here;
         //       put it into region/store/etc. This is CST logic.
@@ -553,7 +610,7 @@ public class CompactSplit implements 
PropagatingConfigurationObserver {
         region.reportCompactionRequestFailure();
         server.checkFileSystem();
       } finally {
-        c.getRequest().getTracker().afterExecute(store);
+        c.getRequest().getTracker().afterExecution(store);
         region.decrementCompactionsQueuedCount();
         LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 84b0d6a..9022e1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1952,11 +1952,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   protected void doRegionCompactionPrep() throws IOException {
   }
 
-  @Override
-  public void triggerMajorCompaction() throws IOException {
-    stores.values().forEach(HStore::triggerMajorCompaction);
-  }
-
   /**
    * Synchronously compact all stores in the region.
    * <p>This operation could block for a long time, so don't call it from a
@@ -1972,7 +1967,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    */
   public void compact(boolean majorCompaction) throws IOException {
     if (majorCompaction) {
-      triggerMajorCompaction();
+      stores.values().forEach(HStore::triggerMajorCompaction);
     }
     for (HStore s : stores.values()) {
       Optional<CompactionContext> compaction = s.requestCompaction();
@@ -8212,16 +8207,27 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   }
 
   @Override
-  public void requestCompaction(String why, int priority, 
CompactionLifeCycleTracker tracker,
-      User user) throws IOException {
-    ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, 
why, priority, tracker,
-      user);
+  public void requestCompaction(String why, int priority, boolean major,
+      CompactionLifeCycleTracker tracker) throws IOException {
+    if (major) {
+      stores.values().forEach(HStore::triggerMajorCompaction);
+    }
+    rsServices.getCompactionRequestor().requestCompaction(this, why, priority, 
tracker,
+      RpcServer.getRequestUser().orElse(null));
   }
 
   @Override
-  public void requestCompaction(byte[] family, String why, int priority,
-      CompactionLifeCycleTracker tracker, User user) throws IOException {
-    ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this,
-      Preconditions.checkNotNull(stores.get(family)), why, priority, tracker, 
user);
+  public void requestCompaction(byte[] family, String why, int priority, 
boolean major,
+      CompactionLifeCycleTracker tracker) throws IOException {
+    HStore store = stores.get(family);
+    if (store == null) {
+      throw new NoSuchColumnFamilyException("column family " + 
Bytes.toString(family) +
+          " does not exist in region " + 
getRegionInfo().getRegionNameAsString());
+    }
+    if (major) {
+      store.triggerMajorCompaction();
+    }
+    rsServices.getCompactionRequestor().requestCompaction(this, store, why, 
priority, tracker,
+      RpcServer.getRequestUser().orElse(null));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d396b3e..2c0bd03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -127,6 +127,7 @@ import 
org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
@@ -1686,9 +1687,9 @@ public class HRegionServer extends HasThread implements
     int totalStaticBloomSizeKB = 0;
     long totalCompactingKVs = 0;
     long currentCompactedKVs = 0;
-    List<? extends Store> storeList = r.getStores();
+    List<HStore> storeList = r.getStores();
     stores += storeList.size();
-    for (Store store : storeList) {
+    for (HStore store : storeList) {
       storefiles += store.getStorefilesCount();
       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 
1024 / 1024);
       storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
@@ -2779,6 +2780,11 @@ public class HRegionServer extends HasThread implements
     return this.cacheFlusher;
   }
 
+  @Override
+  public CompactionRequester getCompactionRequestor() {
+    return this.compactSplitThread;
+  }
+
   /**
    * Get the top N most loaded regions this server is serving so we can tell 
the
    * master which regions it can reallocate if we're overloaded. TODO: actually

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 83b5561..2ec5437 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1625,7 +1625,10 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation, Propagat
     return 
StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
   }
 
-  @Override
+  /**
+   * getter for CompactionProgress object
+   * @return CompactionProgress object; can be null
+   */
   public CompactionProgress getCompactionProgress() {
     return this.storeEngine.getCompactor().getProgress();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a565eeb..045838a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1533,7 +1533,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
    * @throws ServiceException
    */
   @Override
-  @QosPriority(priority=HConstants.ADMIN_QOS)
+  @QosPriority(priority = HConstants.ADMIN_QOS)
   public CompactRegionResponse compactRegion(final RpcController controller,
       final CompactRegionRequest request) throws ServiceException {
     try {
@@ -1551,41 +1551,20 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
       }
       region.startRegionOperation(Operation.COMPACT_REGION);
       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
-      boolean major = false;
-      byte [] family = null;
-      HStore store = null;
+      boolean major = request.hasMajor() && request.getMajor();
       if (request.hasFamily()) {
-        family = request.getFamily().toByteArray();
-        store = region.getStore(family);
-        if (store == null) {
-          throw new ServiceException(new DoNotRetryIOException("column family 
" +
-              Bytes.toString(family) + " does not exist in region " +
-              region.getRegionInfo().getRegionNameAsString()));
-        }
-      }
-      if (request.hasMajor()) {
-        major = request.getMajor();
-      }
-      if (major) {
-        if (family != null) {
-          store.triggerMajorCompaction();
-        } else {
-          region.triggerMajorCompaction();
-        }
-      }
-
-      String familyLogMsg = (family != null)?" for column family: " + 
Bytes.toString(family):"";
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("User-triggered compaction requested for region "
-          + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
-      }
-      String log = "User-triggered " + (major ? "major " : "") + "compaction" 
+ familyLogMsg;
-      if (family != null) {
-        regionServer.compactSplitThread.requestCompaction(region, store, log, 
Store.PRIORITY_USER,
-          CompactionLifeCycleTracker.DUMMY, 
RpcServer.getRequestUser().orElse(null));
+        byte[] family = request.getFamily().toByteArray();
+        String log = "User-triggered " + (major ? "major " : "") + "compaction 
for region " +
+            region.getRegionInfo().getRegionNameAsString() + " and family " +
+            Bytes.toString(family);
+        LOG.trace(log);
+        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
+          CompactionLifeCycleTracker.DUMMY);
       } else {
-        regionServer.compactSplitThread.requestCompaction(region, log, 
Store.PRIORITY_USER,
-          CompactionLifeCycleTracker.DUMMY, 
RpcServer.getRequestUser().orElse(null));
+        String log = "User-triggered " + (major ? "major " : "") + "compaction 
for region " +
+            region.getRegionInfo().getRegionNameAsString();
+        LOG.trace(log);
+        region.requestCompaction(log, Store.PRIORITY_USER, major, 
CompactionLifeCycleTracker.DUMMY);
       }
       return CompactRegionResponse.newBuilder().build();
     } catch (IOException ie) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 79012ea..0c93ed1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -43,7 +42,6 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -455,15 +453,6 @@ public interface Region extends ConfigurationObserver {
   // Wizards only, please
 
   /**
-   * Trigger major compaction on all stores in the region.
-   * <p>
-   * Compaction will be performed asynchronously to this call by the 
RegionServer's
-   * CompactSplitThread.
-   * @throws IOException
-   */
-  void triggerMajorCompaction() throws IOException;
-
-  /**
    * @return if a given region is in compaction now.
    */
   CompactionState getCompactionState();
@@ -471,12 +460,12 @@ public interface Region extends ConfigurationObserver {
   /**
    * Request compaction on this region.
    */
-  void requestCompaction(String why, int priority, CompactionLifeCycleTracker 
tracker, User user)
-      throws IOException;
+  void requestCompaction(String why, int priority, boolean major,
+      CompactionLifeCycleTracker tracker) throws IOException;
 
   /**
    * Request compaction for the given family
    */
-  void requestCompaction(byte[] family, String why, int priority,
-      CompactionLifeCycleTracker tracker, User user) throws IOException;
+  void requestCompaction(byte[] family, String why, int priority, boolean 
major,
+      CompactionLifeCycleTracker tracker) throws IOException;
 }
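
A small usage sketch of the reworked interface (the helper class, method names, and reason
strings below are illustrative, not part of this commit). Callers no longer pass a User; it
is picked up from the RPC context inside HRegion, and the new boolean selects a major
compaction:

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.util.Bytes;

public final class RequestCompactionExamples {

  /** Request a user-priority major compaction of a single column family. */
  static void compactFamily(Region region, String family) throws IOException {
    region.requestCompaction(Bytes.toBytes(family), "compact one family", Store.PRIORITY_USER,
      true, CompactionLifeCycleTracker.DUMMY);
  }

  /** Request a minor compaction of every store in the region, observed by a tracker. */
  static void compactAllStores(Region region, CompactionLifeCycleTracker tracker)
      throws IOException {
    region.requestCompaction("compact whole region", Store.PRIORITY_USER, false, tracker);
  }
}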

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index d8e8ac5..af883a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -59,11 +60,18 @@ public interface RegionServerServices extends Server, OnlineRegions, FavoredNode
   List<WAL> getWALs() throws IOException;
 
   /**
-   * @return Implementation of {@link FlushRequester} or null.
+   * @return Implementation of {@link FlushRequester} or null. Usually it will not be null unless
+   *         during initialization.
    */
   FlushRequester getFlushRequester();
 
   /**
+   * @return Implementation of {@link CompactionRequester} or null. Usually it will not be null
+   *         unless during initialization.
+   */
+  CompactionRequester getCompactionRequestor();
+
+  /**
    * @return the RegionServerAccounting for this Region Server
    */
   RegionServerAccounting getRegionServerAccounting();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index b6bad6f..d60de6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -62,12 +61,6 @@ public interface Store {
   FileSystem getFileSystem();
 
   /**
-   * getter for CompactionProgress object
-   * @return CompactionProgress object; can be null
-   */
-  CompactionProgress getCompactionProgress();
-
-  /**
    * Tests whether we should run a major compaction. For example, if the 
configured major compaction
    * interval is reached.
    * @return true if we should run a major compaction.

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java
index 38fec7e..dfff2f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java
@@ -33,12 +33,18 @@ public interface CompactionLifeCycleTracker {
   };
 
   /**
+   * Called if the compaction request fails for some reason.
+   */
+  default void notExecuted(Store store, String reason) {
+  }
+
+  /**
    * Called before compaction is executed by CompactSplitThread.
    * <p>
    * Requesting compaction on a region can lead to multiple compactions on different stores, so we
    * will pass the {@link Store} in to tell you the store we operate on.
    */
-  default void beforeExecute(Store store) {
+  default void beforeExecution(Store store) {
   }
 
   /**
@@ -47,6 +53,15 @@ public interface CompactionLifeCycleTracker {
    * Requesting compaction on a region can lead to multiple compactions on different stores, so we
    * will pass the {@link Store} in to tell you the store we operate on.
    */
-  default void afterExecute(Store store) {
+  default void afterExecution(Store store) {
+  }
+
+  /**
+   * Called after all the requested compactions are completed.
+   * <p>
+   * The compaction scheduling is per Store so if you request a compaction on a region it may lead
+   * to multiple compactions.
+   */
+  default void completed() {
   }
 }
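
For reference, a tracker built on these hooks might look like the sketch below, modeled on
the Tracker used in the new TestCompactionLifeCycleTracker further down (the class name here
is illustrative). beforeExecution/afterExecution fire once per compacted store, notExecuted
fires for stores that are skipped, and completed fires once after every store covered by the
request has been handled:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

public class RecordingTracker implements CompactionLifeCycleTracker {

  private final List<String> events = new ArrayList<>();

  private boolean completed = false;

  @Override
  public synchronized void notExecuted(Store store, String reason) {
    events.add("notExecuted " + store.getColumnFamilyName() + ": " + reason);
  }

  @Override
  public synchronized void beforeExecution(Store store) {
    events.add("beforeExecution " + store.getColumnFamilyName());
  }

  @Override
  public synchronized void afterExecution(Store store) {
    events.add("afterExecution " + store.getColumnFamilyName());
  }

  @Override
  public synchronized void completed() {
    completed = true;
    notifyAll();
  }

  /** Block until every compaction triggered by the request has finished or been skipped. */
  public synchronized void await() throws InterruptedException {
    while (!completed) {
      wait();
    }
  }

  public synchronized List<String> getEvents() {
    return new ArrayList<>(events);
  }
}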

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
new file mode 100644
index 0000000..7674722
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Request a compaction.
+ */
+@InterfaceAudience.Private
+public interface CompactionRequester {
+
+  /**
+   * Request compaction on all the stores of the given region.
+   */
+  void requestCompaction(HRegion region, String why, int priority,
+      CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
+
+  /**
+   * Request compaction on the given store.
+   */
+  void requestCompaction(HRegion region, HStore store, String why, int priority,
+      CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
+}
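
CompactSplit is the production implementation of this interface (see the change above), and
HRegion now reaches it through RegionServerServices#getCompactionRequestor(). A minimal
caller sketch, with the class and reason string illustrative and a null check because the
requester may not be available during region server initialization:

import java.io.IOException;

import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;

final class CompactionRequesterSketch {

  /** Ask the region server to compact all stores of the given region. */
  static void requestAll(RegionServerServices rsServices, HRegion region) throws IOException {
    CompactionRequester requester = rsServices.getCompactionRequestor();
    if (requester != null) {
      requester.requestCompaction(region, "illustrative reason", Store.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser().orElse(null));
    }
  }
}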

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index c739715..58a0055 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -49,6 +49,7 @@ import 
org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -151,6 +152,11 @@ public class MockRegionServerServices implements 
RegionServerServices {
   }
 
   @Override
+  public CompactionRequester getCompactionRequestor() {
+    return null;
+  }
+
+  @Override
   public ClusterConnection getConnection() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 6ca7076..119c225 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -60,6 +60,7 @@ import 
org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -219,7 +220,6 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
 
   @Override
   public boolean isStopped() {
-    // TODO Auto-generated method stub
     return false;
   }
 
@@ -264,18 +264,15 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
 
   @Override
   public void addRegion(HRegion r) {
-    // TODO Auto-generated method stub
   }
 
   @Override
   public boolean removeRegion(HRegion r, ServerName destination) {
-    // TODO Auto-generated method stub
     return false;
   }
 
   @Override
   public HRegion getRegion(String encodedRegionName) {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -316,13 +313,14 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
 
   @Override
   public FlushRequester getFlushRequester() {
-    // TODO Auto-generated method stub
     return null;
   }
-
+  @Override
+  public CompactionRequester getCompactionRequestor() {
+    return null;
+  }
   @Override
   public RegionServerAccounting getRegionServerAccounting() {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -334,24 +332,20 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   @Override
   public void postOpenDeployTasks(PostOpenDeployContext context) throws 
KeeperException,
       IOException {
-    // TODO Auto-generated method stub
   }
 
   @Override
   public RpcServerInterface getRpcServer() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public FileSystem getFileSystem() {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -371,7 +365,6 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   @Override
   public MutateResponse mutate(RpcController controller, MutateRequest request)
       throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -410,7 +403,6 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   @Override
   public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
       BulkLoadHFileRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -423,7 +415,6 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   @Override
   public ClientProtos.MultiResponse multi(
       RpcController controller, MultiRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -451,14 +442,12 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   @Override
   public GetStoreFileResponse getStoreFile(RpcController controller,
       GetStoreFileRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
       GetOnlineRegionRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -470,74 +459,63 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   @Override
   public OpenRegionResponse openRegion(RpcController controller,
       OpenRegionRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public WarmupRegionResponse warmupRegion(RpcController controller,
       WarmupRegionRequest request) throws ServiceException {
-    //TODO Auto-generated method stub
     return null;
   }
   @Override
   public CloseRegionResponse closeRegion(RpcController controller,
       CloseRegionRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public FlushRegionResponse flushRegion(RpcController controller,
       FlushRegionRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public CompactRegionResponse compactRegion(RpcController controller,
       CompactRegionRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
       ReplicateWALEntryRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public RollWALWriterResponse rollWALWriter(RpcController controller,
       RollWALWriterRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public GetServerInfoResponse getServerInfo(RpcController controller,
       GetServerInfoRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public StopServerResponse stopServer(RpcController controller,
       StopServerRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public List<Region> getRegions(TableName tableName) throws IOException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public Leases getLeases() {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -575,13 +553,11 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
   public ReplicateWALEntryResponse
       replay(RpcController controller, ReplicateWALEntryRequest request)
       throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public Map<String, HRegion> getRecoveringRegions() {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -603,14 +579,12 @@ ClientProtos.ClientService.BlockingInterface, 
RegionServerServices {
 
   @Override
   public boolean registerService(com.google.protobuf.Service service) {
-    // TODO Auto-generated method stub
     return false;
   }
 
   @Override
   public CoprocessorServiceResponse execRegionServerService(RpcController 
controller,
       CoprocessorServiceRequest request) throws ServiceException {
-    // TODO Auto-generated method stub
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index a9f331e..6316809 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -706,7 +706,7 @@ public class TestCompaction {
     }
 
     @Override
-    public void afterExecute(Store store) {
+    public void afterExecution(Store store) {
       done.countDown();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9fdbec7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
new file mode 100644
index 0000000..70d3463
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Confirm that the function of CompactionLifeCycleTracker is OK as we do not 
use it in our own
+ * code.
+ */
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestCompactionLifeCycleTracker {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName NAME =
+      TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
+
+  private static final byte[] CF1 = Bytes.toBytes("CF1");
+
+  private static final byte[] CF2 = Bytes.toBytes("CF2");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
+
+  private HRegion region;
+
+  private static CompactionLifeCycleTracker TRACKER = null;
+
+  // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
+  public static final class CompactionObserver implements RegionObserver {
+
+    @Override
+    public void 
preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+        List<? extends StoreFile> candidates, CompactionLifeCycleTracker 
tracker)
+        throws IOException {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+    }
+
+    @Override
+    public void 
postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+        List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
+        CompactionRequest request) {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+    }
+
+    @Override
+    public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker 
tracker,
+        CompactionRequest request) throws IOException {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+      return scanner;
+    }
+
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, 
Store store,
+        StoreFile resultFile, CompactionLifeCycleTracker tracker, 
CompactionRequest request)
+        throws IOException {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    
UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
 2);
+    UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    UTIL.getAdmin()
+        .createTable(TableDescriptorBuilder.newBuilder(NAME)
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
+            .addCoprocessor(CompactionObserver.class.getName()).build());
+    try (Table table = UTIL.getConnection().getTable(NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addImmutable(CF1, QUALIFIER, 
Bytes.toBytes(i)));
+      }
+      UTIL.getAdmin().flush(NAME);
+      for (int i = 100; i < 200; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addImmutable(CF1, QUALIFIER, 
Bytes.toBytes(i)));
+      }
+      UTIL.getAdmin().flush(NAME);
+    }
+    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
+    assertEquals(2, region.getStore(CF1).getStorefilesCount());
+    assertEquals(0, region.getStore(CF2).getStorefilesCount());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    region = null;
+    TRACKER = null;
+    UTIL.deleteTable(NAME);
+  }
+
+  private static final class Tracker implements CompactionLifeCycleTracker {
+
+    final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();
+
+    final List<Store> beforeExecuteStores = new ArrayList<>();
+
+    final List<Store> afterExecuteStores = new ArrayList<>();
+
+    private boolean completed = false;
+
+    @Override
+    public void notExecuted(Store store, String reason) {
+      notExecutedStores.add(Pair.newPair(store, reason));
+    }
+
+    @Override
+    public void beforeExecution(Store store) {
+      beforeExecuteStores.add(store);
+    }
+
+    @Override
+    public void afterExecution(Store store) {
+      afterExecuteStores.add(store);
+    }
+
+    @Override
+    public synchronized void completed() {
+      completed = true;
+      notifyAll();
+    }
+
+    public synchronized void await() throws InterruptedException {
+      while (!completed) {
+        wait();
+      }
+    }
+  }
+
+  @Test
+  public void testRequestOnRegion() throws IOException, InterruptedException {
+    Tracker tracker = new Tracker();
+    TRACKER = tracker;
+    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
+    tracker.await();
+    assertEquals(1, tracker.notExecutedStores.size());
+    assertEquals(Bytes.toString(CF2),
+      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
+    assertThat(tracker.notExecutedStores.get(0).getSecond(),
+      containsString("compaction request was cancelled"));
+
+    assertEquals(1, tracker.beforeExecuteStores.size());
+    assertEquals(Bytes.toString(CF1), 
tracker.beforeExecuteStores.get(0).getColumnFamilyName());
+
+    assertEquals(1, tracker.afterExecuteStores.size());
+    assertEquals(Bytes.toString(CF1), 
tracker.afterExecuteStores.get(0).getColumnFamilyName());
+  }
+
+  @Test
+  public void testRequestOnStore() throws IOException, InterruptedException {
+    Tracker tracker = new Tracker();
+    TRACKER = tracker;
+    region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
+    tracker.await();
+    assertTrue(tracker.notExecutedStores.isEmpty());
+    assertEquals(1, tracker.beforeExecuteStores.size());
+    assertEquals(Bytes.toString(CF1), 
tracker.beforeExecuteStores.get(0).getColumnFamilyName());
+    assertEquals(1, tracker.afterExecuteStores.size());
+    assertEquals(Bytes.toString(CF1), 
tracker.afterExecuteStores.get(0).getColumnFamilyName());
+
+    tracker = new Tracker();
+    TRACKER = tracker;
+    region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
+    tracker.await();
+    assertEquals(1, tracker.notExecutedStores.size());
+    assertEquals(Bytes.toString(CF2),
+      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
+    assertThat(tracker.notExecutedStores.get(0).getSecond(),
+      containsString("compaction request was cancelled"));
+    assertTrue(tracker.beforeExecuteStores.isEmpty());
+    assertTrue(tracker.afterExecuteStores.isEmpty());
+  }
+
+  @Test
+  public void testSpaceQuotaViolation() throws IOException, 
InterruptedException {
+    
region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
+      new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
+          100L));
+    Tracker tracker = new Tracker();
+    TRACKER = tracker;
+    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
+    tracker.await();
+    assertEquals(2, tracker.notExecutedStores.size());
+    tracker.notExecutedStores.sort((p1, p2) -> 
p1.getFirst().getColumnFamilyName()
+        .compareTo(p2.getFirst().getColumnFamilyName()));
+
+    assertEquals(Bytes.toString(CF1),
+      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
+    assertThat(tracker.notExecutedStores.get(0).getSecond(),
+      containsString("space quota violation"));
+
+    assertEquals(Bytes.toString(CF2),
+      tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
+    assertThat(tracker.notExecutedStores.get(1).getSecond(),
+      containsString("space quota violation"));
+
+    assertTrue(tracker.beforeExecuteStores.isEmpty());
+    assertTrue(tracker.afterExecuteStores.isEmpty());
+  }
+}
