This is an automated email from the ASF dual-hosted git repository.

niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-25714 by this push:
     new da0fa30  HBASE-25968 Request compact to compaction server (#3378)
da0fa30 is described below

commit da0fa3000e341058cba57b06d2afe2ba452ee6df
Author: niuyulin <[email protected]>
AuthorDate: Mon Jun 21 11:03:49 2021 +0800

    HBASE-25968 Request compact to compaction server (#3378)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   4 +-
 .../apache/hadoop/hbase/security/SecurityInfo.java |   3 +
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  13 +++
 .../src/main/protobuf/server/Compaction.proto      |  62 ++++++++++
 .../src/main/protobuf/server/region/Admin.proto    |   4 +
 .../hbase/client/AsyncClusterConnection.java       |   5 +
 .../hbase/client/AsyncClusterConnectionImpl.java   |  12 +-
 .../hbase/client/AsyncCompactionServerService.java |  86 ++++++++++++++
 .../hbase/client/AsyncRegionServerAdmin.java       |   7 ++
 .../hbase/compactionserver/CSRpcServices.java      |  34 +++++-
 .../compactionserver/CompactionThreadManager.java  |  59 ++++++++++
 .../hbase/compactionserver/HCompactionServer.java  |  22 +++-
 .../hadoop/hbase/master/MasterRpcServices.java     |  27 ++++-
 .../compaction/CompactionOffloadManager.java       |  47 ++++++++
 .../hadoop/hbase/regionserver/HRegionServer.java   |  67 +++++++++++
 .../apache/hadoop/hbase/regionserver/HStore.java   | 129 ++++++++++++++++++---
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  40 +++++++
 .../hbase/regionserver/RegionServerServices.java   |   6 +
 .../hadoop/hbase/security/HBasePolicyProvider.java |   5 +-
 .../hadoop/hbase/MockRegionServerServices.java     |  12 ++
 .../hbase/client/DummyAsyncClusterConnection.java  |   5 +
 .../compactionserver/TestCompactionServer.java     |  23 +++-
 .../hadoop/hbase/master/MockRegionServer.java      |  21 ++++
 23 files changed, 660 insertions(+), 33 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 8a1ac5a..87f9a6b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -83,11 +83,11 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   final AsyncConnectionConfiguration connConf;
 
-  private final User user;
+  protected final User user;
 
   final ConnectionRegistry registry;
 
-  private final int rpcTimeout;
+  protected final int rpcTimeout;
 
   protected final RpcClient rpcClient;
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index e4e97e7..7ddc569 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -24,6 +24,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -54,6 +55,8 @@ public class SecurityInfo {
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
     infos.put(CompactionServerStatusProtos.CompactionServerStatusService.getDescriptor().getName(),
       new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
+    infos.put(CompactionProtos.CompactionService.getDescriptor().getName(),
+      new SecurityInfo(SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
     // NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
     // new Service will not be found when all is Kerberized!!!!
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 1b00887..56661b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -3801,4 +3802,16 @@ public final class ProtobufUtil {
       .build();
   }
 
+  public static String toString(CompactionProtos.CompactRequest request) {
+    ServerName rsServerName = ProtobufUtil.toServerName(request.getServer());
+    org.apache.hadoop.hbase.client.RegionInfo regionInfo =
+        ProtobufUtil.toRegionInfo(request.getRegionInfo());
+    ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
+    boolean major = request.getMajor();
+    int priority = request.getPriority();
+    return new StringBuilder("RS: ").append(rsServerName).append(", region: ")
+        .append(regionInfo.getRegionNameAsString()).append(", CF: ").append(cfd.getNameAsString())
+        .append(", major:").append(major).append(", priority:").append(priority).toString();
+  }
+
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto b/hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto
new file mode 100644
index 0000000..5e036c8
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+// This file contains protocol buffers that are used for CompactionManagerProtocol.
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "CompactionProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "HBase.proto";
+import "server/ClusterStatus.proto";
+import "server/ErrorHandling.proto";
+
+message CompactRequest {
+  required ServerName server = 1;
+  required RegionInfo region_info = 2;
+  required ColumnFamilySchema family = 3;
+  required bool major = 4;
+  required int32 priority = 5;
+  repeated ServerName favored_nodes = 6;
+}
+
+message CompactResponse {
+}
+
+message CompleteCompactionRequest {
+  required RegionInfo region_info = 1;
+  required ColumnFamilySchema family = 2;
+  repeated string selected_files = 3;
+  repeated string new_files = 4;
+  required bool new_force_major = 5;
+}
+
+message CompleteCompactionResponse {
+  required bool success = 1;
+}
+
+service CompactionService {
+  /** Called when a region server requests compaction of a column family of a region. */
+  rpc RequestCompaction(CompactRequest)
+  returns(CompactResponse);
+
+}
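
For reference, a minimal sketch of building a CompactRequest with the Java API
generated from the message above. This is illustrative only: the table, family,
and server values are made up, and it assumes the generated classes land in
HBaseProtos/CompactionProtos as the imports elsewhere in this patch suggest.

    import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilySchema;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableName;

    public final class CompactRequestExample {
      public static CompactRequest build() {
        // All five required fields must be set or build() will throw.
        TableName table = TableName.newBuilder()
            .setNamespace(ByteString.copyFromUtf8("default"))
            .setQualifier(ByteString.copyFromUtf8("t1")).build();
        RegionInfo region = RegionInfo.newBuilder()
            .setRegionId(1L).setTableName(table).build();
        ColumnFamilySchema family = ColumnFamilySchema.newBuilder()
            .setName(ByteString.copyFromUtf8("cf")).build();
        ServerName rs = ServerName.newBuilder()
            .setHostName("rs1.example.com").setPort(16020).setStartCode(1L).build();
        return CompactRequest.newBuilder()
            .setServer(rs).setRegionInfo(region).setFamily(family)
            .setMajor(false).setPriority(1).build();
      }
    }
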
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 0667292..b86b499 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -30,6 +30,7 @@ import "server/Quota.proto";
 import "server/ClusterStatus.proto";
 import "server/region/WAL.proto";
 import "server/region/TooSlowLog.proto";
+import "server/Compaction.proto";
 
 message GetRegionInfoRequest {
   required RegionSpecifier region = 1;
@@ -393,6 +394,9 @@ service AdminService {
   rpc ExecuteProcedures(ExecuteProceduresRequest)
     returns(ExecuteProceduresResponse);
 
+  rpc CompleteCompaction(CompleteCompactionRequest)
+    returns(CompleteCompactionResponse);
+
   rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
     returns(ClearSlowLogResponses);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac..28915d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -42,6 +42,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
   AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
 
   /**
+   * Get the compaction service for the given compaction server.
+   */
+  AsyncCompactionServerService getCompactionServerService(ServerName serverName);
+
+  /**
    * Get the nonce generator for this connection.
    */
   NonceGenerator getNonceGenerator();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 39fc3a2..06b4bbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBul
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 
@@ -49,12 +50,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
  */
 @InterfaceAudience.Private
 class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {
-
   public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
       String clusterId, SocketAddress localAddress, User user) {
     super(conf, registry, clusterId, localAddress, user);
   }
 
+  CompactionProtos.CompactionService.Interface createCompactionServerStub(ServerName serverName) {
+    return CompactionProtos.CompactionService
+        .newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+  }
+
   @Override
   public NonceGenerator getNonceGenerator() {
     return super.getNonceGenerator();
@@ -71,6 +76,11 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   }
 
   @Override
+  public AsyncCompactionServerService getCompactionServerService(ServerName serverName) {
+    return new AsyncCompactionServerService(serverName, this);
+  }
+
+  @Override
   public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
       boolean writeFlushWALMarker) {
     RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncCompactionServerService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncCompactionServerService.java
new file mode 100644
index 0000000..27dd45e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncCompactionServerService.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
+
+
+/**
+ * A simple wrapper of the {@link CompactionService} for a compaction server, which returns a
+ * {@link CompletableFuture}. This is easier to use than the raw protobuf interface, where you
+ * need to get the result from the {@link RpcCallback} and, if there is an exception, fetch it
+ * from the {@link RpcController} passed in.
+ * <p/>
+ * Notice that there is no retry, and this is intentional. We have different retry logic for
+ * different usages for now; if later we want to unify them, we can move the retry logic into
+ * this class.
+ */
[email protected]
+public class AsyncCompactionServerService {
+
+  private final ServerName server;
+
+  private final AsyncClusterConnectionImpl conn;
+
+  AsyncCompactionServerService(ServerName server, AsyncClusterConnectionImpl 
conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(CompactionService.Interface stub, HBaseRpcController controller,
+        RpcCallback<RESP> done);
+  }
+
+  // TODO: eliminate duplicate code in AsyncRegionServerAdmin and maybe we could also change the
+  //  way on how to do regionServerReport
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    try {
+      rpcCall.call(conn.createCompactionServerStub(server), controller, new RpcCallback<RESP>() {
+        @Override
+        public void run(RESP resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            future.complete(resp);
+          }
+        }
+      });
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<CompactResponse> requestCompaction(CompactRequest request) {
+    return call((stub, controller, done) -> stub.requestCompaction(controller, request, done));
+  }
+
+}
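
As a usage note, a caller obtains this wrapper from the cluster connection and
works with the returned future directly. A minimal sketch, assuming only the
classes in this patch (the helper class and names here are hypothetical):

    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;

    final class RequestCompactionExample {
      /** Fire a compaction request and log failures without blocking the caller. */
      static CompletableFuture<CompactResponse> send(AsyncClusterConnection conn,
          ServerName cs, CompactRequest request) {
        return conn.getCompactionServerService(cs).requestCompaction(request)
            .whenComplete((resp, err) -> {
              if (err != null) {
                System.err.println("compaction request to " + cs + " failed: " + err);
              }
            });
      }
    }
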
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index cb06137..a625adc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 
@@ -133,6 +135,11 @@ public class AsyncRegionServerAdmin {
     return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
   }
 
+  public CompletableFuture<CompleteCompactionResponse>
+      completeCompaction(CompleteCompactionRequest request) {
+    return call((stub, controller, done) -> stub.completeCompaction(controller, request, done));
+  }
+
   public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
     return call((stub, controller, done) -> stub.openRegion(controller, request, done));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
index 9ef9c21..2a76d7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.compactionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AbstractRpcServices;
@@ -31,16 +32,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
 
 @InterfaceAudience.Private
-public class CSRpcServices extends AbstractRpcServices {
+public class CSRpcServices extends AbstractRpcServices
+    implements CompactionService.BlockingInterface {
   protected static final Logger LOG = LoggerFactory.getLogger(CSRpcServices.class);
 
-  protected final HCompactionServer compactionServer;
+  private final HCompactionServer compactionServer;
+
+  // Request counter.
+  final LongAdder requestCount = new LongAdder();
   /** RPC scheduler to use for the compaction server. */
   public static final String COMPACTION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
       "hbase.compaction.server.rpc.scheduler.factory.class";
-
   /**
    * @return immutable list of blocking services and the security info classes that this server
    *         supports
@@ -48,6 +58,9 @@ public class CSRpcServices extends AbstractRpcServices {
   protected List<RpcServer.BlockingServiceAndInterface> getServices(final Configuration conf) {
     // now return empty, compaction server does not receive rpc requests
     List<RpcServer.BlockingServiceAndInterface> bssi = new ArrayList<>();
+    bssi.add(new RpcServer.BlockingServiceAndInterface(
+        CompactionService.newReflectiveBlockingService(this),
+        CompactionService.BlockingInterface.class));
     return new ImmutableList.Builder<RpcServer.BlockingServiceAndInterface>().addAll(bssi).build();
   }
 
@@ -65,4 +78,19 @@ public class CSRpcServices extends AbstractRpcServices {
     compactionServer = cs;
   }
 
+
+  /**
+   * Request compaction on the compaction server.
+   * @param controller the RPC controller
+   * @param request the compaction request
+   */
+  @Override
+  public CompactResponse requestCompaction(RpcController controller,
+      CompactionProtos.CompactRequest request) {
+    requestCount.increment();
+    LOG.info("Received compaction request from {}", ProtobufUtil.toString(request));
+    compactionServer.compactionThreadManager.requestCompaction();
+    return CompactionProtos.CompactResponse.newBuilder().build();
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
new file mode 100644
index 0000000..e7a5068
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected]
+public class CompactionThreadManager {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+
+  private final Configuration conf;
+  private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
+      new ConcurrentHashMap<>();
+  private final HCompactionServer server;
+
+  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+    this.conf = conf;
+    this.server = server;
+  }
+
+  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
+    AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
+    if (admin == null) {
+      LOG.debug("New RS admin connection to {}", sn);
+      admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
+      this.rsAdmins.put(sn, admin);
+    }
+    return admin;
+  }
+
+  public void requestCompaction() {
+  }
+
+}
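
A side note on getRsAdmin above: the get-then-put on the ConcurrentMap is not
atomic, so two threads can race and briefly create two admins for the same server
(harmless, but redundant). A sketch of a race-free variant using the same fields,
offered as a possible follow-up rather than what this commit does:

    private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) {
      // computeIfAbsent guarantees a single admin instance per server name
      return rsAdmins.computeIfAbsent(sn, key -> {
        LOG.debug("New RS admin connection to {}", key);
        return server.getAsyncClusterConnection().getRegionServerAdmin(key);
      });
    }
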
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index 88a3176..cd09480 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerStatusService;
 
@@ -75,6 +76,7 @@ public class HCompactionServer extends AbstractServer {
   // Stub to do compaction server status calls against the master.
   private volatile CompactionServerStatusService.BlockingInterface cssStub;
 
+  CompactionThreadManager compactionThreadManager;
   /**
    * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
    * connection, the current cssStub must be null. Method will block until a master is available.
@@ -118,6 +120,7 @@ public class HCompactionServer extends AbstractServer {
     // login the server principal (if using secure Hadoop)
     login(userProvider, this.rpcServices.getIsa().getHostName());
     Superusers.initialize(conf);
+    this.compactionThreadManager = new CompactionThreadManager(conf, this);
     this.rpcServices.start();
   }
 
@@ -131,15 +134,30 @@ public class HCompactionServer extends AbstractServer {
       SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL, host);
   }
 
-  private boolean tryCompactionServerReport() throws IOException {
+  private ClusterStatusProtos.CompactionServerLoad buildServerLoad(long reportStartTime,
+    long reportEndTime) {
+    ClusterStatusProtos.CompactionServerLoad.Builder serverLoad =
+      ClusterStatusProtos.CompactionServerLoad.newBuilder();
+    serverLoad.setCompactedCells(0);
+    serverLoad.setCompactingCells(0);
+    serverLoad.setTotalNumberOfRequests(rpcServices.requestCount.sum());
+    serverLoad.setReportStartTime(reportStartTime);
+    serverLoad.setReportEndTime(reportEndTime);
+    return serverLoad.build();
+  }
+
+  private boolean tryCompactionServerReport(long reportStartTime, long reportEndTime)
+      throws IOException {
     CompactionServerStatusService.BlockingInterface css = cssStub;
     if (css == null) {
       return false;
     }
+    ClusterStatusProtos.CompactionServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
     try {
       CompactionServerStatusProtos.CompactionServerReportRequest.Builder request =
           CompactionServerStatusProtos.CompactionServerReportRequest.newBuilder();
       request.setServer(ProtobufUtil.toServerName(getServerName()));
+      request.setLoad(sl);
       this.cssStub.compactionServerReport(null, request.build());
     } catch (ServiceException se) {
       IOException ioe = ProtobufUtil.getRemoteException(se);
@@ -194,7 +212,7 @@ public class HCompactionServer extends AbstractServer {
       while (!isStopped()) {
         long now = System.currentTimeMillis();
         if ((now - lastMsg) >= msgInterval) {
-          if (tryCompactionServerReport() && !online.get()) {
+          if (tryCompactionServerReport(lastMsg, now) && !online.get()) {
             synchronized (online) {
               online.set(true);
               online.notifyAll();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index bcf5e76..b8c8891 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -161,6 +161,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionIn
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerReportRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerReportResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerStatusService;
@@ -418,11 +424,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
 public class MasterRpcServices extends RSRpcServices implements
     MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
     LockService.BlockingInterface, HbckService.BlockingInterface,
-    ClientMetaService.BlockingInterface, CompactionServerStatusService.BlockingInterface {
+    ClientMetaService.BlockingInterface, CompactionServerStatusService.BlockingInterface,
+    CompactionProtos.CompactionService.BlockingInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
   private static final Logger AUDITLOG =
-      LoggerFactory.getLogger("SecurityLogger."+MasterRpcServices.class.getName());
+      LoggerFactory.getLogger("SecurityLogger." + MasterRpcServices.class.getName());
 
   private final HMaster master;
 
@@ -567,6 +574,9 @@ public class MasterRpcServices extends RSRpcServices implements
     bssi.add(new BlockingServiceAndInterface(
         CompactionServerStatusService.newReflectiveBlockingService(this),
         CompactionServerStatusService.BlockingInterface.class));
+    bssi.add(new BlockingServiceAndInterface(
+      CompactionService.newReflectiveBlockingService(this),
+      CompactionService.BlockingInterface.class));
     bssi.addAll(super.getServices(conf));
     return bssi;
   }
@@ -3473,4 +3483,17 @@ public class MasterRpcServices extends RSRpcServices implements
     }
   }
 
+  @Override
+  public CompactResponse requestCompaction(RpcController controller, CompactRequest request)
+      throws ServiceException {
+    master.getCompactionOffloadManager().requestCompaction(request);
+    return CompactResponse.newBuilder().build();
+  }
+
+  @Override
+  public CompleteCompactionResponse completeCompaction(RpcController controller,
+    CompleteCompactionRequest request) {
+    throw new UnsupportedOperationException("master does not receive completeCompaction");
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
index 03f6829..b0e80d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
@@ -22,22 +22,30 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.CompactionServerMetrics;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncCompactionServerService;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.procedure.SwitchCompactionOffloadProcedure;
 import org.apache.hadoop.hbase.regionserver.CompactionOffloadSwitchStorage;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCompactionOffloadEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCompactionOffloadEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchCompactionOffloadRequest;
@@ -51,6 +59,8 @@ public class CompactionOffloadManager {
   private CompactionOffloadSwitchStorage compactionOffloadSwitchStorage;
   private static final Logger LOG =
       LoggerFactory.getLogger(CompactionOffloadManager.class.getName());
+  private final ConcurrentMap<ServerName, AsyncCompactionServerService> csStubs =
+      new ConcurrentHashMap<>();
 
   public CompactionOffloadManager(final MasterServices master) {
     this.masterServices = master;
@@ -138,4 +148,41 @@ public class CompactionOffloadManager {
         .postSwitchCompactionOffload(oldCompactionOffloadEnable, compactionOffloadEnabled);
     return response;
   }
+
+  /**
+   * Just as there is a 1-1 mapping from a region to an RS, we will have one from the
+   * compaction of a region to a CS.
+   */
+  private ServerName selectCompactionServer(CompactRequest request) throws ServiceException {
+    List<ServerName> compactionServerList = getOnlineServersList();
+    if (compactionServerList.size() <= 0) {
+      throw new ServiceException("compaction server is not available");
+    }
+    // TODO: need more complex and effective way to manage compaction of region to CS mapping.
+    //  maybe another assignment and balance module
+    long index = (request.getRegionInfo().getStartKey().hashCode() & Integer.MAX_VALUE)
+        % compactionServerList.size();
+    return compactionServerList.get((int) index);
+  }
+
+  private AsyncCompactionServerService getCsStub(final ServerName sn) throws IOException {
+    AsyncCompactionServerService csStub = this.csStubs.get(sn);
+    if (csStub == null) {
+      LOG.debug("New CS stub connection to {}", sn);
+      csStub = this.masterServices.getAsyncClusterConnection().getCompactionServerService(sn);
+      this.csStubs.put(sn, csStub);
+    }
+    return csStub;
+  }
+
+  public CompactResponse requestCompaction(CompactRequest request) throws ServiceException {
+    ServerName targetCompactionServer = selectCompactionServer(request);
+    LOG.info("Received compaction request {}, sending to compaction server: {}",
+      ProtobufUtil.toString(request), targetCompactionServer);
+    try {
+      FutureUtils.get(getCsStub(targetCompactionServer).requestCompaction(request));
+    } catch (Throwable t) {
+      LOG.error("requestCompaction from master to CS error", t);
+    }
+    return CompactResponse.newBuilder().build();
+  }
 }
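
To make the selection arithmetic in selectCompactionServer concrete: the start
key's hashCode is masked to a non-negative value and taken modulo the number of
live compaction servers, so a region keeps mapping to the same CS while the
membership is stable. A small illustration with made-up numbers:

    // A start key whose hashCode is -1734067412 masks to 413416236
    // (x & Integer.MAX_VALUE clears the sign bit); with 3 live compaction
    // servers, 413416236 % 3 == 0, so the region maps to index 0.
    static int selectIndex(int startKeyHash, int onlineServers) {
      return (startKeyHash & Integer.MAX_VALUE) % onlineServers;
    }
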
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ca2e1df..e49ad87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.ZNodeClearer;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -211,6 +212,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
@@ -357,6 +360,7 @@ public class HRegionServer extends AbstractServer implements
   // Stub to do region server status calls against the master.
   private volatile RegionServerStatusService.BlockingInterface rssStub;
   private volatile LockService.BlockingInterface lockStub;
+  private volatile CompactionService.BlockingInterface cmsStub;
 
   private UncaughtExceptionHandler uncaughtExceptionHandler;
 
@@ -3736,4 +3740,67 @@ public class HRegionServer extends AbstractServer implements
   public long getRetryPauseTime() {
     return this.retryPauseTime;
   }
+
+  @Override
+  public boolean isCompactionOffloadEnabled() {
+    return regionServerCompactionOffloadManager.isCompactionOffloadEnabled();
+  }
+
+  private synchronized void createCompactionManagerStub(boolean refresh) {
+    // Create Master Compaction service stub without refreshing the master node from ZK,
+    // use cached data
+    if (cmsStub == null) {
+      cmsStub =
+          (CompactionService.BlockingInterface) createMasterStub(CompactionService.class, refresh);
+    }
+  }
+
+  /**
+   * Send compaction request to compaction manager
+   * @return True if the request was sent successfully, otherwise false
+   */
+  @Override
+  public boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+    boolean major, int priority) {
+    if (!isCompactionOffloadEnabled()) {
+      return false;
+    }
+    if (cmsStub == null) {
+      createCompactionManagerStub(false);
+    }
+    if (cmsStub == null) {
+      return false;
+    }
+    CompactionService.BlockingInterface cms = cmsStub;
+    InetSocketAddress[] favoredNodesForRegion =
+      getFavoredNodesForRegion(regionInfo.getEncodedName());
+    CompactRequest.Builder builder =
+      CompactRequest.newBuilder().setServer(ProtobufUtil.toServerName(getServerName()))
+        .setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
+        .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setMajor(major).setPriority(priority);
+    if (favoredNodesForRegion != null) {
+      for (InetSocketAddress address : favoredNodesForRegion) {
+        builder.addFavoredNodes(ProtobufUtil
+          .toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
+      }
+    }
+    CompactRequest compactRequest = builder.build();
+    try {
+      LOG.debug("Request compaction to CompactionManager, region: {}, store: {}",
+        regionInfo.getRegionNameAsString(), cfd.getNameAsString());
+      cms.requestCompaction(null, compactRequest);
+      LOG.debug("Received compaction response from CompactionManager, region: {}, store: {}",
+        regionInfo.getRegionNameAsString(), cfd.getNameAsString());
+      return true;
+    } catch (ServiceException se) {
+      LOG.error("Failed to request compact region", se);
+      if (cmsStub == cms) {
+        cmsStub = null;
+      }
+      // Couldn't connect to the compaction manager, get location from zk and reconnect
+      createCompactionManagerStub(true);
+      return false;
+    }
+  }
 }
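
The requestCompactRegion path above caches a blocking master stub and drops it on
ServiceException so the next call re-resolves the master from ZK. A condensed,
self-contained sketch of that invalidate-on-failure pattern (the Stub type and
factory are hypothetical stand-ins for createMasterStub):

    import java.util.function.Supplier;

    /** Cache a stub; drop it when a call through it fails. */
    final class StubCache<S> {
      private volatile S stub;
      private final Supplier<S> factory; // e.g. resolves the master from ZK

      StubCache(Supplier<S> factory) {
        this.factory = factory;
      }

      synchronized S get() {
        if (stub == null) {
          stub = factory.get();
        }
        return stub;
      }

      /** Invalidate only if the cached stub is still the one that failed. */
      synchronized void invalidate(S failed) {
        if (stub == failed) {
          stub = null;
        }
      }
    }
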
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a2a8f9d..9749576 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1525,14 +1525,64 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
   }
 
-  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
-      Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
-      List<Path> newFiles) throws IOException {
+  protected boolean completeCompaction(CompactionRequestImpl cr, List<String> filesToCompact,
+    User user, List<String> newFiles) throws IOException {
+    Collection<HStoreFile> selectedStoreFiles = new ArrayList<>();
+    for (String selectedFile : filesToCompact) {
+      HStoreFile storeFile = getStoreFileBasedOnFileName(selectedFile);
+      if (storeFile == null) {
+        return false;
+      } else {
+        selectedStoreFiles.add(storeFile);
+      }
+    }
+    Path regionDir = getRegionFileSystem().getRegionDir();
+    Path regionTmpDir = new Path(regionDir, ".tmp");
+    Path storeTmpDir = new Path(regionTmpDir, getColumnFamilyName());
+    List<Path> newFilePaths = new ArrayList<>();
+    for (String newFile : newFiles) {
+      newFilePaths.add(new Path(storeTmpDir, newFile));
+    }
+    completeCompaction(cr, selectedStoreFiles, user, newFilePaths);
+    return true;
+  }
+
+  private HStoreFile getStoreFileBasedOnFileName(String fileName) {
+    for (HStoreFile storefile : getStorefiles()) {
+      if (storefile.getPath().getName().equals(fileName)) {
+        LOG.debug("Found store file: {} for selectFileName: {}", storefile, fileName);
+        return storefile;
+      }
+    }
+    LOG.warn("Did not find store file for selectFileName: {}", fileName);
+    return null;
+  }
+
+  private HStoreFile getStoreFile(Path path) {
+    for (HStoreFile storefile : getStorefiles()) {
+      if (storefile.getPath().equals(path)) {
+        return storefile;
+      }
+    }
+    return null;
+  }
+
+  private synchronized List<HStoreFile> completeCompaction(CompactionRequestImpl cr,
+    Collection<HStoreFile> filesToCompact, User user, List<Path> newFiles) throws IOException {
+    // TODO check store contains files to compact
     // Do the steps necessary to complete the compaction.
     setStoragePolicyFromFileName(newFiles);
     List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
     writeCompactionWalRecord(filesToCompact, sfs);
     replaceStoreFiles(filesToCompact, sfs);
+    return sfs;
+  }
+
+  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
+      Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
+      List<Path> newFiles) throws IOException {
+    List<HStoreFile> sfs = completeCompaction(cr, filesToCompact, user, newFiles);
+
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -1577,7 +1627,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     for (Path newFile : newFiles) {
       assert newFile != null;
       HStoreFile sf = moveFileIntoPlace(newFile);
-      if (this.getCoprocessorHost() != null) {
+      if (this.getCoprocessorHost() != null && cr != null) {
         getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
       }
       assert sf != null;
@@ -1880,21 +1930,30 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     // Before we do compaction, try to get rid of unneeded files to simplify things.
     removeUnneededFiles();
 
+    if (region.getRegionServerServices() != null
+        && region.getRegionServerServices().isCompactionOffloadEnabled()) {
+      if (!requestToCompactionManager(forceMajor, priority)) {
+        // if request to cm error, do local compaction or retry
+        return selectCompaction(priority, tracker, user, filesCompacting);
+      } else {
+        LOG.debug("Requested compaction to compaction server, regioninfo: {}, store: {}",
+          this.getRegionInfo(), this);
+      }
+      return Optional.empty();
+    } else {
+      return selectCompaction(priority, tracker, user, filesCompacting);
+    }
+  }
+
+  public Optional<CompactionContext> selectCompaction(int priority,
+      CompactionLifeCycleTracker tracker, User user, List<HStoreFile> filesCompacting)
+      throws IOException {
     final CompactionContext compaction = storeEngine.createCompaction();
     CompactionRequestImpl request = null;
     this.lock.readLock().lock();
     try {
       synchronized (filesCompacting) {
-        // First, see if coprocessor would want to override selection.
-        if (this.getCoprocessorHost() != null) {
-          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
-          boolean override = getCoprocessorHost().preCompactSelection(this,
-              candidatesForCoproc, tracker, user);
-          if (override) {
-            // Coprocessor is overriding normal file selection.
-            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
-          }
-        }
+        preCompactionSelection(compaction, tracker, user);
 
         // Normal case - coprocessor is not overriding file selection.
         if (!compaction.hasSelection()) {
@@ -1916,11 +1975,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
             offPeakCompactionTracker.set(false);
           }
         }
-        if (this.getCoprocessorHost() != null) {
-          this.getCoprocessorHost().postCompactSelection(
-              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
-              compaction.getRequest(), user);
-        }
+        postCompactionSelection(compaction, tracker, user);
         // Finally, we have the resulting files list. Check if we have any files at all.
         request = compaction.getRequest();
         Collection<HStoreFile> selectedFiles = request.getFiles();
@@ -1966,6 +2021,34 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     return Optional.of(compaction);
   }
 
+  private void preCompactionSelection(CompactionContext compaction,
+    CompactionLifeCycleTracker tracker, User user) throws IOException {
+    // First, see if coprocessor would want to override selection.
+    if (this.getCoprocessorHost() != null) {
+      final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
+      boolean override =
+        getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
+      if (override) {
+        // Coprocessor is overriding normal file selection.
+        compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
+      }
+    }
+  }
+
+  private void postCompactionSelection(CompactionContext compaction,
+    CompactionLifeCycleTracker tracker, User user) throws IOException {
+    if (this.getCoprocessorHost() != null) {
+      this.getCoprocessorHost().postCompactSelection(this,
+        ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, compaction.getRequest(),
+        user);
+    }
+  }
+
+  private boolean requestToCompactionManager(boolean forceMajor, int priority) {
+    return region.getRegionServerServices().requestCompactRegion(region.getRegionInfo(),
+      storeContext.getFamily(), forceMajor, priority);
+  }
+
   /** Adds the files to compacting files. filesCompacting must be locked. */
   private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
     if (CollectionUtils.isEmpty(filesToAdd)) {
@@ -2917,4 +3000,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       mixedRowReadsCount.increment();
     }
   }
+
+  public boolean getForceMajor() {
+    return this.forceMajor;
+  }
+
+  public void setForceMajor(boolean newForceMajor) {
+    this.forceMajor = newForceMajor;
+  }
 }
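
For orientation, the String-based completeCompaction above assumes the compaction
output was written under the region's temporary directory and rebuilds paths as
<region dir>/.tmp/<family>/<new file>. A tiny sketch of that reconstruction (the
sample names in the comment are hypothetical):

    import org.apache.hadoop.fs.Path;

    final class TmpLayoutExample {
      /** Mirrors the path building in completeCompaction(String...). */
      static Path newFilePath(Path regionDir, String family, String newFile) {
        // e.g. /hbase/data/default/t1/5fd5f34a/.tmp/cf/bf3a9c05
        return new Path(new Path(new Path(regionDir, ".tmp"), family), newFile);
      }
    }
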
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bfb25ab..3910ab4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.CheckAndMutate;
 import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -237,6 +239,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
@@ -3762,6 +3766,42 @@ public class RSRpcServices extends AbstractRpcServices implements
     return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
   }
 
+
+  @Override
+  public CompleteCompactionResponse completeCompaction(RpcController controller,
+      CompleteCompactionRequest request) throws ServiceException {
+    RegionInfo regionInfo = ProtobufUtil.toRegionInfo(request.getRegionInfo());
+    ColumnFamilyDescriptor family = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
+    LOG.debug("Region server received complete compaction for region: {}, cf: {}",
+      regionInfo.getRegionNameAsString(), family.getNameAsString());
+    boolean success = false;
+    HRegion onlineRegion = regionServer.getOnlineRegion(regionInfo.getRegionName());
+    if (onlineRegion != null) {
+      HStore store = onlineRegion.getStore(family.getName());
+      if (store != null) {
+        if (store.getForceMajor()) {
+          store.setForceMajor(request.getNewForceMajor());
+        }
+        List<String> selectedFiles =
+            request.getSelectedFilesList().stream().collect(Collectors.toList());
+        List<String> newFiles = request.getNewFilesList().stream().collect(Collectors.toList());
+        try {
+          // TODO: If we could write HFile directly into the data directory, here the completion
+          //  will be easier
+          success = store.completeCompaction(null, selectedFiles, null, newFiles);
+          LOG.debug("Complete compaction result: {} for region: {}, store {}", success,
+            regionInfo.getRegionNameAsString(),
+            store.getColumnFamilyDescriptor().getNameAsString());
+        } catch (IOException e) {
+          LOG.error("Failed to complete compaction for region: {}, store {}",
+            regionInfo.getRegionNameAsString(), store.getColumnFamilyDescriptor().getNameAsString(),
+            e);
+        }
+      }
+    }
+    return CompleteCompactionResponse.newBuilder().setSuccess(success).build();
+  }
+
   private void executeOpenRegionProcedures(OpenRegionRequest request,
       Map<TableName, TableDescriptor> tdCache) {
     long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 7d9d25b..c718f15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -322,4 +323,9 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
    * @return {@link ZKPermissionWatcher}
    */
   ZKPermissionWatcher getZKPermissionWatcher();
+
+  boolean isCompactionOffloadEnabled();
+
+  boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescriptor cfd, boolean major,
+      int priority);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
index fb449c6..b7031b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -47,7 +48,9 @@ public class HBasePolicyProvider extends PolicyProvider {
     new Service("security.masterregion.protocol.acl",
       RegionServerStatusService.BlockingInterface.class),
     new Service("security.masterregion.protocol.acl",
-      CompactionServerStatusProtos.CompactionServerStatusService.BlockingInterface.class)
+      CompactionServerStatusProtos.CompactionServerStatusService.BlockingInterface.class),
+    new Service("security.masterregion.protocol.acl",
+      CompactionProtos.CompactionService.BlockingInterface.class)
   };
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 76f2220..c7e172a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
@@ -385,4 +386,15 @@ public class MockRegionServerServices implements RegionServerServices {
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+  @Override
+  public boolean isCompactionOffloadEnabled() {
+    return true;
+  }
+
+  @Override
+  public boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+    boolean major, int priority) {
+    return false;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 8755749..37829bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -109,6 +109,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   }
 
   @Override
+  public AsyncCompactionServerService getCompactionServerService(ServerName serverName) {
+    return null;
+  }
+
+  @Override
   public NonceGenerator getNonceGenerator() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index 1edc414..b5da939 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.compactionserver;
 
+import static org.junit.Assert.assertNull;
+
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,8 +43,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 @Category({CompactionServerTests.class, MediumTests.class})
 public class TestCompactionServer {
 
@@ -63,6 +63,7 @@ public class TestCompactionServer {
   @BeforeClass
   public static void beforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
+    TEST_UTIL.getAdmin().switchCompactionOffload(true);
     MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
     TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
     COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
@@ -86,6 +87,20 @@ public class TestCompactionServer {
     TEST_UTIL.deleteTableIfAny(TABLENAME);
   }
 
+
+  @Test
+  public void testCompactionServerReport() throws Exception {
+    CompactionOffloadManager compactionOffloadManager = MASTER.getCompactionOffloadManager();
+    TEST_UTIL.waitFor(60000, () -> !compactionOffloadManager.getOnlineServers().isEmpty()
+      && null != compactionOffloadManager.getOnlineServers().get(COMPACTION_SERVER_NAME));
+    // invoke compact
+    TEST_UTIL.compact(TABLENAME, false);
+    TEST_UTIL.waitFor(60000,
+      () -> COMPACTION_SERVER.rpcServices.requestCount.sum() > 0
+        && COMPACTION_SERVER.rpcServices.requestCount.sum() == compactionOffloadManager
+        .getOnlineServers().get(COMPACTION_SERVER_NAME).getTotalNumberOfRequests());
+  }
+
   @Test
   public void testCompactionServerExpire() throws Exception {
     int initialNum = TEST_UTIL.getMiniHBaseCluster().getNumLiveCompactionServers();
@@ -98,11 +113,13 @@ public class TestCompactionServer {
 
     CompactionOffloadManager compactionOffloadManager = MASTER.getCompactionOffloadManager();
     TEST_UTIL.waitFor(60000,
-      () -> initialNum + 1 == compactionOffloadManager.getOnlineServersList().size());
+      () -> initialNum + 1 == compactionOffloadManager.getOnlineServersList().size()
+          && null != compactionOffloadManager.getLoad(compactionServerName));
 
     compactionServer.stop("test");
 
     TEST_UTIL.waitFor(60000,
       () -> initialNum == compactionOffloadManager.getOnlineServersList().size());
+    assertNull(compactionOffloadManager.getLoad(compactionServerName));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 878f804..db4a5eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -139,6 +140,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 
@@ -756,4 +759,22 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+
+  @Override
+  public CompleteCompactionResponse completeCompaction(RpcController controller,
+    CompleteCompactionRequest request) throws ServiceException {
+    return null;
+  }
+
+  @Override
+  public boolean isCompactionOffloadEnabled() {
+    return true;
+  }
+
+  @Override
+  public boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+      boolean major, int priority) {
+    return false;
+  }
 }
