This is an automated email from the ASF dual-hosted git repository.

niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-25714 by this push:
     new f19f92d  HBASE-25991 Do compaction on compaction server (#3425)
f19f92d is described below

commit f19f92d297643357841f9c985c234a8af8e749ed
Author: niuyulin <[email protected]>
AuthorDate: Tue Jul 6 17:32:54 2021 +0800

    HBASE-25991 Do compaction on compaction server (#3425)
    
    Signed-off-by: Duo Zhang <[email protected]>
    Signed-off-by: Michael Stack <[email protected]>
---
 .../org/apache/hadoop/hbase/AbstractServer.java    |  28 ++
 .../hbase/compactionserver/CSRpcServices.java      |  31 +-
 .../compactionserver/CompactionFilesCache.java     | 182 +++++++++
 .../hbase/compactionserver/CompactionTask.java     | 176 +++++++++
 .../compactionserver/CompactionThreadManager.java  | 433 ++++++++++++++++++++-
 .../hbase/compactionserver/HCompactionServer.java  |  46 ++-
 .../compaction/CompactionOffloadManager.java       |  17 +-
 .../hadoop/hbase/regionserver/CompactSplit.java    | 192 ++++-----
 .../hbase/regionserver/CompactThreadControl.java   | 156 ++++++++
 .../hbase/regionserver/HRegionFileSystem.java      |   2 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  38 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   2 +-
 .../hbase/regionserver/RegionServerServices.java   |  29 +-
 .../CompactionThroughputControllerFactory.java     |   5 +-
 .../throttle/NoLimitThroughputController.java      |   3 +-
 ...ressureAwareCompactionThroughputController.java |   5 +-
 .../PressureAwareFlushThroughputController.java    |   6 +-
 .../PressureAwareThroughputController.java         |  11 +-
 .../throttle/ThroughputController.java             |   5 +-
 ...oller.java => ThroughputControllerService.java} |  37 +-
 .../compactionserver/TestCompactionFilesCache.java | 155 ++++++++
 .../compactionserver/TestCompactionServer.java     | 154 +++++++-
 22 files changed, 1451 insertions(+), 262 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
index 208bd86..3e64c85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
@@ -25,10 +25,12 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
@@ -56,6 +59,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 public abstract class AbstractServer extends Thread implements Server {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractServer.class);
   protected Configuration conf;
+  protected volatile boolean dataFsOk;
+  protected HFileSystem dataFs;
   // A sleeper that sleeps for msgInterval.
   protected Sleeper sleeper;
   protected int msgInterval;
@@ -170,6 +175,7 @@ public abstract class AbstractServer extends Thread 
implements Server {
     super(processName); // thread name
     this.startcode = System.currentTimeMillis();
     this.conf = conf;
+    this.dataFsOk = true;
     this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
     this.userProvider = UserProvider.instantiate(conf);
     this.shortOperationTimeout = 
conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
@@ -400,6 +406,28 @@ public abstract class AbstractServer extends Thread 
implements Server {
     }
   }
 
+  /**
+   * Probes the data file system and requests an abort of this server when the probe fails.
+   * The probe is skipped when no data file system has been set or when the file system has
+   * already been marked as unavailable.
+   * @return false if the file system is not available, true otherwise
+   */
+  public boolean checkFileSystem() {
+    if (!this.dataFsOk || this.dataFs == null) {
+      // Nothing to probe: either already known to be broken, or no file system set.
+      return this.dataFsOk;
+    }
+    try {
+      FSUtils.checkFileSystemAvailable(this.dataFs);
+    } catch (IOException e) {
+      abort("File System not available", e);
+      this.dataFsOk = false;
+    }
+    return this.dataFsOk;
+  }
+
+  /**
+   * @return the data file system held by this server; NOTE(review): the field is not assigned
+   *         in the constructor shown here, so this may be null until a subclass sets it
+   */
+  @Override
+  public FileSystem getFileSystem() {
+    return this.dataFs;
+  }
+
   protected abstract AbstractRpcServices getRpcService();
 
   protected abstract String getProcessName();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
index 2a76d7a..25cfa1a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AbstractRpcServices;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -33,11 +36,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
 @InterfaceAudience.Private
 public class CSRpcServices extends AbstractRpcServices
@@ -46,8 +51,6 @@ public class CSRpcServices extends AbstractRpcServices
 
   private final HCompactionServer compactionServer;
 
-  // Request counter.
-  final LongAdder requestCount = new LongAdder();
   /** RPC scheduler to use for the compaction server. */
   public static final String COMPACTION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
       "hbase.compaction.server.rpc.scheduler.factory.class";
@@ -86,11 +89,25 @@ public class CSRpcServices extends AbstractRpcServices
    */
   @Override
   public CompactResponse requestCompaction(RpcController controller,
-      CompactionProtos.CompactRequest request) {
-    requestCount.increment();
+      CompactionProtos.CompactRequest request) throws ServiceException {
+    compactionServer.requestCount.increment();
+    ServerName rsServerName = ProtobufUtil.toServerName(request.getServer());
+    RegionInfo regionInfo = ProtobufUtil.toRegionInfo(request.getRegionInfo());
+    ColumnFamilyDescriptor cfd = 
ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
+    boolean major = request.getMajor();
+    int priority = request.getPriority();
+    List<HBaseProtos.ServerName> favoredNodes = 
Collections.singletonList(request.getServer());
     LOG.info("Receive compaction request from {}", 
ProtobufUtil.toString(request));
-    compactionServer.compactionThreadManager.requestCompaction();
-    return CompactionProtos.CompactResponse.newBuilder().build();
+    CompactionTask compactionTask =
+        
CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
+            
.setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
+            
.setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
+    try {
+      
compactionServer.compactionThreadManager.requestCompaction(compactionTask);
+      return CompactionProtos.CompactResponse.newBuilder().build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
   }
 
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionFilesCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionFilesCache.java
new file mode 100644
index 0000000..5499cc8
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionFilesCache.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * In-memory record of the store files that are currently selected for compaction and of the
+ * store files that have already been compacted. The compaction server does not maintain a
+ * StoreFileManager (it can't be refreshed on flush), so this external storage is used instead.
+ * It is only used by the CompactionServer (the RegionServer does not use it) and it never moves
+ * or deletes hfiles; the RS still does the file movement.
+ * <p>
+ * Files are tracked by name, keyed by encoded region name and then by column family name. The
+ * per-family sets are plain {@link HashSet}s; every method of this class that touches one
+ * synchronizes on the set itself.
+ */
+@InterfaceAudience.Private
+class CompactionFilesCache {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionFilesCache.class);
+  // encoded region name -> column family name -> names of files selected for compaction
+  private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> selectedFiles =
+      new ConcurrentHashMap<>();
+  // encoded region name -> column family name -> names of files already compacted
+  private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> compactedFiles =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Mark files as compacted. Called after the CS finished a compaction and the RS accepted the
+   * results of this compaction; these compacted files will be deleted by the RS once no reader
+   * references them.
+   * @param regionInfo region the files belong to
+   * @param cfd column family the files belong to
+   * @param compactedFiles names of the files to mark as compacted
+   * @return always true
+   */
+  boolean addCompactedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+      List<String> compactedFiles) {
+    Set<String> compactedFileSet = getCompactedStoreFilesInternal(regionInfo, cfd);
+    synchronized (compactedFileSet) {
+      compactedFileSet.addAll(compactedFiles);
+      if (LOG.isDebugEnabled()) {
+        // One placeholder per argument: region, column family, resulting file set.
+        LOG.debug("Mark files as compacted, region: {}, cf: {}, files: {}", regionInfo,
+          cfd.getNameAsString(), compactedFileSet);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Mark files as selected. Called after the files are selected and before the compaction is
+   * started, so that a file is not selected twice by two compactions. Nothing is recorded when
+   * any of the given files is already selected.
+   * @param regionInfo region the files belong to
+   * @param cfd column family the files belong to
+   * @param selectedFiles names of the files to mark as selected
+   * @return true if none of these files was selected before, false if at least one of them is
+   *         already selected
+   */
+  boolean addSelectedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+      List<String> selectedFiles) {
+    Set<String> selectedFileSet = getSelectedStoreFilesInternal(regionInfo, cfd);
+    synchronized (selectedFileSet) {
+      for (String selectedFile : selectedFiles) {
+        if (selectedFileSet.contains(selectedFile)) {
+          return false;
+        }
+      }
+      selectedFileSet.addAll(selectedFiles);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Mark files are selected, region: {}, cf: {}, files: {}",
+          regionInfo.getEncodedName(), cfd.getNameAsString(), selectedFiles);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the files which are compacted, called before selecting files for a compaction.
+   * Thread-safe: the result is an immutable copy taken while holding the set's lock.
+   * @return a snapshot of the names of the compacted files
+   */
+  Set<String> getCompactedStoreFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+    Set<String> compactedFiles = getStoreFiles(this.compactedFiles, regionInfo, cfd);
+    synchronized (compactedFiles) {
+      return ImmutableSet.copyOf(compactedFiles);
+    }
+  }
+
+  /**
+   * Get the files which are compacting, called before selecting files for a compaction.
+   * Thread-safe: the result is an immutable copy taken while holding the set's lock.
+   * @return a snapshot of the names of the files currently selected for compaction
+   */
+  Set<String> getSelectedStoreFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+    Set<String> selectedFiles = getStoreFiles(this.selectedFiles, regionInfo, cfd);
+    synchronized (selectedFiles) {
+      return ImmutableSet.copyOf(selectedFiles);
+    }
+  }
+
+  /**
+   * Get the live set of compacted files so that the caller can use it as a lock object.
+   * @return the set of compacted files, typed as Object because it is only meant for locking
+   */
+  Object getCompactedStoreFilesAsLock(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+    return getCompactedStoreFilesInternal(regionInfo, cfd);
+  }
+
+  /**
+   * Get the live set of compacting files so that the caller can use it as a lock object.
+   * @return the set of selected files, typed as Object because it is only meant for locking
+   */
+  Object getSelectedStoreFilesAsLock(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+    return getSelectedStoreFilesInternal(regionInfo, cfd);
+  }
+
+  /**
+   * Get the live set of compacted files. Not thread-safe: callers must synchronize on the
+   * returned set before reading or modifying it.
+   * @return the mutable set of names of the compacted files
+   */
+  private Set<String> getCompactedStoreFilesInternal(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd) {
+    return getStoreFiles(this.compactedFiles, regionInfo, cfd);
+  }
+
+  /**
+   * Get the live set of compacting files. Not thread-safe: callers must synchronize on the
+   * returned set before reading or modifying it.
+   * @return the mutable set of names of the files currently selected for compaction
+   */
+  private Set<String> getSelectedStoreFilesInternal(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd) {
+    return getStoreFiles(this.selectedFiles, regionInfo, cfd);
+  }
+
+  /**
+   * Look up the file-name set of a store in the given map, creating empty entries for the
+   * region and for the family on first access.
+   * @return the mutable set stored for this region and column family
+   */
+  private Set<String> getStoreFiles(
+      ConcurrentMap<String, ConcurrentMap<String, Set<String>>> fileMap, RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd) {
+    String encodedName = regionInfo.getEncodedName();
+    String family = cfd.getNameAsString();
+    Map<String, Set<String>> familyFilesMap =
+        fileMap.computeIfAbsent(encodedName, v -> new ConcurrentHashMap<>());
+    return familyFilesMap.computeIfAbsent(family, v -> new HashSet<>());
+  }
+
+  /**
+   * Remove files from selected, called:
+   * 1. after the compaction failed;
+   * 2. after the compaction finished and reporting to the RS failed;
+   * 3. after the compaction finished and reporting to the RS succeeded (the files will then be
+   *    marked as compacted).
+   */
+  void removeSelectedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+      List<String> selectedFiles) {
+    Set<String> selectedFileSet = getSelectedStoreFilesInternal(regionInfo, cfd);
+    synchronized (selectedFileSet) {
+      selectedFileSet.removeAll(selectedFiles);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Remove files from selected, region: {}, cf: {}, files: {}",
+          regionInfo.getEncodedName(), cfd.getNameAsString(), selectedFiles);
+      }
+    }
+  }
+
+  /**
+   * Remove compacted files which were already deleted by the RS: only the names that are still
+   * contained in {@code storeFileNames} are kept in the compacted set.
+   * @param storeFileNames names of the files that currently exist in the store
+   */
+  void cleanupCompactedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+      Set<String> storeFileNames) {
+    Set<String> compactedFileSet = getCompactedStoreFilesInternal(regionInfo, cfd);
+    synchronized (compactedFileSet) {
+      compactedFileSet.retainAll(storeFileNames);
+    }
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionTask.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionTask.java
new file mode 100644
index 0000000..05e350f
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionTask.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.util.List;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * Holds the state of one compaction request handled by the compaction server, including the
+ * {@link CompactionContext} that was selected for it. The request parameters are supplied
+ * through the {@link Builder}; the remaining fields are filled in later through the setters.
+ */
+@InterfaceAudience.Private
+public final class CompactionTask {
+  private ServerName rsServerName;
+  private RegionInfo regionInfo;
+  private ColumnFamilyDescriptor cfd;
+  private CompactionContext compactionContext;
+  private HStore store;
+  private boolean requestMajor;
+  private int priority;
+  private List<String> selectedFileNames;
+  private MonitoredTask status;
+  private String taskName;
+  private List<HBaseProtos.ServerName> favoredNodes;
+  private long submitTime;
+
+  private CompactionTask() {
+  }
+
+  /**
+   * @return a new builder for a task
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for {@link CompactionTask}. The builder writes directly into the one task instance
+   * that {@link #build()} returns, so a builder must not be reused for a second task.
+   */
+  public static class Builder {
+    private final CompactionTask compactionTask = new CompactionTask();
+
+    Builder setRsServerName(ServerName rsServerName) {
+      compactionTask.rsServerName = rsServerName;
+      return this;
+    }
+
+    Builder setRegionInfo(RegionInfo regionInfo) {
+      compactionTask.regionInfo = regionInfo;
+      return this;
+    }
+
+    Builder setColumnFamilyDescriptor(ColumnFamilyDescriptor cfd) {
+      compactionTask.cfd = cfd;
+      return this;
+    }
+
+    Builder setRequestMajor(boolean requestMajor) {
+      compactionTask.requestMajor = requestMajor;
+      return this;
+    }
+
+    Builder setPriority(int priority) {
+      compactionTask.priority = priority;
+      return this;
+    }
+
+    Builder setFavoredNodes(List<HBaseProtos.ServerName> favoredNodes) {
+      compactionTask.favoredNodes = favoredNodes;
+      return this;
+    }
+
+    Builder setSubmitTime(long submitTime) {
+      compactionTask.submitTime = submitTime;
+      return this;
+    }
+
+    /**
+     * @return the task populated by this builder (always the same instance)
+     */
+    CompactionTask build() {
+      return compactionTask;
+    }
+  }
+
+  void setSelectedFileNames(List<String> selectedFileNames) {
+    this.selectedFileNames = selectedFileNames;
+  }
+
+  void setMonitoredTask(MonitoredTask status) {
+    this.status = status;
+  }
+
+  void setCompactionContext(CompactionContext compactionContext) {
+    this.compactionContext = compactionContext;
+  }
+
+  void setHStore(HStore store) {
+    this.store = store;
+  }
+
+  ServerName getRsServerName() {
+    return rsServerName;
+  }
+
+  RegionInfo getRegionInfo() {
+    return regionInfo;
+  }
+
+  ColumnFamilyDescriptor getCfd() {
+    return cfd;
+  }
+
+  CompactionContext getCompactionContext() {
+    return compactionContext;
+  }
+
+  HStore getStore() {
+    return store;
+  }
+
+  List<String> getSelectedFileNames() {
+    return selectedFileNames;
+  }
+
+  MonitoredTask getStatus() {
+    return status;
+  }
+
+  void setTaskName(String name) {
+    this.taskName = name;
+  }
+
+  String getTaskName() {
+    return taskName;
+  }
+
+  boolean isRequestMajor() {
+    return requestMajor;
+  }
+
+  int getPriority() {
+    return priority;
+  }
+
+  List<HBaseProtos.ServerName> getFavoredNodes() {
+    return favoredNodes;
+  }
+
+  long getSubmitTime() {
+    return submitTime;
+  }
+
+  void setPriority(int priority) {
+    this.priority = priority;
+  }
+
+  /**
+   * Describes the task for log messages. The builder does not validate its inputs, so the region
+   * and family may be unset; they are then printed as "null" rather than failing, because this
+   * method is called while logging errors.
+   */
+  @Override
+  public String toString() {
+    String regionName = regionInfo == null ? "null" : regionInfo.getRegionNameAsString();
+    String familyName = cfd == null ? "null" : cfd.getNameAsString();
+    return new StringBuilder("RS: ").append(rsServerName).append(", region: ")
+        .append(regionName).append(", CF: ").append(familyName)
+        .append(", priority: ").append(priority).toString();
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index e7a5068..2a04e9e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,41 +18,447 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import 
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuses {@link HStore#selectCompaction},
+ * {@link HStore#throttleCompaction}, {@link CompactionContext#compact} and
+ * {@link CompactThreadControl}, which are the core logic of compaction.
+ */
 @InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
   private static Logger LOG = 
LoggerFactory.getLogger(CompactionThreadManager.class);
+  // Configuration key for the large compaction threads.
+  private final static String LARGE_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.large";
+  private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+  // Configuration key for the small compaction threads.
+  private final static String SMALL_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.small";
+  private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
 
   private final Configuration conf;
-  private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
-      new ConcurrentHashMap<>();
   private final HCompactionServer server;
+  private Path rootDir;
+  private FSTableDescriptors tableDescriptors;
+  private CompactThreadControl compactThreadControl;
+  private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+      new ConcurrentHashMap<>();
+  private static CompactionFilesCache compactionFilesCache = new 
CompactionFilesCache();
 
-  public CompactionThreadManager(final Configuration conf, HCompactionServer 
server) {
+  /**
+   * Creates the manager: resolves the root directory, creates the table descriptor reader and
+   * sets up the large/small compaction thread control from the configuration.
+   * <p>
+   * NOTE(review): any Throwable raised during set-up is only logged. The instance is then
+   * returned with whichever of rootDir, tableDescriptors and compactThreadControl had not been
+   * assigned yet still null - confirm that callers can cope with that.
+   * @param conf configuration to read the thread counts and root directory from
+   * @param server the compaction server this manager belongs to
+   */
+  CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
+    try {
+      this.rootDir = CommonFSUtils.getRootDir(this.conf);
+      this.tableDescriptors = new FSTableDescriptors(conf);
+      // The large thread count is forced to be at least 1; the small thread count is used as
+      // configured.
+      int largeThreads =
+          Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, 
LARGE_COMPACTION_THREADS_DEFAULT));
+      int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, 
SMALL_COMPACTION_THREADS_DEFAULT);
+      compactThreadControl = new CompactThreadControl(this, largeThreads, 
smallThreads,
+          COMPACTION_TASK_COMPARATOR, REJECTION);
+    } catch (Throwable t) {
+      LOG.error("Failed construction CompactionThreadManager", t);
+    }
+  }
+
+  /**
+   * @return the configuration this manager was constructed with
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  /**
+   * @return the chore service of the compaction server that owns this manager
+   */
+  @Override
+  public ChoreService getChoreService() {
+    return this.server.getChoreService();
+  }
+
+  /**
+   * @return the highest compaction pressure reported by the stores of the running compaction
+   *         tasks, or 0 when no task reports a value above 0
+   */
+  @Override
+  public double getCompactionPressure() {
+    double highest = 0;
+    for (CompactionTask running : getRunningCompactionTasks().values()) {
+      double pressure = running.getStore().getCompactionPressure();
+      highest = pressure > highest ? pressure : highest;
+    }
+    return highest;
+  }
+
+  /**
+   * @return always 0 (presumably because the compaction server does not flush - confirm)
+   */
+  @Override
+  public double getFlushPressure() {
+    return 0.0d;
+  }
+
+  public void requestCompaction(CompactionTask compactionTask) throws 
IOException {
+    try {
+      selectFileAndExecuteTask(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Failed requestCompaction {}", compactionTask, e);
+      server.checkFileSystem();
+      // count the failed during selectFile
+      server.requestFailedCount.increment();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Opens the store for the task, selects the files to compact, records them as selected in
+   * compactionFilesCache and submits a CompactionTaskRunner to the long or short compaction
+   * pool. Returns without submitting anything when no compaction is selected or when the
+   * selected-files cache could not be updated; the store is closed in both of those cases.
+   * <p>
+   * NOTE(review): there is no try/finally around the opened store, so an exception thrown after
+   * selectCompaction returned leaves the store unclosed in this method - confirm whether that is
+   * handled elsewhere.
+   * @param compactionTask the task to select files for and to execute
+   * @throws IOException if selecting the compaction or closing the store fails
+   */
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws 
IOException {
+    ServerName rsServerName = compactionTask.getRsServerName();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    String logStr = compactionTask.toString();
+    MonitoredTask status =
+        TaskMonitor.get().createStatus("Compacting region: " + 
regionInfo.getRegionNameAsString()
+            + ", family: " + cfd.getNameAsString() + " from RS: " + 
rsServerName);
+    status.enableStatusJournal(false);
+    // 1. select compaction and check compaction context is present
+    LOG.info("Start select compaction {}", compactionTask);
+    status.setStatus("Start select compaction");
+    HStore store;
+    CompactionContext compactionContext;
+    Pair<Boolean, List<String>> updateSelectedFilesCacheResult;
+    // Holding both locks keeps the files in the store, the selected files in the cache and the
+    // compacted files in the cache in a consistent state, which is required for a correct
+    // selection. Lock order: compacted-files set first, then selected-files set.
+    synchronized 
(compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
+      synchronized 
(compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
+        Pair<HStore, Optional<CompactionContext>> pair = 
selectCompaction(regionInfo, cfd,
+          compactionTask.isRequestMajor(), compactionTask.getPriority(), 
status, logStr);
+        store = pair.getFirst();
+        Optional<CompactionContext> compaction = pair.getSecond();
+        if (!compaction.isPresent()) {
+          // Nothing to compact: release the store and mark the monitored task as aborted.
+          store.close();
+          LOG.info("Compaction context is empty: {}", compactionTask);
+          status.abort("Compaction context is empty and return");
+          return;
+        }
+        compactionContext = compaction.get();
+        // 2. update compactionFilesCache
+        updateSelectedFilesCacheResult =
+            updateStorageAfterSelectCompaction(regionInfo, cfd, 
compactionContext, status, logStr);
+      } // end of synchronized selected files
+    } // end of synchronized compacted files
+    if (!updateSelectedFilesCacheResult.getFirst()) {
+      // NOTE(review): status is not aborted on this path here; presumably
+      // updateStorageAfterSelectCompaction takes care of it - confirm.
+      store.close();
+      return;
+    }
+    List<String> selectedFileNames = 
updateSelectedFilesCacheResult.getSecond();
+    compactionTask.setHStore(store);
+    compactionTask.setCompactionContext(compactionContext);
+    compactionTask.setSelectedFileNames(selectedFileNames);
+    compactionTask.setMonitoredTask(status);
+    // The task takes over the priority of the selected compaction request.
+    compactionTask.setPriority(compactionContext.getRequest().getPriority());
+    // 3. execute a compaction task
+    ThreadPoolExecutor pool;
+    // The store decides from the request size whether the long or the short pool is used.
+    pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+        ? compactThreadControl.getLongCompactions()
+        : compactThreadControl.getShortCompactions();
+    pool.submit(new CompactionTaskRunner(compactionTask));
+  }
+
+  /**
+   * Open the store and select a compaction context for it. Files that compactionFilesCache
+   * records as currently compacting or already compacted are excluded from the selection.
+   * @param regionInfo region of the store to compact
+   * @param cfd column family of the store to compact
+   * @param major whether a major compaction should be triggered before selecting
+   * @param priority priority passed on to the store's compaction selection
+   * @param status monitored task that receives progress messages
+   * @param logStr description of the task used in log messages
+   * @return the opened store (it is not closed here) and the selected compaction context, which
+   *         is empty when the store selected nothing
+   * @throws IOException if opening the store or selecting the compaction fails
+   */
+  Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+      throws IOException {
+    status.setStatus("Open store");
+    // Look the table descriptor up once and reuse it for opening the store.
+    TableDescriptor tableDescriptor = tableDescriptors.get(regionInfo.getTable());
+    HStore store = getStore(conf, server.getFileSystem(), rootDir, tableDescriptor, regionInfo,
+      cfd.getNameAsString());
+
+    // CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
+    // opportunity to clean compacted file at that time, we clean compacted files here
+    compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+      store.getStorefiles().stream().map(sf -> sf.getPath().getName())
+        .collect(Collectors.toSet()));
+    if (major) {
+      status.setStatus("Trigger major compaction");
+      store.triggerMajorCompaction();
+    }
+    // get current compacting and compacted files, NOTE: these files are file names only, don't
+    // include paths.
+    status.setStatus("Get current compacting and compacted files from compactionFilesCache");
+    Set<String> compactingFiles = compactionFilesCache.getSelectedStoreFiles(regionInfo, cfd);
+    Set<String> compactedFiles = compactionFilesCache.getCompactedStoreFiles(regionInfo, cfd);
+    Set<String> excludeFiles = new HashSet<>(compactingFiles);
+    excludeFiles.addAll(compactedFiles);
+    // Convert files names to store files
+    status.setStatus("Convert current compacting and compacted files to store files");
+    List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store, excludeFiles);
+    LOG.info(
+      "Start select store: {}, excludeFileNames: {}, excluded: {}, compacting: {}, compacted: {}",
+      logStr, excludeFiles.size(), excludeStoreFiles.size(), compactingFiles.size(),
+      compactedFiles.size());
+    status.setStatus("Select store files to compaction, major: " + major);
+    Optional<CompactionContext> compaction = store.selectCompaction(priority,
+      CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
+    LOG.info("After select store: {}, if compaction context is present: {}", logStr,
+      compaction.isPresent());
+    return new Pair<>(store, compaction);
+  }
+
+  /**
+   * Mark the files of the compaction context as selected in compactionFilesCache.
+   * @param regionInfo region that owns the files
+   * @param cfd column family that owns the files
+   * @param compactionContext context whose request lists the files selected for compaction
+   * @param status monitored task, updated on success and aborted on failure
+   * @param logStr description of the request, used in log messages only
+   * @return a pair of (true, names of the selected files) when the files were recorded as
+   *         selected; (false, empty list) when compactionFilesCache refused them because they
+   *         are already selected
+   */
+  private Pair<Boolean, List<String>> updateStorageAfterSelectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, CompactionContext compactionContext, MonitoredTask status,
+      String logStr) {
+    LOG.info("Start update compactionFilesCache after select compaction: {}", logStr);
+    // save selected files to compactionFilesCache, by file name only
+    Collection<HStoreFile> selectedFiles = compactionContext.getRequest().getFiles();
+    List<String> selectedFilesNames = new ArrayList<>(selectedFiles.size());
+    for (HStoreFile selectFile : selectedFiles) {
+      selectedFilesNames.add(selectFile.getFileInfo().getPath().getName());
+    }
+    if (!compactionFilesCache.addSelectedFiles(regionInfo, cfd, selectedFilesNames)) {
+      // should not happen
+      LOG.info("selected files are already in store and return: {}", logStr);
+      status.abort("Selected files are already in compactionFilesCache and return");
+      return new Pair<>(Boolean.FALSE, Collections.emptyList());
+    }
+    LOG.info("Update compactionFilesCache after select compaction success: {}", logStr);
+    status.setStatus("Update compactionFilesCache after select compaction success");
+    return new Pair<>(Boolean.TRUE, selectedFilesNames);
+  }
+
+  /**
+   * Run the compaction described by the task inside the compaction server process and report
+   * the resulting files to the region server.
+   * <p>
+   * The files selected for the task are removed from the selected files of compactionFilesCache
+   * when this method exits, whether or not the compaction succeeded.
+   * @param compactionTask task carrying the store, compaction context and selected file names
+   * @throws IOException propagated from the compaction
+   */
+  private void doCompaction(CompactionTask compactionTask) throws IOException {
+    HStore store = compactionTask.getStore();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    MonitoredTask status = compactionTask.getStatus();
+    List<String> selectedFileNames = compactionTask.getSelectedFileNames();
+    CompactionContext compactionContext = compactionTask.getCompactionContext();
+    try {
+      LOG.info("Start compact store: {}, cf: {}, compaction context: {}", store, cfd,
+        compactionContext);
+      List<Path> newFiles = compactionContext
+          .compact(compactThreadControl.getCompactionThroughputController(), null);
+      LOG.info("Finish compact store: {}, cf: {}, new files: {}", store, cfd, newFiles);
+      // Only the file names are reported, not the full paths
+      List<String> newFileNames =
+          newFiles.stream().map(Path::getName).collect(Collectors.toCollection(ArrayList::new));
+      reportCompactionCompleted(compactionTask, newFileNames, status);
+    } finally {
+      status.setStatus("Remove selected files");
+      LOG.info("Remove selected files: {}", compactionTask);
+      compactionFilesCache.removeSelectedFiles(regionInfo, cfd, selectedFileNames);
+    }
   }
 
-  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws 
IOException {
-    AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
-    if (admin == null) {
-      LOG.debug("New RS admin connection to {}", sn);
-      admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
-      this.rsAdmins.put(sn, admin);
+  /**
+   * Report a finished compaction to the region server that requested it.
+   * <p>
+   * When the region server accepts the report, the selected files are recorded as compacted in
+   * compactionFilesCache. A denied report or a failed call only aborts the monitored task and
+   * is logged; nothing is thrown to the caller.
+   * @param task the finished compaction task
+   * @param newFiles names (without paths) of the files produced by the compaction
+   * @param status monitored task that receives the outcome
+   */
+  private void reportCompactionCompleted(CompactionTask task, List<String> newFiles,
+      MonitoredTask status) {
+    ServerName rsServerName = task.getRsServerName();
+    RegionInfo regionInfo = task.getRegionInfo();
+    ColumnFamilyDescriptor cfd = task.getCfd();
+    List<String> selectedFileNames = task.getSelectedFileNames();
+    boolean newForceMajor = task.getStore().getForceMajor();
+    Builder builder = CompleteCompactionRequest.newBuilder();
+    builder.setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo));
+    builder.setFamily(ProtobufUtil.toColumnFamilySchema(cfd));
+    builder.setNewForceMajor(newForceMajor);
+    // use file names only, not paths, because otherwise the protobuf message gets too big
+    builder.addAllSelectedFiles(selectedFileNames);
+    builder.addAllNewFiles(newFiles);
+    CompleteCompactionRequest completeCompactionRequest = builder.build();
+    AsyncRegionServerAdmin rsAdmin = getRsAdmin(rsServerName);
+    try {
+      String reportStatus = "Report complete compaction to RS: " + rsServerName
+          + ", selected file size: " + selectedFileNames.size() + ", new file size: "
+          + newFiles.size();
+      status.setStatus(reportStatus);
+      LOG.info("Report complete compaction: {}, selectedFileSize: {}, newFileSize: {}", task,
+        completeCompactionRequest.getSelectedFilesList().size(),
+        completeCompactionRequest.getNewFilesList().size());
+      CompleteCompactionResponse completeCompactionResponse =
+          FutureUtils.get(rsAdmin.completeCompaction(completeCompactionRequest));
+      if (!completeCompactionResponse.getSuccess()) {
+        //TODO: maybe the region has moved, we need to get the latest regionserver name and retry
+        status.abort("Report to RS succeeded but RS denied");
+        LOG.warn("Compaction manager request complete compaction fail. {}", task);
+      } else {
+        status.markComplete("Report to RS succeeded and RS accepted");
+        // move selected files to compacted files
+        compactionFilesCache.addCompactedFiles(regionInfo, cfd, selectedFileNames);
+        LOG.info("Compaction manager request complete compaction success. {}", task);
+      }
+    } catch (IOException e) {
+      //TODO: rpc call broken, add retry
+      status.abort("Report to RS failed");
+      LOG.error("Compaction manager request complete compaction error. {}", task, e);
     }
-    return admin;
   }
 
-  public void requestCompaction() {
+  /**
+   * Resolve file names to the matching store files of the given store.
+   * @param store store whose current store files are searched
+   * @param excludeFileNames file names (without paths) to look for
+   * @return the store files of {@code store} whose file name is in {@code excludeFileNames}
+   */
+  private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
+    return store.getStorefiles().stream()
+        .filter(candidate -> excludeFileNames.contains(candidate.getPath().getName()))
+        .collect(Collectors.toCollection(ArrayList::new));
+  }
+
+  /**
+   * Build a store for one column family of a region directly on top of the file system.
+   * The MVCC of the region object created here is advanced to the max sequence id of the
+   * store, or to 0 when the store reports none.
+   * @param conf configuration used for the region file system, region and store
+   * @param fs file system that holds the region
+   * @param rootDir root directory under which the table directory is resolved
+   * @param htd descriptor of the table that owns the region
+   * @param hri region to open the store for
+   * @param familyName name of the column family
+   * @return the newly created store
+   * @throws IOException propagated from creating the store
+   */
+  private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
+      final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, htd.getTableName());
+    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
+    HRegion region = new HRegion(regionFs, null, conf, htd, null);
+    ColumnFamilyDescriptor family = htd.getColumnFamily(Bytes.toBytes(familyName));
+    HStore store = new HStore(region, family, conf, false);
+    long maxSeqId = store.getMaxSequenceId().orElse(0);
+    LOG.info("store max sequence id: {}", maxSeqId);
+    region.getMVCC().advanceTo(maxSeqId);
+    return store;
+  }
+
+  /**
+   * Get the admin stub for a region server from the server's async cluster connection.
+   * @param sn name of the target region server
+   * @return the admin stub returned by the connection for {@code sn}
+   */
+  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) {
+    return server.getAsyncClusterConnection().getRegionServerAdmin(sn);
+  }
+
+  /**
+   * @return the map of currently executing compaction tasks, keyed by task name; this is the
+   *         internal map itself, not a copy
+   */
+  ConcurrentHashMap<String, CompactionTask> getRunningCompactionTasks() {
+    return runningCompactionTasks;
+  }
+
+  /**
+   * Wait for the compaction threads to stop; delegates to compactThreadControl.
+   */
+  void waitForStop() {
+    compactThreadControl.waitForStop();
+  }
+
+  /**
+   * Name the task, register it as running, run the compaction, and always unregister the task
+   * and close its store afterwards.
+   * <p>
+   * Any failure is logged and counted in the server's failed request counter, and a file
+   * system check is requested; nothing is rethrown.
+   * @param compactionTask the task to execute
+   */
+  private void executeCompaction(CompactionTask compactionTask) {
+    try {
+      // <region server>-<region name>-<column family>-<current time in ms>
+      String taskName = String.join("-", String.valueOf(compactionTask.getRsServerName()),
+        compactionTask.getRegionInfo().getRegionNameAsString(),
+        compactionTask.getCfd().getNameAsString(), Long.toString(System.currentTimeMillis()));
+      compactionTask.setTaskName(taskName);
+      runningCompactionTasks.put(compactionTask.getTaskName(), compactionTask);
+      doCompaction(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Execute compaction task error: {}", compactionTask, e);
+      server.checkFileSystem();
+      // count the failures that happen while doing the compaction
+      server.requestFailedCount.increment();
+    } finally {
+      runningCompactionTasks.remove(compactionTask.getTaskName());
+      closeStoreQuietly: {
+        if (compactionTask.getStore() == null) {
+          break closeStoreQuietly;
+        }
+        try {
+          compactionTask.getStore().close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close store: {}", compactionTask, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Cleanup callback to use when a compaction task is rejected from the queue: the files the
+   * rejected task had selected are removed from the selected files of compactionFilesCache.
+   * Runnables that are not a {@link CompactionTaskRunner} are ignored.
+   */
+  private static final BiConsumer<Runnable, ThreadPoolExecutor> REJECTION =
+    (runnable, pool) -> {
+      if (!(runnable instanceof CompactionTaskRunner)) {
+        return;
+      }
+      CompactionTaskRunner runner = (CompactionTaskRunner) runnable;
+      LOG.info("Compaction Rejected: {}", runner);
+      CompactionTask task = runner.getCompactionTask();
+      if (task != null) {
+        compactionFilesCache.removeSelectedFiles(task.getRegionInfo(), task.getCfd(),
+          task.getSelectedFileNames());
+      }
+    };
+
+  /**
+   * Runnable submitted to the compaction thread pools; running it executes the wrapped
+   * compaction task through {@code executeCompaction}.
+   */
+  protected class CompactionTaskRunner implements Runnable {
+    // Set once at construction and never replaced
+    private final CompactionTask compactionTask;
+
+    CompactionTaskRunner(CompactionTask compactionTask) {
+      this.compactionTask = compactionTask;
+    }
+
+    @Override
+    public void run() {
+      executeCompaction(compactionTask);
+    }
+
+    /** @return the compaction task wrapped by this runner */
+    CompactionTask getCompactionTask() {
+      return compactionTask;
+    }
+
+    /** Describe the runner by its task so that log lines about the runner are meaningful. */
+    @Override
+    public String toString() {
+      return "CompactionTaskRunner{" + compactionTask + "}";
+    }
   }
 
+  /**
+   * Ordering of queued runnables: {@link CompactionTaskRunner}s sort before any other runnable.
+   * Two runners are ordered by ascending task priority value and then by ascending submit
+   * time; two other runnables are ordered by identity hash code.
+   */
+  private static final Comparator<Runnable> COMPACTION_TASK_COMPARATOR =
+    (Runnable r1, Runnable r2) -> {
+      boolean firstIsTask = r1 instanceof CompactionTaskRunner;
+      boolean secondIsTask = r2 instanceof CompactionTaskRunner;
+      if (!firstIsTask || !secondIsTask) {
+        // CompactionTaskRunner first
+        if (firstIsTask) {
+          return -1;
+        }
+        if (secondIsTask) {
+          return 1;
+        }
+        // break the tie based on hash code; compare instead of subtracting so that the
+        // result cannot overflow
+        return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
+      }
+      CompactionTask o1 = ((CompactionTaskRunner) r1).getCompactionTask();
+      CompactionTask o2 = ((CompactionTaskRunner) r2).getCompactionTask();
+      int cmp = Integer.compare(o1.getPriority(), o2.getPriority());
+      if (cmp != 0) {
+        return cmp;
+      }
+      return Long.compare(o1.getSubmitTime(), o2.getSubmitTime());
+    };
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index cd09480..f78e58a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AbstractServer;
@@ -27,7 +29,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -55,6 +59,11 @@ public class HCompactionServer extends AbstractServer {
   /** compaction server process name */
   public static final String COMPACTIONSERVER = "compactionserver";
   private static final Logger LOG = 
LoggerFactory.getLogger(HCompactionServer.class);
+  // Request counter.
+  final LongAdder requestCount = new LongAdder();
+  final LongAdder requestFailedCount = new LongAdder();
+  // ChoreService used to schedule tasks that we want to run periodically
+  private ChoreService choreService;
 
   @Override
   protected String getProcessName() {
@@ -68,14 +77,13 @@ public class HCompactionServer extends AbstractServer {
 
   @Override
   public ChoreService getChoreService() {
-    return null;
+    return choreService;
   }
 
   protected final CSRpcServices rpcServices;
 
   // Stub to do compaction server status calls against the master.
   private volatile CompactionServerStatusService.BlockingInterface cssStub;
-
   CompactionThreadManager compactionThreadManager;
   /**
    * Get the current master from ZooKeeper and open the RPC connection to it. 
To get a fresh
@@ -120,12 +128,14 @@ public class HCompactionServer extends AbstractServer {
     // login the server principal (if using secure Hadoop)
     login(userProvider, this.rpcServices.getIsa().getHostName());
     Superusers.initialize(conf);
+    this.dataFs = new HFileSystem(this.conf, true);
+    this.choreService = new ChoreService(getName(), true);
     this.compactionThreadManager = new CompactionThreadManager(conf, this);
     this.rpcServices.start();
   }
 
   @Override
-  protected CSRpcServices getRpcService(){
+  protected CSRpcServices getRpcService() {
     return rpcServices;
   }
 
@@ -138,9 +148,20 @@ public class HCompactionServer extends AbstractServer {
     long reportEndTime) {
     ClusterStatusProtos.CompactionServerLoad.Builder serverLoad =
       ClusterStatusProtos.CompactionServerLoad.newBuilder();
-    serverLoad.setCompactedCells(0);
-    serverLoad.setCompactingCells(0);
-    serverLoad.setTotalNumberOfRequests(rpcServices.requestCount.sum());
+    Collection<CompactionTask> tasks = 
compactionThreadManager.getRunningCompactionTasks().values();
+    long compactingCells = 0;
+    long compactedCells = 0;
+    for (CompactionTask compactionTask : tasks) {
+      serverLoad.addCompactionTasks(compactionTask.getTaskName());
+      CompactionProgress progress = 
compactionTask.getStore().getCompactionProgress();
+      if (progress != null) {
+        compactedCells += progress.getCurrentCompactedKvs();
+        compactingCells += progress.getTotalCompactingKVs();
+      }
+    }
+    serverLoad.setCompactedCells(compactedCells);
+    serverLoad.setCompactingCells(compactingCells);
+    serverLoad.setTotalNumberOfRequests(requestCount.sum());
     serverLoad.setReportStartTime(reportStartTime);
     serverLoad.setReportEndTime(reportEndTime);
     return serverLoad.build();
@@ -209,7 +230,7 @@ public class HCompactionServer extends AbstractServer {
       // We registered with the Master. Go into run mode.
       long lastMsg = System.currentTimeMillis();
       // The main run loop.
-      while (!isStopped()) {
+      while (!isStopped() && this.dataFsOk) {
         long now = System.currentTimeMillis();
         if ((now - lastMsg) >= msgInterval) {
           if (tryCompactionServerReport(lastMsg, now) && !online.get()) {
@@ -231,6 +252,16 @@ public class HCompactionServer extends AbstractServer {
         abort(prefix + t.getMessage(), t);
       }
     }
+    stopChores();
+    if (this.compactionThreadManager != null) {
+      this.compactionThreadManager.waitForStop();
+    }
+  }
+
+  /**
+   * Shut down the chore service, if one has been created.
+   */
+  private void stopChores() {
+    if (this.choreService == null) {
+      return;
+    }
+    choreService.shutdown();
+  }
 
   @Override
@@ -244,7 +275,6 @@ public class HCompactionServer extends AbstractServer {
     stop(msg);
   }
 
-
   @Override
   public void stop(final String msg) {
     if (!this.stopped) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
index b0e80d1..a88fc73 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.CompactionServerMetrics;
@@ -59,8 +57,6 @@ public class CompactionOffloadManager {
   private CompactionOffloadSwitchStorage compactionOffloadSwitchStorage;
   private static final Logger LOG =
       LoggerFactory.getLogger(CompactionOffloadManager.class.getName());
-  private final ConcurrentMap<ServerName, AsyncCompactionServerService> 
csStubs =
-      new ConcurrentHashMap<>();
 
   public CompactionOffloadManager(final MasterServices master) {
     this.masterServices = master;
@@ -164,14 +160,8 @@ public class CompactionOffloadManager {
     return compactionServerList.get((int) index);
   }
 
-  private AsyncCompactionServerService getCsStub(final ServerName sn) throws 
IOException {
-    AsyncCompactionServerService csStub = this.csStubs.get(sn);
-    if (csStub == null) {
-      LOG.debug("New CS stub connection to {}", sn);
-      csStub = 
this.masterServices.getAsyncClusterConnection().getCompactionServerService(sn);
-      this.csStubs.put(sn, csStub);
-    }
-    return csStub;
+  private AsyncCompactionServerService getCsStub(final ServerName sn) {
+    return 
this.masterServices.getAsyncClusterConnection().getCompactionServerService(sn);
   }
 
   public CompactResponse requestCompaction(CompactRequest request) throws 
ServiceException {
@@ -180,9 +170,10 @@ public class CompactionOffloadManager {
       ProtobufUtil.toString(request), targetCompactionServer);
     try {
       
FutureUtils.get(getCsStub(targetCompactionServer).requestCompaction(request));
+      return CompactResponse.newBuilder().build();
     } catch (Throwable t) {
       LOG.error("requestCompaction from master to CS error: {}", t);
+      throw new ServiceException(t);
     }
-    return null;
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 441b18b..9c431f9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -30,10 +30,10 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.IntSupplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
@@ -48,7 +48,6 @@ import 
org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -87,12 +86,9 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
 
   private final HRegionServer server;
   private final Configuration conf;
-  private volatile ThreadPoolExecutor longCompactions;
-  private volatile ThreadPoolExecutor shortCompactions;
+  private final CompactThreadControl compactThreadControl;
   private volatile ThreadPoolExecutor splits;
 
-  private volatile ThroughputController compactionThroughputController;
-
   private volatile boolean compactionsEnabled;
   /**
    * Splitting should not take place if the total number of regions exceed 
this.
@@ -105,12 +101,14 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
     this.server = server;
     this.conf = server.getConfiguration();
     this.compactionsEnabled = 
this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
-    createCompactionExecutors();
+    this.regionSplitLimit =
+      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, 
DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
+    int largeThreads =
+      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, 
LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, 
SMALL_COMPACTION_THREADS_DEFAULT);
+    compactThreadControl =
+        new CompactThreadControl(server, largeThreads, smallThreads, 
COMPARATOR, REJECTION);
     createSplitExcecutors();
-
-    // compaction throughput controller
-    this.compactionThroughputController =
-        CompactionThroughputControllerFactory.create(server, conf);
   }
 
   private void createSplitExcecutors() {
@@ -120,65 +118,18 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       new ThreadFactoryBuilder().setNameFormat(n + 
"-splits-%d").setDaemon(true).build());
   }
 
-  private void createCompactionExecutors() {
-    this.regionSplitLimit =
-        conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, 
DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
-
-    int largeThreads =
-        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, 
LARGE_COMPACTION_THREADS_DEFAULT));
-    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, 
SMALL_COMPACTION_THREADS_DEFAULT);
-
-    // if we have throttle threads, make sure the user also specified size
-    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
-
-    final String n = Thread.currentThread().getName();
-
-    StealJobQueue<Runnable> stealJobQueue = new 
StealJobQueue<Runnable>(COMPARATOR);
-    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 
60, TimeUnit.SECONDS,
-        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + 
"-longCompactions-%d")
-            .setDaemon(true).build());
-    this.longCompactions.setRejectedExecutionHandler(new Rejection());
-    this.longCompactions.prestartAllCoreThreads();
-    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 
60, TimeUnit.SECONDS,
-        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
-            .setNameFormat(n + 
"-shortCompactions-%d").setDaemon(true).build());
-    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
-  }
-
   @Override
   public String toString() {
-    return "compactionQueue=(longCompactions="
-        + longCompactions.getQueue().size() + ":shortCompactions="
-        + shortCompactions.getQueue().size() + ")"
+    return compactThreadControl.toString()
         + ", splitQueue=" + splits.getQueue().size();
   }
 
   public String dumpQueue() {
-    StringBuilder queueLists = new StringBuilder();
-    queueLists.append("Compaction/Split Queue dump:\n");
-    queueLists.append("  LargeCompation Queue:\n");
-    BlockingQueue<Runnable> lq = longCompactions.getQueue();
-    Iterator<Runnable> it = lq.iterator();
-    while (it.hasNext()) {
-      queueLists.append("    " + it.next().toString());
-      queueLists.append("\n");
-    }
-
-    if (shortCompactions != null) {
-      queueLists.append("\n");
-      queueLists.append("  SmallCompation Queue:\n");
-      lq = shortCompactions.getQueue();
-      it = lq.iterator();
-      while (it.hasNext()) {
-        queueLists.append("    " + it.next().toString());
-        queueLists.append("\n");
-      }
-    }
-
+    StringBuilder queueLists = compactThreadControl.dumpQueue();
     queueLists.append("\n");
     queueLists.append("  Split Queue:\n");
-    lq = splits.getQueue();
-    it = lq.iterator();
+    BlockingQueue<Runnable> lq = splits.getQueue();
+    Iterator<Runnable> it = lq.iterator();
     while (it.hasNext()) {
       queueLists.append("    " + it.next().toString());
       queueLists.append("\n");
@@ -230,12 +181,15 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
   }
 
   private void interrupt() {
-    longCompactions.shutdownNow();
-    shortCompactions.shutdownNow();
+    compactThreadControl.getLongCompactions().shutdownNow();
+    compactThreadControl.getShortCompactions().shutdownNow();
   }
 
   private void reInitializeCompactionsExecutors() {
-    createCompactionExecutors();
+    int largeThreads =
+        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, 
LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, 
SMALL_COMPACTION_THREADS_DEFAULT);
+    compactThreadControl.createCompactionExecutors(largeThreads, smallThreads, 
COMPARATOR);
   }
 
   private interface CompactionCompleteTracker {
@@ -355,18 +309,19 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
     if (selectNow) {
       // compaction.get is safe as we will just return if selectNow is true 
but no compaction is
       // selected
-      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? 
longCompactions
-          : shortCompactions;
+      pool = store.throttleCompaction(compaction.getRequest().getSize())
+          ? compactThreadControl.getLongCompactions()
+          : compactThreadControl.getShortCompactions();
     } else {
       // We assume that most compactions are small. So, put system compactions 
into small
       // pool; we will do selection there, and move to large pool if necessary.
-      pool = shortCompactions;
+      pool = compactThreadControl.getShortCompactions();
     }
     pool.execute(
       new CompactionRunner(store, region, compaction, tracker, 
completeTracker, pool, user));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
-      String type = (pool == shortCompactions) ? "Small " : "Large ";
+      String type = (pool == compactThreadControl.getShortCompactions()) ? 
"Small " : "Large ";
       LOG.debug(type + "Compaction requested: " + (selectNow ? 
compaction.toString() : "system")
           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " 
+ this);
     }
@@ -407,8 +362,8 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
    */
   void interruptIfNecessary() {
     splits.shutdown();
-    longCompactions.shutdown();
-    shortCompactions.shutdown();
+    compactThreadControl.getLongCompactions().shutdown();
+    compactThreadControl.getShortCompactions().shutdown();
   }
 
   private void waitFor(ThreadPoolExecutor t, String name) {
@@ -429,8 +384,7 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
 
   void join() {
     waitFor(splits, "Split Thread");
-    waitFor(longCompactions, "Large Compaction Thread");
-    waitFor(shortCompactions, "Small Compaction Thread");
+    compactThreadControl.waitForStop();
   }
 
   /**
@@ -440,16 +394,17 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
    * @return The current size of the regions queue.
    */
   public int getCompactionQueueSize() {
-    return longCompactions.getQueue().size() + 
shortCompactions.getQueue().size();
+    return compactThreadControl.getLongCompactions().getQueue().size()
+        + compactThreadControl.getShortCompactions().getQueue().size();
   }
 
   public int getLargeCompactionQueueSize() {
-    return longCompactions.getQueue().size();
+    return compactThreadControl.getLongCompactions().getQueue().size();
   }
 
 
   public int getSmallCompactionQueueSize() {
-    return shortCompactions.getQueue().size();
+    return compactThreadControl.getShortCompactions().getQueue().size();
   }
 
   public int getSplitQueueSize() {
@@ -591,12 +546,14 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
         // Now see if we are in correct pool for the size; if not, go to the 
correct one.
         // We might end up waiting for a while, so cancel the selection.
 
-        ThreadPoolExecutor pool =
-            store.throttleCompaction(c.getRequest().getSize()) ? 
longCompactions : shortCompactions;
+        ThreadPoolExecutor pool = 
store.throttleCompaction(c.getRequest().getSize())
+            ? compactThreadControl.getLongCompactions()
+            : compactThreadControl.getShortCompactions();
 
         // Long compaction pool can process small job
         // Short compaction pool should not process large job
-        if (this.parent == shortCompactions && pool == longCompactions) {
+        if (this.parent == compactThreadControl.getShortCompactions()
+            && pool == compactThreadControl.getLongCompactions()) {
           this.store.cancelRequestedCompaction(c);
           this.parent = pool;
           this.parent.execute(this);
@@ -611,13 +568,13 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       tracker.beforeExecution(store);
       try {
         // Note: please don't put single-compaction logic here;
-        //       put it into region/store/etc. This is CST logic.
+        // put it into region/store/etc. This is CST logic.
         long start = EnvironmentEdgeManager.currentTime();
-        boolean completed =
-            region.compact(c, store, compactionThroughputController, user);
+        boolean completed = region.compact(c, store,
+          compactThreadControl.getCompactionThroughputController(), user);
         long now = EnvironmentEdgeManager.currentTime();
-        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
-              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + 
this + "; duration="
+            + StringUtils.formatTimeDiff(now, start));
         if (completed) {
           // degenerate case: blocked regions require recursive enqueues
           if (store.getCompactPriority() <= 0) {
@@ -671,18 +628,15 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
   /**
    * Cleanup class to use when rejecting a compaction request from the queue.
    */
-  private static class Rejection implements RejectedExecutionHandler {
-    @Override
-    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
-      if (runnable instanceof CompactionRunner) {
-        CompactionRunner runner = (CompactionRunner) runnable;
-        LOG.debug("Compaction Rejected: " + runner);
-        if (runner.compaction != null) {
-          runner.store.cancelRequestedCompaction(runner.compaction);
-        }
+  private static BiConsumer<Runnable, ThreadPoolExecutor> REJECTION = 
(runnable, pool) -> {
+    if (runnable instanceof CompactionRunner) {
+      CompactionRunner runner = (CompactionRunner) runnable;
+      LOG.debug("Compaction Rejected: " + runner);
+      if (runner.compaction != null) {
+        runner.store.cancelRequestedCompaction(runner.compaction);
       }
     }
-  }
+  };
 
   /**
    * {@inheritDoc}
@@ -697,31 +651,31 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
     int largeThreads = Math.max(1, newConf.getInt(
             LARGE_COMPACTION_THREADS,
             LARGE_COMPACTION_THREADS_DEFAULT));
-    if (this.longCompactions.getCorePoolSize() != largeThreads) {
+    if (compactThreadControl.getLongCompactions().getCorePoolSize() != 
largeThreads) {
       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
-              " from " + this.longCompactions.getCorePoolSize() + " to " +
+              " from " + 
compactThreadControl.getLongCompactions().getCorePoolSize() + " to " +
               largeThreads);
-      if(this.longCompactions.getCorePoolSize() < largeThreads) {
-        this.longCompactions.setMaximumPoolSize(largeThreads);
-        this.longCompactions.setCorePoolSize(largeThreads);
+      if(compactThreadControl.getLongCompactions().getCorePoolSize() < 
largeThreads) {
+        
compactThreadControl.getLongCompactions().setMaximumPoolSize(largeThreads);
+        
compactThreadControl.getLongCompactions().setCorePoolSize(largeThreads);
       } else {
-        this.longCompactions.setCorePoolSize(largeThreads);
-        this.longCompactions.setMaximumPoolSize(largeThreads);
+        
compactThreadControl.getLongCompactions().setCorePoolSize(largeThreads);
+        
compactThreadControl.getLongCompactions().setMaximumPoolSize(largeThreads);
       }
     }
 
     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
             SMALL_COMPACTION_THREADS_DEFAULT);
-    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
+    if (compactThreadControl.getShortCompactions().getCorePoolSize() != 
smallThreads) {
       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
-                " from " + this.shortCompactions.getCorePoolSize() + " to " +
+                " from " + 
compactThreadControl.getShortCompactions().getCorePoolSize() + " to " +
                 smallThreads);
-      if(this.shortCompactions.getCorePoolSize() < smallThreads) {
-        this.shortCompactions.setMaximumPoolSize(smallThreads);
-        this.shortCompactions.setCorePoolSize(smallThreads);
+      if(compactThreadControl.getShortCompactions().getCorePoolSize() < 
smallThreads) {
+        
compactThreadControl.getShortCompactions().setMaximumPoolSize(smallThreads);
+        
compactThreadControl.getShortCompactions().setCorePoolSize(smallThreads);
       } else {
-        this.shortCompactions.setCorePoolSize(smallThreads);
-        this.shortCompactions.setMaximumPoolSize(smallThreads);
+        
compactThreadControl.getShortCompactions().setCorePoolSize(smallThreads);
+        
compactThreadControl.getShortCompactions().setMaximumPoolSize(smallThreads);
       }
     }
 
@@ -740,12 +694,12 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
       }
     }
 
-    ThroughputController old = this.compactionThroughputController;
+    ThroughputController old = 
compactThreadControl.getCompactionThroughputController();
     if (old != null) {
       old.stop("configuration change");
     }
-    this.compactionThroughputController =
-        CompactionThroughputControllerFactory.create(server, newConf);
+    compactThreadControl.setCompactionThroughputController(
+      CompactionThroughputControllerFactory.create(server, newConf));
 
     // We change this atomically here instead of reloading the config in order 
that upstream
     // would be the only one with the flexibility to reload the config.
@@ -753,11 +707,11 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
   }
 
   protected int getSmallCompactionThreadNum() {
-    return this.shortCompactions.getCorePoolSize();
+    return compactThreadControl.getShortCompactions().getCorePoolSize();
   }
 
   protected int getLargeCompactionThreadNum() {
-    return this.longCompactions.getCorePoolSize();
+    return compactThreadControl.getLongCompactions().getCorePoolSize();
   }
 
   protected int getSplitThreadNum() {
@@ -781,7 +735,7 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
   }
 
   public ThroughputController getCompactionThroughputController() {
-    return compactionThroughputController;
+    return compactThreadControl.getCompactionThroughputController();
   }
 
   /**
@@ -790,15 +744,15 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
    * from short compaction queue
    */
   void shutdownLongCompactions(){
-    this.longCompactions.shutdown();
+    compactThreadControl.getLongCompactions().shutdown();
   }
 
   public void clearLongCompactionsQueue() {
-    longCompactions.getQueue().clear();
+    compactThreadControl.getLongCompactions().getQueue().clear();
   }
 
   public void clearShortCompactionsQueue() {
-    shortCompactions.getQueue().clear();
+    compactThreadControl.getShortCompactions().getQueue().clear();
   }
 
   public boolean isCompactionsEnabled() {
@@ -814,14 +768,14 @@ public class CompactSplit implements CompactionRequester, 
PropagatingConfigurati
    * @return the longCompactions thread pool executor
    */
   ThreadPoolExecutor getLongCompactions() {
-    return longCompactions;
+    return compactThreadControl.getLongCompactions();
   }
 
   /**
    * @return the shortCompactions thread pool executor
    */
   ThreadPoolExecutor getShortCompactions() {
-    return shortCompactions;
+    return compactThreadControl.getShortCompactions();
   }
 
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
new file mode 100644
index 0000000..7364771
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import 
org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import 
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Holds the long and short compaction thread pools together with the compaction throughput
+ * controller, and offers helpers to inspect the pool queues and to shut the pools down.
+ * @see CompactSplit
+ * @see org.apache.hadoop.hbase.compactionserver.CompactionThreadManager
+ */
+@InterfaceAudience.Private
+public class CompactThreadControl {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactThreadControl.class);
+  /** How long {@link #waitForStop()} waits for each pool to terminate, in seconds. */
+  private static final long POOL_STOP_TIMEOUT_SECONDS = 60;
+
+  private volatile ThreadPoolExecutor longCompactions;
+  private volatile ThreadPoolExecutor shortCompactions;
+  private volatile ThroughputController compactionThroughputController;
+  /** Callback invoked by {@link Rejection} when either pool rejects a task. */
+  private final BiConsumer<Runnable, ThreadPoolExecutor> rejection;
+
+  /**
+   * Creates both compaction pools and the compaction throughput controller.
+   * @param server supplies the configuration used to create the throughput controller
+   * @param largeThreads number of threads in the long compaction pool, must be positive
+   * @param smallThreads number of threads in the short compaction pool, must be positive
+   * @param cmp ordering of the queued tasks
+   * @param rejection callback run with the rejected task and the rejecting pool
+   */
+  public CompactThreadControl(ThroughputControllerService server, int largeThreads,
+      int smallThreads, Comparator<Runnable> cmp,
+      BiConsumer<Runnable, ThreadPoolExecutor> rejection) {
+    // Set the callback before any pool (and its rejection handler) exists, so the handler can
+    // never observe an unset callback.
+    this.rejection = rejection;
+    createCompactionExecutors(largeThreads, smallThreads, cmp);
+
+    // compaction throughput controller
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, server.getConfiguration());
+  }
+
+  /**
+   * Rejection handler installed on both pools; forwards to the callback given at construction.
+   */
+  private class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      rejection.accept(runnable, pool);
+    }
+  }
+
+  /**
+   * Builds the long and short compaction pools. The long pool is backed by a
+   * {@link StealJobQueue} and the short pool by that queue's steal-from queue.
+   * @throws IllegalArgumentException if either thread count is not positive
+   */
+  void createCompactionExecutors(int largeThreads, int smallThreads, Comparator<Runnable> cmp) {
+    // both pools need at least one thread
+    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+    // pool threads are named after the thread that creates the pools
+    final String n = Thread.currentThread().getName();
+
+    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>(cmp);
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
+        TimeUnit.SECONDS, stealJobQueue,
+        new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
+            .setDaemon(true).build());
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    // NOTE(review): only the long pool is prestarted; presumably so its threads are already
+    // polling the steal queue -- confirm against StealJobQueue.
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
+        TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
+        new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d")
+            .setDaemon(true).build());
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+  }
+
+  /**
+   * @return the current queue sizes of the long and short compaction pools
+   */
+  @Override
+  public String toString() {
+    return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size()
+        + ":shortCompactions=" + shortCompactions.getQueue().size() + ")";
+  }
+
+  /**
+   * @return a listing of every task currently queued in the long and short compaction pools
+   */
+  public StringBuilder dumpQueue() {
+    StringBuilder queueLists = new StringBuilder();
+    queueLists.append("Compaction/Split Queue dump:\n");
+    appendQueue(queueLists, "  LargeCompation Queue:\n", longCompactions);
+
+    ThreadPoolExecutor shortPool = shortCompactions;
+    if (shortPool != null) {
+      queueLists.append("\n");
+      appendQueue(queueLists, "  SmallCompation Queue:\n", shortPool);
+    }
+    return queueLists;
+  }
+
+  /** Appends the header followed by one indented line per task queued in the pool. */
+  private static void appendQueue(StringBuilder out, String header, ThreadPoolExecutor pool) {
+    out.append(header);
+    BlockingQueue<Runnable> queue = pool.getQueue();
+    for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) {
+      out.append("    ").append(it.next().toString());
+      out.append("\n");
+    }
+  }
+
+  public ThreadPoolExecutor getLongCompactions() {
+    return longCompactions;
+  }
+
+  public ThreadPoolExecutor getShortCompactions() {
+    return shortCompactions;
+  }
+
+  void setCompactionThroughputController(ThroughputController compactionThroughputController) {
+    this.compactionThroughputController = compactionThroughputController;
+  }
+
+  public ThroughputController getCompactionThroughputController() {
+    return compactionThroughputController;
+  }
+
+  /**
+   * Shuts down the long pool and then the short pool, waiting for each to terminate.
+   */
+  public void waitForStop() {
+    waitForPoolStop(longCompactions, "Large Compaction Thread");
+    waitForPoolStop(shortCompactions, "Small Compaction Thread");
+  }
+
+  /**
+   * Initiates an orderly shutdown of the pool and waits for it to terminate. If the wait is
+   * interrupted, the interrupt flag is restored and the pool is shut down immediately.
+   */
+  private void waitForPoolStop(ThreadPoolExecutor t, String name) {
+    if (t == null) {
+      return;
+    }
+    try {
+      t.shutdown();
+      if (!t.awaitTermination(POOL_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        // Previously a timeout was silent; report it so a still-running pool is visible.
+        LOG.warn("{} did not terminate within {} seconds", name, POOL_STOP_TIMEOUT_SECONDS);
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted waiting for " + name + " to finish...");
+      Thread.currentThread().interrupt();
+      t.shutdownNow();
+    }
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 73234f1..cc34c6d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -100,7 +100,7 @@ public class HRegionFileSystem {
    * @param tableDir {@link Path} to where the table is being stored
    * @param regionInfo {@link RegionInfo} for region
    */
-  HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path 
tableDir,
+  public HRegionFileSystem(final Configuration conf, final FileSystem fs, 
final Path tableDir,
       final RegionInfo regionInfo) {
     this.fs = fs;
     this.conf = conf;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e49ad87..b2df9b7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -327,8 +327,6 @@ public class HRegionServer extends AbstractServer implements
   // Instance of the hbase executor executorService.
   protected ExecutorService executorService;
 
-  private volatile boolean dataFsOk;
-  private HFileSystem dataFs;
   private HFileSystem walFs;
 
   // Go down hard. Used if file system becomes unavailable and also in
@@ -346,7 +344,6 @@ public class HRegionServer extends AbstractServer implements
   private volatile boolean killed = false;
   private volatile boolean shutDown = false;
 
-
   private Path dataRootDir;
   private Path walRootDir;
 
@@ -533,7 +530,6 @@ public class HRegionServer extends AbstractServer implements
     super(conf, "RegionServer"); // thread name
     TraceUtil.initTracer(conf);
     try {
-      this.dataFsOk = true;
       this.eventLoopGroupConfig = setupNetty(this.conf);
       MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
       HFile.checkHFileVersion(this.conf);
@@ -2836,11 +2832,6 @@ public class HRegionServer extends AbstractServer 
implements
     return dataRootDir;
   }
 
-  @Override
-  public FileSystem getFileSystem() {
-    return dataFs;
-  }
-
   /**
    * @return {@code true} when the data file system is available, {@code 
false} otherwise.
    */
@@ -3279,24 +3270,6 @@ public class HRegionServer extends AbstractServer 
implements
         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
   }
 
-  /**
-   * Checks to see if the file system is still accessible. If not, sets
-   * abortRequested and stopRequested
-   *
-   * @return false if file system is not available
-   */
-  boolean checkFileSystem() {
-    if (this.dataFsOk && this.dataFs != null) {
-      try {
-        FSUtils.checkFileSystemAvailable(this.dataFs);
-      } catch (IOException e) {
-        abort("File System not available", e);
-        this.dataFsOk = false;
-      }
-    }
-    return this.dataFsOk;
-  }
-
   @Override
   public void updateRegionFavoredNodesMapping(String encodedRegionName,
       
List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> 
favoredNodes) {
@@ -3503,6 +3476,12 @@ public class HRegionServer extends AbstractServer 
implements
         .build();
   }
 
+  /**
+   * @return the max compaction pressure of all stores on this regionserver. 
The value should be
+   *         greater than or equal to 0.0, and any value greater than 1.0 
means we enter the
+   *         emergency state that some stores have too many store files.
+   * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
+   */
   @Override
   public double getCompactionPressure() {
     double max = 0;
@@ -3540,6 +3519,11 @@ public class HRegionServer extends AbstractServer 
implements
     return flushThroughputController;
   }
 
+  /**
+   * @return the flush pressure of all stores on this regionserver. The value 
should be greater than
+   *         or equal to 0.0, and any value greater than 1.0 means we enter 
the emergency state that
+   *         global memstore size already exceeds lower limit.
+   */
   @Override
   public double getFlushPressure() {
     if (getRegionServerAccounting() == null || cacheFlusher == null) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9749576..7dc4c6e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -240,7 +240,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
    * @param family HColumnDescriptor for this column
    * @param confParam configuration object failed.  Can be null.
    */
-  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
+  public HStore(final HRegion region, final ColumnFamilyDescriptor family,
       final Configuration confParam, boolean warmup) throws IOException {
 
     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index c718f15..1f51fe8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import 
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -56,14 +57,16 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
  * the code base.
  */
 @InterfaceAudience.Private
-public interface RegionServerServices extends Server, MutableOnlineRegions, 
FavoredNodesForRegion {
+public interface RegionServerServices
+    extends Server, MutableOnlineRegions, FavoredNodesForRegion, 
ThroughputControllerService {
 
-  /** @return the WAL for a particular region. Pass null for getting the
-   * default (common) WAL */
+  /**
+   * @return the WAL for a particular region. Pass null for getting the 
default (common) WAL
+   */
   WAL getWAL(RegionInfo regionInfo) throws IOException;
 
-  /** @return the List of WALs that are used by this server
-   *  Doesn't include the meta WAL
+  /**
+   * @return the List of WALs that are used by this server. Doesn't include the meta WAL.
    */
   List<WAL> getWALs() throws IOException;
 
@@ -229,27 +232,11 @@ public interface RegionServerServices extends Server, 
MutableOnlineRegions, Favo
   HeapMemoryManager getHeapMemoryManager();
 
   /**
-   * @return the max compaction pressure of all stores on this regionserver. 
The value should be
-   *         greater than or equal to 0.0, and any value greater than 1.0 
means we enter the
-   *         emergency state that some stores have too many store files.
-   * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
-   */
-  double getCompactionPressure();
-
-  /**
    * @return the controller to avoid flush too fast
    */
   ThroughputController getFlushThroughputController();
 
   /**
-   * @return the flush pressure of all stores on this regionserver. The value 
should be greater than
-   *         or equal to 0.0, and any value greater than 1.0 means we enter 
the emergency state that
-   *         global memstore size already exceeds lower limit.
-   */
-  @Deprecated
-  double getFlushPressure();
-
-  /**
    * @return the metrics tracker for the region server
    */
   MetricsRegionServer getMetrics();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
index 45e7267..df6be3f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.hbase.regionserver.throttle;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.util.ReflectionUtils;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public final class CompactionThroughputControllerFactory {
@@ -45,7 +44,7 @@ public final class CompactionThroughputControllerFactory {
   private static final String 
DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS =
     
"org.apache.hadoop.hbase.regionserver.compactions.NoLimitThroughputController";
 
-  public static ThroughputController create(RegionServerServices server,
+  public static ThroughputController create(ThroughputControllerService server,
       Configuration conf) {
     Class<? extends ThroughputController> clazz = 
getThroughputControllerClass(conf);
     ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
index 4b1b261..c14e514 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.throttle;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class NoLimitThroughputController implements ThroughputController {
@@ -27,7 +26,7 @@ public class NoLimitThroughputController implements 
ThroughputController {
   public static final NoLimitThroughputController INSTANCE = new 
NoLimitThroughputController();
 
   @Override
-  public void setup(RegionServerServices server) {
+  public void setup(ThroughputControllerService server) {
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
index 1c3952e..d37d7c3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
@@ -20,11 +20,10 @@ package org.apache.hadoop.hbase.regionserver.throttle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 
 /**
  * A throughput controller which uses the follow schema to limit throughput
@@ -74,7 +73,7 @@ public class PressureAwareCompactionThroughputController 
extends PressureAwareTh
   private long maxThroughputOffpeak;
 
   @Override
-  public void setup(final RegionServerServices server) {
+  public void setup(final ThroughputControllerService server) {
     server.getChoreService().scheduleChore(
       new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
index 51e7b42..6183786 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
@@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver.throttle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+
 
 /**
  * A throughput controller which uses the follow schema to limit throughput
@@ -68,7 +68,7 @@ public class PressureAwareFlushThroughputController extends 
PressureAwareThrough
       10L * 1024 * 1024;// 10MB
 
   @Override
-  public void setup(final RegionServerServices server) {
+  public void setup(final ThroughputControllerService server) {
     server.getChoreService().scheduleChore(
       new ScheduledChore("FlushThroughputTuner", this, tuningPeriod, 
this.tuningPeriod) {
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
index 306df0b..a690433 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
@@ -23,12 +23,12 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public abstract class PressureAwareThroughputController extends Configured 
implements
@@ -77,10 +77,11 @@ public abstract class PressureAwareThroughputController 
extends Configured imple
   private volatile double maxThroughput;
   private volatile double maxThroughputPerOperation;
 
-  protected final ConcurrentMap<String, ActiveOperation> activeOperations = 
new ConcurrentHashMap<>();
+  protected final ConcurrentMap<String, ActiveOperation> activeOperations =
+      new ConcurrentHashMap<>();
 
   @Override
-  public abstract void setup(final RegionServerServices server);
+  public abstract void setup(final ThroughputControllerService server);
 
   protected String throughputDesc(long deltaSize, long elapsedTime) {
     return throughputDesc((double) deltaSize / elapsedTime * 1000);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
index 707d02d..0e8f3c4 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver.throttle;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 
 /**
  * A utility that constrains the total throughput of one or more simultaneous 
flows by
@@ -30,9 +29,9 @@ import 
org.apache.hadoop.hbase.regionserver.RegionServerServices;
 public interface ThroughputController extends Stoppable {
 
   /**
-   * Setup controller for the given region server.
+   * Setup controller for the given server.
    */
-  void setup(RegionServerServices server);
+  void setup(ThroughputControllerService server);
 
   /**
    * Start the throughput controller.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControllerService.java
similarity index 51%
copy from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
copy to 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControllerService.java
index 707d02d..7e55c6d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControllerService.java
@@ -17,36 +17,17 @@
  */
 package org.apache.hadoop.hbase.regionserver.throttle;
 
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 
 /**
- * A utility that constrains the total throughput of one or more simultaneous 
flows by
- * sleeping when necessary.
+ * This interface provides the methods that a {@link ThroughputController} needs to run.
  */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public interface ThroughputController extends Stoppable {
-
-  /**
-   * Setup controller for the given region server.
-   */
-  void setup(RegionServerServices server);
-
-  /**
-   * Start the throughput controller.
-   */
-  void start(String name);
-
-  /**
-   * Control the throughput. Will sleep if too fast.
-   * @return the actual sleep time.
-   */
-  long control(String name, long size) throws InterruptedException;
-
-  /**
-   * Finish the controller. Should call this method in a finally block.
-   */
-  void finish(String name);
+@InterfaceAudience.Private
+public interface ThroughputControllerService {
+  /**
+   * @return the {@link ChoreService} exposed by the implementing server
+   */
+  ChoreService getChoreService();
+  /**
+   * @return the compaction pressure reported by the implementing server; NOTE(review): the
+   *         value range and its meaning are defined by the implementation, confirm there
+   */
+  double getCompactionPressure();
+  /**
+   * @return the flush pressure reported by the implementing server; NOTE(review): the value
+   *         range and its meaning are defined by the implementation, confirm there
+   */
+  double getFlushPressure();
+  /**
+   * @return the {@link Configuration} of the implementing server
+   */
+  Configuration getConfiguration();
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionFilesCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionFilesCache.java
new file mode 100644
index 0000000..6836d91
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionFilesCache.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+/**
+ * Unit tests for {@link CompactionFilesCache}: bookkeeping of selected and compacted store
+ * file names for one region and column family, including concurrent add/remove of selections.
+ */
+@Category(SmallTests.class)
+public class TestCompactionFilesCache {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestCompactionFilesCache.class);
+  private static TableName tableName = TableName.valueOf("testCompactionFilesCache");
+  private static byte[] family = Bytes.toBytes("A");
+  // Shared with the worker threads; replaced by a fresh instance before every test.
+  private static CompactionFilesCache storage;
+  private static RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
+  private static ColumnFamilyDescriptor cfd =
+      ColumnFamilyDescriptorBuilder.newBuilder(family).build();
+
+  /**
+   * Worker that adds the given file names to, or removes them from, the selected files of the
+   * shared cache.
+   */
+  private static class TestStorageThread extends Thread {
+    List<String> fileNameList;
+    // true: add the files as selected; false: remove them from the selection
+    boolean flag;
+
+    TestStorageThread(List<String> files, boolean flag) {
+      this.fileNameList = files;
+      this.flag = flag;
+    }
+
+    @Override
+    public void run() {
+      if (flag) {
+        storage.addSelectedFiles(regionInfo, cfd, fileNameList);
+      } else {
+        storage.removeSelectedFiles(regionInfo, cfd, fileNameList);
+      }
+    }
+  }
+
+  /**
+   * Starts one {@link TestStorageThread} per file name {@code "0"} .. {@code "threadNum - 1"}
+   * and waits for all of them to finish.
+   * @param threadNum number of threads; each one handles a single, distinct file name
+   * @param add {@code true} to add the names as selected files, {@code false} to remove them
+   */
+  private static void runConcurrently(int threadNum, boolean add) {
+    TestStorageThread[] threads = new TestStorageThread[threadNum];
+    for (int i = 0; i < threadNum; i++) {
+      threads[i] = new TestStorageThread(new ArrayList<>(Arrays.asList("" + i)), add);
+    }
+    for (TestStorageThread thread : threads) {
+      thread.start();
+    }
+    for (TestStorageThread thread : threads) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status: the static Thread.interrupted() would clear it instead.
+        // Fail right away, since the assertions that follow need every worker to have finished.
+        Thread.currentThread().interrupt();
+        throw new AssertionError("Interrupted while waiting for the storage threads", e);
+      }
+    }
+  }
+
+  @Before
+  public void cleanUpStorage() {
+    storage = new CompactionFilesCache();
+  }
+
+  /**
+   * A file that is already selected must be rejected on a second add, a rejected add must not
+   * leave a partial selection behind, and removing a file makes it selectable again.
+   */
+  @Test
+  public void testSelectedFilesStore() {
+    List<String> fileNames = Lists.newArrayList("1");
+    storage.addSelectedFiles(regionInfo, cfd, fileNames);
+    // can not add a selected file again
+    Assert.assertFalse(storage.addSelectedFiles(regionInfo, cfd, fileNames));
+
+    List<String> fileNames2 = Lists.newArrayList("2");
+    Assert.assertTrue(storage.addSelectedFiles(regionInfo, cfd, fileNames2));
+
+    // "2" is already selected, so the whole request is expected to be refused
+    List<String> fileNames3 = Lists.newArrayList("3", "2");
+    Assert.assertFalse(storage.addSelectedFiles(regionInfo, cfd, fileNames3));
+
+    storage.removeSelectedFiles(regionInfo, cfd, fileNames2);
+    Assert.assertTrue(storage.addSelectedFiles(regionInfo, cfd, fileNames3));
+
+    Set<String> selectedFiles = storage.getSelectedStoreFiles(regionInfo, cfd);
+    Assert.assertEquals(3, selectedFiles.size());
+  }
+
+  /**
+   * Compacted files are recorded without a duplicate check, and a cleanup against the current
+   * store file names shrinks the recorded set.
+   */
+  @Test
+  public void testCompactedFilesStore() {
+    List<String> fileNames = Lists.newArrayList("1");
+    storage.addCompactedFiles(regionInfo, cfd, fileNames);
+    // add a compacted file twice, no check(This should not happen)
+    Assert.assertTrue(storage.addCompactedFiles(regionInfo, cfd, fileNames));
+
+    List<String> fileNames2 = Lists.newArrayList("3", "2");
+    Assert.assertTrue(storage.addCompactedFiles(regionInfo, cfd, fileNames2));
+
+    Set<String> compactedFiles = storage.getCompactedStoreFiles(regionInfo, cfd);
+    Assert.assertEquals(3, compactedFiles.size());
+
+    // only one of the three recorded names is passed in as a current store file name
+    Set<String> storeFileNames = Sets.newHashSet("3");
+    storage.cleanupCompactedFiles(regionInfo, cfd, storeFileNames);
+    compactedFiles = storage.getCompactedStoreFiles(regionInfo, cfd);
+    Assert.assertEquals(1, compactedFiles.size());
+  }
+
+  /**
+   * 50 threads each select one distinct file and, in a second round, each remove it again; no
+   * update may be lost in either round.
+   */
+  @Test
+  public void testSelectedFilesStoreMultiThread() {
+    int threadNum = 50;
+    runConcurrently(threadNum, true);
+    Assert.assertEquals(threadNum, storage.getSelectedStoreFiles(regionInfo, cfd).size());
+    runConcurrently(threadNum, false);
+    Assert.assertEquals(0, storage.getSelectedStoreFiles(regionInfo, cfd).size());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index b5da939..1362640 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -17,22 +17,38 @@
  */
 package org.apache.hadoop.hbase.compactionserver;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.compaction.CompactionOffloadManager;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -43,6 +59,7 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 @Category({CompactionServerTests.class, MediumTests.class})
 public class TestCompactionServer {
 
@@ -87,18 +104,147 @@ public class TestCompactionServer {
     TEST_UTIL.deleteTableIfAny(TABLENAME);
   }
 
+  /**
+   * Writes rows {@code start..end} (inclusive) to the test table; both the row key and the cell
+   * value are the encoded row index.
+   * @param start first row index, inclusive
+   * @param end last row index, inclusive
+   * @param flush if {@code true}, flush the table after each row whose index is a multiple of 100
+   */
+  private void doPutRecord(int start, int end, boolean flush) throws Exception {
+    // try-with-resources closes the table even when a put or a flush throws
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = start; i <= end; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), Bytes.toBytes(i));
+        table.put(put);
+        if (i % 100 == 0 && flush) {
+          TEST_UTIL.flush(TABLENAME);
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes the same {@code value} into rows {@code start..end} (inclusive) of the test table.
+   * @param start first row index, inclusive
+   * @param end last row index, inclusive
+   * @param value cell value stored for every row
+   */
+  private void doFillRecord(int start, int end, byte[] value) throws Exception {
+    // try-with-resources closes the table even when a put throws
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = start; i <= end; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), value);
+        table.put(put);
+      }
+    }
+  }
+
+  /**
+   * Reads rows {@code start..end} (inclusive) from the test table and checks the test column.
+   * @param start first row index, inclusive
+   * @param end last row index, inclusive
+   * @param exist if {@code true}, every row must hold its encoded row index as value; otherwise
+   *          the column must be absent from every row
+   */
+  private void verifyRecord(int start, int end, boolean exist) throws Exception {
+    // try-with-resources closes the table even when a get throws or an assertion fails
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = start; i <= end; i++) {
+        Get get = new Get(Bytes.toBytes(i));
+        Result r = table.get(get);
+        if (exist) {
+          assertArrayEquals(Bytes.toBytes(i), r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
+        } else {
+          assertNull(r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
+        }
+      }
+    }
+  }
+
+  /**
+   * With compaction switched off, writes 1000 rows so that 10 store files accumulate. Then
+   * re-enables compaction, requests a major compaction, waits until the compaction server has
+   * received a request and has no running task left, and expects one store file with all rows
+   * still readable.
+   */
+  @Test
+  public void testCompaction() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    doPutRecord(1, 1000, true);
+    int filesBeforeCompaction = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME).stream()
+        .mapToInt(region -> region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount()).sum();
+    assertEquals(10, filesBeforeCompaction);
+
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    Thread.sleep(5000);
+    TEST_UTIL.waitFor(60000, () -> {
+      // no request has reached the compaction server yet
+      if (!(COMPACTION_SERVER.requestCount.sum() > 0)) {
+        return false;
+      }
+      return COMPACTION_SERVER.compactionThreadManager.getRunningCompactionTasks().values()
+          .size() == 0;
+    });
+
+    int filesAfterCompaction = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME).stream()
+        .mapToInt(region -> region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount()).sum();
+    assertEquals(1, filesAfterCompaction);
+    verifyRecord(1, 1000, true);
+  }
+
+  /**
+   * Sets the family to keep 3 versions, then writes three rounds of random values followed by
+   * one round of known values for rows 1..500, giving 2000 cells across the store files. After
+   * a major compaction it expects a single store file holding 1500 cells, with the known values
+   * still readable.
+   */
+  @Test
+  public void testCompactionWithVersions() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    ColumnFamilyDescriptor threeVersionFamily =
+        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setMaxVersions(3).build();
+    TableDescriptor updatedDescriptor =
+        TableDescriptorBuilder.newBuilder(TABLENAME).setColumnFamily(threeVersionFamily).build();
+    TEST_UTIL.getAdmin().modifyTable(updatedDescriptor);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    for (int round = 0; round < 3; round++) {
+      doFillRecord(1, 500, RandomUtils.nextBytes(20));
+    }
+    TEST_UTIL.flush(TABLENAME);
+    doPutRecord(1, 500, true);
+
+    // sum the entry counts recorded in the trailers of all store files of the family
+    int cellsBeforeCompaction = 0;
+    for (HRegion onlineRegion : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+      for (HStoreFile storeFile : onlineRegion.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+        cellsBeforeCompaction +=
+            storeFile.getReader().getHFileReader().getTrailer().getEntryCount();
+      }
+    }
+    assertEquals(2000, cellsBeforeCompaction);
+
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    TEST_UTIL.waitFor(60000,
+      () -> TEST_UTIL.getHBaseCluster().getRegions(TABLENAME).stream()
+          .mapToInt(region -> region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount())
+          .sum() == 1);
+
+    int cellsAfterCompaction = 0;
+    for (HRegion onlineRegion : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+      for (HStoreFile storeFile : onlineRegion.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+        cellsAfterCompaction +=
+            storeFile.getReader().getHFileReader().getTrailer().getEntryCount();
+      }
+    }
+    assertEquals(1500, cellsAfterCompaction);
+    verifyRecord(1, 500, true);
+  }
+
+
+  /**
+   * Stops the compaction server and waits until the master lists no online compaction server.
+   * It then checks that a requested major compaction still reduces the 10 flushed store files
+   * to one and keeps every row readable.
+   */
+  @Test
+  public void testCompactionServerDown() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    COMPACTION_SERVER.stop("test");
+    TEST_UTIL.waitFor(60000,
+      () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 0);
+    doPutRecord(1, 1000, true);
+    int filesBeforeCompaction = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME).stream()
+        .mapToInt(region -> region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount()).sum();
+    assertEquals(10, filesBeforeCompaction);
+
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    Thread.sleep(5000);
+    TEST_UTIL.waitFor(60000,
+      () -> TEST_UTIL.getHBaseCluster().getRegions(TABLENAME).stream()
+          .mapToInt(region -> region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount())
+          .sum() == 1);
+    verifyRecord(1, 1000, true);
+  }
 
   @Test
   public void testCompactionServerReport() throws Exception {
     CompactionOffloadManager compactionOffloadManager = 
MASTER.getCompactionOffloadManager();
     TEST_UTIL.waitFor(60000, () -> 
!compactionOffloadManager.getOnlineServers().isEmpty()
-      && null != 
compactionOffloadManager.getOnlineServers().get(COMPACTION_SERVER_NAME));
+        && null != 
compactionOffloadManager.getOnlineServers().get(COMPACTION_SERVER_NAME));
     // invoke compact
     TEST_UTIL.compact(TABLENAME, false);
     TEST_UTIL.waitFor(60000,
-      () -> COMPACTION_SERVER.rpcServices.requestCount.sum() > 0
-        && COMPACTION_SERVER.rpcServices.requestCount.sum() == 
compactionOffloadManager
-        
.getOnlineServers().get(COMPACTION_SERVER_NAME).getTotalNumberOfRequests());
+      () -> COMPACTION_SERVER.requestCount.sum() > 0
+          && COMPACTION_SERVER.requestCount.sum() == 
compactionOffloadManager.getOnlineServers()
+              .get(COMPACTION_SERVER_NAME).getTotalNumberOfRequests());
   }
 
   @Test

Reply via email to