This is an automated email from the ASF dual-hosted git repository.
niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-25714 by this push:
new da0fa30 HBASE-25968 Request compact to compaction server (#3378)
da0fa30 is described below
commit da0fa3000e341058cba57b06d2afe2ba452ee6df
Author: niuyulin <[email protected]>
AuthorDate: Mon Jun 21 11:03:49 2021 +0800
HBASE-25968 Request compact to compaction server (#3378)
Signed-off-by: Duo Zhang <[email protected]>
---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 4 +-
.../apache/hadoop/hbase/security/SecurityInfo.java | 3 +
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 13 +++
.../src/main/protobuf/server/Compaction.proto | 62 ++++++++++
.../src/main/protobuf/server/region/Admin.proto | 4 +
.../hbase/client/AsyncClusterConnection.java | 5 +
.../hbase/client/AsyncClusterConnectionImpl.java | 12 +-
.../hbase/client/AsyncCompactionServerService.java | 86 ++++++++++++++
.../hbase/client/AsyncRegionServerAdmin.java | 7 ++
.../hbase/compactionserver/CSRpcServices.java | 34 +++++-
.../compactionserver/CompactionThreadManager.java | 59 ++++++++++
.../hbase/compactionserver/HCompactionServer.java | 22 +++-
.../hadoop/hbase/master/MasterRpcServices.java | 27 ++++-
.../compaction/CompactionOffloadManager.java | 47 ++++++++
.../hadoop/hbase/regionserver/HRegionServer.java | 67 +++++++++++
.../apache/hadoop/hbase/regionserver/HStore.java | 129 ++++++++++++++++++---
.../hadoop/hbase/regionserver/RSRpcServices.java | 40 +++++++
.../hbase/regionserver/RegionServerServices.java | 6 +
.../hadoop/hbase/security/HBasePolicyProvider.java | 5 +-
.../hadoop/hbase/MockRegionServerServices.java | 12 ++
.../hbase/client/DummyAsyncClusterConnection.java | 5 +
.../compactionserver/TestCompactionServer.java | 23 +++-
.../hadoop/hbase/master/MockRegionServer.java | 21 ++++
23 files changed, 660 insertions(+), 33 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 8a1ac5a..87f9a6b 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -83,11 +83,11 @@ class AsyncConnectionImpl implements AsyncConnection {
final AsyncConnectionConfiguration connConf;
- private final User user;
+ protected final User user;
final ConnectionRegistry registry;
- private final int rpcTimeout;
+ protected final int rpcTimeout;
protected final RpcClient rpcClient;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index e4e97e7..7ddc569 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -24,6 +24,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -54,6 +55,8 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL,
Kind.HBASE_AUTH_TOKEN));
infos.put(CompactionServerStatusProtos.CompactionServerStatusService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL,
Kind.HBASE_AUTH_TOKEN));
+ infos.put(CompactionProtos.CompactionService.getDescriptor().getName(),
+ new SecurityInfo(SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL,
Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider
ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 1b00887..56661b4 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -172,6 +172,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -3801,4 +3802,16 @@ public final class ProtobufUtil {
.build();
}
+  /**
+   * Builds a one-line, human readable summary of a compaction request, suitable for logging.
+   * @param request the compaction request to describe
+   * @return the requesting server, region name, column family name, major flag and priority
+   */
+  public static String toString(CompactionProtos.CompactRequest request) {
+    ServerName rsServerName = ProtobufUtil.toServerName(request.getServer());
+    // Fully qualified: the client-side RegionInfo, not the protobuf message of the same name.
+    org.apache.hadoop.hbase.client.RegionInfo regionInfo =
+      ProtobufUtil.toRegionInfo(request.getRegionInfo());
+    ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
+    return "RS: " + rsServerName + ", region: " + regionInfo.getRegionNameAsString()
+      + ", CF: " + cfd.getNameAsString() + ", major:" + request.getMajor()
+      + ", priority:" + request.getPriority();
+  }
+
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto
b/hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto
new file mode 100644
index 0000000..5e036c8
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+// This file contains protocol buffers that are used for
CompactionManagerProtocol.
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "CompactionProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "HBase.proto";
+import "server/ClusterStatus.proto";
+import "server/ErrorHandling.proto";
+
+// Request to compact one column family of a region (argument of RequestCompaction).
+message CompactRequest {
+ // Server the request originates from; ProtobufUtil.toString labels it "RS".
+ required ServerName server = 1;
+ // Region whose store should be compacted.
+ required RegionInfo region_info = 2;
+ // Column family (store) to compact.
+ required ColumnFamilySchema family = 3;
+ // Whether a major compaction is requested.
+ required bool major = 4;
+ required int32 priority = 5;
+ // NOTE(review): presumably the favored nodes of the region - confirm against the sender.
+ repeated ServerName favored_nodes = 6;
+}
+
+// Empty acknowledgement of a CompactRequest.
+message CompactResponse {
+}
+
+// Argument of AdminService.CompleteCompaction.
+// NOTE(review): presumably selected_files are the compaction inputs and new_files the outputs -
+// confirm against the sender.
+message CompleteCompactionRequest {
+ required RegionInfo region_info = 1;
+ required ColumnFamilySchema family = 2;
+ repeated string selected_files = 3;
+ repeated string new_files = 4;
+ required bool new_force_major = 5;
+}
+
+// Result of a CompleteCompactionRequest.
+message CompleteCompactionResponse {
+ required bool success = 1;
+}
+
+service CompactionService {
+ /** Called when a region server requests compaction of a column family of a region. */
+ rpc RequestCompaction(CompactRequest)
+ returns(CompactResponse);
+
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 0667292..b86b499 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -30,6 +30,7 @@ import "server/Quota.proto";
import "server/ClusterStatus.proto";
import "server/region/WAL.proto";
import "server/region/TooSlowLog.proto";
+import "server/Compaction.proto";
message GetRegionInfoRequest {
required RegionSpecifier region = 1;
@@ -393,6 +394,9 @@ service AdminService {
rpc ExecuteProcedures(ExecuteProceduresRequest)
returns(ExecuteProceduresResponse);
+ rpc CompleteCompaction(CompleteCompactionRequest)
+ returns(CompleteCompactionResponse);
+
rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
returns(ClearSlowLogResponses);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac..28915d5 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -42,6 +42,11 @@ public interface AsyncClusterConnection extends
AsyncConnection {
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
/**
+ * Get the compaction service for the given compaction server.
+ */
+ AsyncCompactionServerService getCompactionServerService(ServerName
serverName);
+
+ /**
* Get the nonce generator for this connection.
*/
NonceGenerator getNonceGenerator();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 39fc3a2..06b4bbe 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -41,6 +41,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBul
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -49,12 +50,16 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
*/
@InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements
AsyncClusterConnection {
-
public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry
registry,
String clusterId, SocketAddress localAddress, User user) {
super(conf, registry, clusterId, localAddress, user);
}
+ /**
+ * Creates a callback-style ({@code Interface}) CompactionService stub on a new RPC channel to
+ * the given server, using this connection's user and RPC timeout.
+ * @param serverName the server the channel is opened to
+ * @return a new stub; nothing is cached here, each call builds a fresh channel and stub
+ */
+ CompactionProtos.CompactionService.Interface
createCompactionServerStub(ServerName serverName) {
+ return CompactionProtos.CompactionService
+ .newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+ }
+
@Override
public NonceGenerator getNonceGenerator() {
return super.getNonceGenerator();
@@ -71,6 +76,11 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl
implements AsyncClu
}
@Override
+ public AsyncCompactionServerService getCompactionServerService(ServerName
serverName) {
+ return new AsyncCompactionServerService(serverName, this);
+ }
+
+ @Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncCompactionServerService.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncCompactionServerService.java
new file mode 100644
index 0000000..27dd45e
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncCompactionServerService.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
+
+
+/**
+ * A simple wrapper of the {@link CompactionService} for a compaction server, which returns a
+ * {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
+ * need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
+ * get it from the {@link RpcController} passed in.
+ * <p/>
+ * Notice that there is no retry, and this is intentional. We have different retry for different
+ * usage for now, if later we want to unify them, we can move the retry logic into this class.
+ */
+@InterfaceAudience.Private
+public class AsyncCompactionServerService {
+
+  /** The server every call issued through this wrapper is sent to. */
+  private final ServerName server;
+
+  /** Connection that supplies the RPC controller factory and the compaction service stub. */
+  private final AsyncClusterConnectionImpl conn;
+
+  AsyncCompactionServerService(ServerName server, AsyncClusterConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  /** A single asynchronous invocation against the compaction service stub. */
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(CompactionService.Interface stub, HBaseRpcController controller,
+      RpcCallback<RESP> done);
+  }
+
+  // TODO: eliminate duplicate code in AsyncRegionServerAdmin and maybe we could also change the
+  // way on how to do regionServerReport
+  /**
+   * Runs the given call on a fresh stub and controller and exposes its outcome as a future.
+   * @param rpcCall the stub invocation to perform
+   * @return a future completed with the response, or completed exceptionally with the failure
+   *         reported by the controller or thrown while issuing the call
+   */
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    try {
+      rpcCall.call(conn.createCompactionServerStub(server), controller, resp -> {
+        // Failures are reported through the controller, not through the callback argument.
+        if (controller.failed()) {
+          future.completeExceptionally(controller.getFailed());
+        } else {
+          future.complete(resp);
+        }
+      });
+    } catch (Exception e) {
+      // The call could not even be issued; surface that through the future as well.
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  /**
+   * Sends a compaction request to the compaction server.
+   * @param request the request to send
+   * @return a future for the compaction server's response
+   */
+  public CompletableFuture<CompactResponse> requestCompaction(CompactRequest request) {
+    return call((stub, controller, done) -> stub.requestCompaction(controller, request, done));
+  }
+
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index cb06137..a625adc 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -66,6 +66,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
@@ -133,6 +135,11 @@ public class AsyncRegionServerAdmin {
return call((stub, controller, done) -> stub.getOnlineRegion(controller,
request, done));
}
+ /**
+ * Issues the CompleteCompaction RPC through the admin stub.
+ * @param request the request, passed to the stub unchanged
+ * @return a future for the {@link CompleteCompactionResponse}
+ */
+ public CompletableFuture<CompleteCompactionResponse>
+ completeCompaction(CompleteCompactionRequest request) {
+ return call((stub, controller, done) ->
stub.completeCompaction(controller, request, done));
+ }
+
public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest
request) {
return call((stub, controller, done) -> stub.openRegion(controller,
request, done));
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
index 9ef9c21..2a76d7a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AbstractRpcServices;
@@ -31,16 +32,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
@InterfaceAudience.Private
-public class CSRpcServices extends AbstractRpcServices {
+public class CSRpcServices extends AbstractRpcServices
+ implements CompactionService.BlockingInterface {
protected static final Logger LOG =
LoggerFactory.getLogger(CSRpcServices.class);
- protected final HCompactionServer compactionServer;
+ private final HCompactionServer compactionServer;
+
+ // Request counter.
+ final LongAdder requestCount = new LongAdder();
/** RPC scheduler to use for the compaction server. */
public static final String COMPACTION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.compaction.server.rpc.scheduler.factory.class";
-
/**
* @return immutable list of blocking services and the security info classes
that this server
* supports
@@ -48,6 +58,9 @@ public class CSRpcServices extends AbstractRpcServices {
protected List<RpcServer.BlockingServiceAndInterface> getServices(final
Configuration conf) {
// now return empty, compaction server do not receive rpc request
List<RpcServer.BlockingServiceAndInterface> bssi = new ArrayList<>();
+ bssi.add(new RpcServer.BlockingServiceAndInterface(
+ CompactionService.newReflectiveBlockingService(this),
+ CompactionService.BlockingInterface.class));
return new
ImmutableList.Builder<RpcServer.BlockingServiceAndInterface>().addAll(bssi).build();
}
@@ -65,4 +78,19 @@ public class CSRpcServices extends AbstractRpcServices {
compactionServer = cs;
}
+
+ /**
+ * Request compaction on the compaction server. Increments the request counter, logs a summary
+ * of the request and notifies the compaction thread manager.
+ * @param controller the RPC controller
+ * @param request the compaction request
+ * @return an empty {@link CompactResponse}
+ */
+ @Override
+ public CompactResponse requestCompaction(RpcController controller,
+ CompactionProtos.CompactRequest request) {
+ requestCount.increment();
+ LOG.info("Receive compaction request from {}",
ProtobufUtil.toString(request));
+ // NOTE(review): the request itself is not passed on here - confirm this is intended.
+ compactionServer.compactionThreadManager.requestCompaction();
+ return CompactionProtos.CompactResponse.newBuilder().build();
+ }
+
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
new file mode 100644
index 0000000..e7a5068
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class CompactionThreadManager {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+
+  private final Configuration conf;
+  /** Region server admin stubs, keyed by the region server they talk to. */
+  private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
+    new ConcurrentHashMap<>();
+  private final HCompactionServer server;
+
+  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+    this.conf = conf;
+    this.server = server;
+  }
+
+  /**
+   * Returns the admin stub for the given region server, creating and caching it on first use.
+   * The lookup and insert happen atomically, so concurrent callers share a single stub.
+   * @param sn the region server to talk to
+   * @return the cached or newly created admin stub
+   */
+  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
+    return this.rsAdmins.computeIfAbsent(sn, key -> {
+      LOG.debug("New RS admin connection to {}", key);
+      return this.server.getAsyncClusterConnection().getRegionServerAdmin(key);
+    });
+  }
+
+  /** Entry point for compaction requests; intentionally empty in this change. */
+  public void requestCompaction() {
+  }
+
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index 88a3176..cd09480 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerStatusService;
@@ -75,6 +76,7 @@ public class HCompactionServer extends AbstractServer {
// Stub to do compaction server status calls against the master.
private volatile CompactionServerStatusService.BlockingInterface cssStub;
+ CompactionThreadManager compactionThreadManager;
/**
* Get the current master from ZooKeeper and open the RPC connection to it.
To get a fresh
* connection, the current cssStub must be null. Method will block until a
master is available.
@@ -118,6 +120,7 @@ public class HCompactionServer extends AbstractServer {
// login the server principal (if using secure Hadoop)
login(userProvider, this.rpcServices.getIsa().getHostName());
Superusers.initialize(conf);
+ this.compactionThreadManager = new CompactionThreadManager(conf, this);
this.rpcServices.start();
}
@@ -131,15 +134,30 @@ public class HCompactionServer extends AbstractServer {
SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL, host);
}
- private boolean tryCompactionServerReport() throws IOException {
+  /**
+   * Assembles the load report for one reporting interval. The cell counters are reported as
+   * zero; the request total is read from the RPC services' request counter.
+   * @param reportStartTime start of the reporting interval
+   * @param reportEndTime end of the reporting interval
+   * @return the populated load message
+   */
+  private ClusterStatusProtos.CompactionServerLoad buildServerLoad(long reportStartTime,
+    long reportEndTime) {
+    return ClusterStatusProtos.CompactionServerLoad.newBuilder()
+      .setCompactedCells(0)
+      .setCompactingCells(0)
+      .setTotalNumberOfRequests(rpcServices.requestCount.sum())
+      .setReportStartTime(reportStartTime)
+      .setReportEndTime(reportEndTime)
+      .build();
+  }
+
+ private boolean tryCompactionServerReport(long reportStartTime, long
reportEndTime)
+ throws IOException {
CompactionServerStatusService.BlockingInterface css = cssStub;
if (css == null) {
return false;
}
+ ClusterStatusProtos.CompactionServerLoad sl =
buildServerLoad(reportStartTime, reportEndTime);
try {
CompactionServerStatusProtos.CompactionServerReportRequest.Builder
request =
CompactionServerStatusProtos.CompactionServerReportRequest.newBuilder();
request.setServer(ProtobufUtil.toServerName(getServerName()));
+ request.setLoad(sl);
this.cssStub.compactionServerReport(null, request.build());
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
@@ -194,7 +212,7 @@ public class HCompactionServer extends AbstractServer {
while (!isStopped()) {
long now = System.currentTimeMillis();
if ((now - lastMsg) >= msgInterval) {
- if (tryCompactionServerReport() && !online.get()) {
+ if (tryCompactionServerReport(lastMsg, now) && !online.get()) {
synchronized (online) {
online.set(true);
online.notifyAll();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index bcf5e76..b8c8891 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -161,6 +161,12 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionIn
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerReportRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerReportResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerStatusService;
@@ -418,11 +424,12 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
public class MasterRpcServices extends RSRpcServices implements
MasterService.BlockingInterface,
RegionServerStatusService.BlockingInterface,
LockService.BlockingInterface, HbckService.BlockingInterface,
- ClientMetaService.BlockingInterface,
CompactionServerStatusService.BlockingInterface {
+ ClientMetaService.BlockingInterface,
CompactionServerStatusService.BlockingInterface,
+ CompactionProtos.CompactionService.BlockingInterface {
private static final Logger LOG =
LoggerFactory.getLogger(MasterRpcServices.class.getName());
private static final Logger AUDITLOG =
-
LoggerFactory.getLogger("SecurityLogger."+MasterRpcServices.class.getName());
+ LoggerFactory.getLogger("SecurityLogger." +
MasterRpcServices.class.getName());
private final HMaster master;
@@ -567,6 +574,9 @@ public class MasterRpcServices extends RSRpcServices
implements
bssi.add(new BlockingServiceAndInterface(
CompactionServerStatusService.newReflectiveBlockingService(this),
CompactionServerStatusService.BlockingInterface.class));
+ bssi.add(new BlockingServiceAndInterface(
+ CompactionService.newReflectiveBlockingService(this),
+ CompactionService.BlockingInterface.class));
bssi.addAll(super.getServices(conf));
return bssi;
}
@@ -3473,4 +3483,17 @@ public class MasterRpcServices extends RSRpcServices
implements
}
}
+ /**
+ * Hands the compaction request to the master's compaction offload manager and replies with an
+ * empty response.
+ * @throws ServiceException presumably when the manager cannot dispatch the request - TODO
+ * confirm
+ */
+ @Override
+ public CompactResponse requestCompaction(RpcController controller,
CompactRequest request)
+ throws ServiceException {
+ master.getCompactionOffloadManager().requestCompaction(request);
+ return CompactResponse.newBuilder().build();
+ }
+
+ @Override
+ public CompleteCompactionResponse completeCompaction(RpcController
controller,
+ CompleteCompactionRequest request) {
+ throw new UnsupportedOperationException("master not receive
completeCompaction");
+ }
+
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
index 03f6829..b0e80d1 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
@@ -22,22 +22,30 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CompactionServerMetrics;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncCompactionServerService;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import
org.apache.hadoop.hbase.master.procedure.SwitchCompactionOffloadProcedure;
import org.apache.hadoop.hbase.regionserver.CompactionOffloadSwitchStorage;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCompactionOffloadEnabledRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCompactionOffloadEnabledResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchCompactionOffloadRequest;
@@ -51,6 +59,8 @@ public class CompactionOffloadManager {
private CompactionOffloadSwitchStorage compactionOffloadSwitchStorage;
private static final Logger LOG =
LoggerFactory.getLogger(CompactionOffloadManager.class.getName());
+ // Cache of compaction server stubs keyed by server name; populated lazily by getCsStub().
+ private final ConcurrentMap<ServerName, AsyncCompactionServerService>
csStubs =
+ new ConcurrentHashMap<>();
public CompactionOffloadManager(final MasterServices master) {
this.masterServices = master;
@@ -138,4 +148,41 @@ public class CompactionOffloadManager {
.postSwitchCompactionOffload(oldCompactionOffloadEnable,
compactionOffloadEnabled);
return response;
}
+
+ /**
+  * Picks the compaction server that should handle the given request.
+  * <p>
+  * The choice is derived from the hash of the region's start key and the current list of online
+  * compaction servers, so a region keeps mapping to the same list position as long as that list
+  * (and its order) does not change.
+  * @param request the compaction request; only the start key of its region is consulted
+  * @return the selected compaction server
+  * @throws ServiceException if no compaction server is online
+  */
+ private ServerName selectCompactionServer(CompactRequest request) throws ServiceException {
+   List<ServerName> onlineServers = getOnlineServersList();
+   if (onlineServers.isEmpty()) {
+     throw new ServiceException("compaction server is not available");
+   }
+   // TODO: need more complex and effective way to manage compaction of region to CS mapping.
+   // maybe another assignment and balance module
+   // Masking with Integer.MAX_VALUE clears the sign bit, so the remainder is never negative.
+   int startKeyHash = request.getRegionInfo().getStartKey().hashCode() & Integer.MAX_VALUE;
+   return onlineServers.get(startKeyHash % onlineServers.size());
+ }
+
+ /**
+  * Returns the stub used to talk to the given compaction server, creating and caching it on
+  * first use.
+  * <p>
+  * NOTE(review): nothing visible here removes entries for compaction servers that went away;
+  * confirm whether the cache needs eviction.
+  * @param sn the compaction server to talk to
+  * @return the cached stub for {@code sn}; every caller observes the same instance
+  * @throws IOException declared for callers; see the connection API for the failure modes
+  */
+ private AsyncCompactionServerService getCsStub(final ServerName sn) throws IOException {
+   AsyncCompactionServerService csStub = this.csStubs.get(sn);
+   if (csStub == null) {
+     LOG.debug("New CS stub connection to {}", sn);
+     AsyncCompactionServerService created =
+       this.masterServices.getAsyncClusterConnection().getCompactionServerService(sn);
+     // Two handlers may get here for the same server at once. putIfAbsent keeps whichever
+     // stub was registered first, so the cache never flips between instances.
+     AsyncCompactionServerService raced = this.csStubs.putIfAbsent(sn, created);
+     csStub = raced != null ? raced : created;
+   }
+   return csStub;
+ }
+
+ /**
+  * Forwards a compaction request to the compaction server selected for the request's region
+  * and waits for that server to answer.
+  * <p>
+  * Delivery failures are reported to the caller instead of being swallowed, so that the
+  * requesting region server does not treat an undelivered request as accepted.
+  * @param request the compaction request received from a region server
+  * @return an empty, non-null response once the compaction server has answered
+  * @throws ServiceException if no compaction server is online, or if the request could not be
+  *           delivered to the selected compaction server
+  */
+ public CompactResponse requestCompaction(CompactRequest request) throws ServiceException {
+   ServerName targetCompactionServer = selectCompactionServer(request);
+   LOG.info("Receive compaction request from {}, and send to Compaction server:{}",
+     ProtobufUtil.toString(request), targetCompactionServer);
+   try {
+     FutureUtils.get(getCsStub(targetCompactionServer).requestCompaction(request));
+   } catch (IOException e) {
+     // Pass the exception as the trailing argument (no placeholder) so SLF4J logs the stack.
+     LOG.error("requestCompaction from master to CS {} error", targetCompactionServer, e);
+     throw new ServiceException(e);
+   }
+   return CompactResponse.newBuilder().build();
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ca2e1df..e49ad87 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -211,6 +212,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
@@ -357,6 +360,7 @@ public class HRegionServer extends AbstractServer implements
// Stub to do region server status calls against the master.
private volatile RegionServerStatusService.BlockingInterface rssStub;
private volatile LockService.BlockingInterface lockStub;
+ private volatile CompactionService.BlockingInterface cmsStub;
private UncaughtExceptionHandler uncaughtExceptionHandler;
@@ -3736,4 +3740,67 @@ public class HRegionServer extends AbstractServer
implements
public long getRetryPauseTime() {
return this.retryPauseTime;
}
+
+ /**
+  * Reports whether compaction offload is enabled, as answered by this server's
+  * regionServerCompactionOffloadManager.
+  */
+ @Override
+ public boolean isCompactionOffloadEnabled(){
+ return regionServerCompactionOffloadManager.isCompactionOffloadEnabled();
+ }
+
+ /**
+  * Creates the stub for the master's CompactionService if none is currently cached; does
+  * nothing when a stub already exists.
+  * @param refresh forwarded unchanged to createMasterStub; presumably controls whether the
+  *          master location is re-read from ZK rather than taken from cached data (confirm
+  *          against createMasterStub)
+  */
+ private synchronized void createCompactionManagerStub(boolean refresh) {
+ // Only build a new stub when none is cached; the caller chooses via 'refresh' whether the
+ // master location may come from cached data.
+ if (cmsStub == null) {
+ cmsStub =
+ (CompactionService.BlockingInterface)
createMasterStub(CompactionService.class, refresh);
+ }
+ }
+
+ /**
+  * Send compaction request to compaction manager.
+  * <p>
+  * Failures are reported through the return value only; no exception is propagated for a
+  * failed RPC.
+  * @param regionInfo the region to compact
+  * @param cfd the column family (store) to compact
+  * @param major whether a major compaction is requested
+  * @param priority the priority to attach to the request
+  * @return True if send request successfully, otherwise false (offload disabled, no stub to
+  *         the compaction manager available, or the RPC failed)
+  */
+ @Override
+ public boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+   boolean major, int priority) {
+   if (!isCompactionOffloadEnabled()) {
+     return false;
+   }
+   // Read the volatile field once into a local: a concurrent failure handler may reset
+   // cmsStub to null between a null check on the field and a later re-read of it.
+   CompactionService.BlockingInterface cms = cmsStub;
+   if (cms == null) {
+     createCompactionManagerStub(false);
+     cms = cmsStub;
+   }
+   if (cms == null) {
+     return false;
+   }
+   InetSocketAddress[] favoredNodesForRegion =
+     getFavoredNodesForRegion(regionInfo.getEncodedName());
+   CompactRequest.Builder builder =
+     CompactRequest.newBuilder().setServer(ProtobufUtil.toServerName(getServerName()))
+       .setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
+       .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setMajor(major).setPriority(priority);
+   if (favoredNodesForRegion != null) {
+     for (InetSocketAddress address : favoredNodesForRegion) {
+       builder.addFavoredNodes(ProtobufUtil
+         .toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
+     }
+   }
+   CompactRequest compactRequest = builder.build();
+   try {
+     LOG.debug("Request compaction to CompactionManager, region: {}, store: {}",
+       regionInfo.getRegionNameAsString(), cfd.getNameAsString());
+     cms.requestCompaction(null, compactRequest);
+     LOG.debug("Receive response of compaction from CompactionManager, region: {}, store: {}",
+       regionInfo.getRegionNameAsString(), cfd.getNameAsString());
+     return true;
+   } catch (ServiceException se) {
+     LOG.error("Failed to request compact region", se);
+     // Drop the cached stub only if it is still the one that just failed; another thread may
+     // already have replaced it.
+     if (cmsStub == cms) {
+       cmsStub = null;
+     }
+     // Couldn't connect to the compaction manager, get location from zk and reconnect
+     createCompactionManagerStub(true);
+     return false;
+   }
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a2a8f9d..9749576 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1525,14 +1525,64 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
}
}
- protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
- Collection<HStoreFile> filesToCompact, User user, long
compactionStartTime,
- List<Path> newFiles) throws IOException {
+ /**
+  * Completes a compaction whose inputs and outputs are given by file name.
+  * <p>
+  * Each selected name is resolved against this store's current store files; each new file name
+  * is resolved under {@code <regionDir>/.tmp/<columnFamily>}. The resolved files are then passed
+  * to the path-based completeCompaction overload.
+  * @param cr the compaction request, passed through unchanged
+  * @param filesToCompact names of the store files that were compacted
+  * @param user the user, passed through unchanged
+  * @param newFiles names of the files produced by the compaction
+  * @return false if any selected file is not a current store file (nothing is changed in that
+  *         case), true once the compaction has been completed
+  * @throws IOException if completing the compaction fails
+  */
+ protected boolean completeCompaction(CompactionRequestImpl cr, List<String> filesToCompact,
+   User user, List<String> newFiles) throws IOException {
+   Collection<HStoreFile> compactedInputs = new ArrayList<>();
+   for (String inputName : filesToCompact) {
+     HStoreFile input = getStoreFileBasedOnFileName(inputName);
+     if (input == null) {
+       // One of the selected files is no longer part of this store: give up.
+       return false;
+     }
+     compactedInputs.add(input);
+   }
+   Path storeTmpDir =
+     new Path(new Path(getRegionFileSystem().getRegionDir(), ".tmp"), getColumnFamilyName());
+   List<Path> outputPaths = new ArrayList<>();
+   for (String outputName : newFiles) {
+     outputPaths.add(new Path(storeTmpDir, outputName));
+   }
+   completeCompaction(cr, compactedInputs, user, outputPaths);
+   return true;
+ }
+
+ /**
+  * Looks up one of this store's current store files by the last component of its path.
+  * @param fileName the file name to look for
+  * @return the first store file whose path name equals {@code fileName}, or null if none does
+  */
+ private HStoreFile getStoreFileBasedOnFileName(String fileName) {
+   Optional<HStoreFile> match = getStorefiles().stream()
+     .filter(candidate -> candidate.getPath().getName().equals(fileName)).findFirst();
+   if (match.isPresent()) {
+     LOG.debug("Found store file: {} for selectFileName: {}", match.get(), fileName);
+     return match.get();
+   }
+   LOG.warn("Does not found store file for selectFileName: {}", fileName);
+   return null;
+ }
+
+ /**
+  * Looks up one of this store's current store files by its full path.
+  * @param path the path to look for
+  * @return the first store file whose path equals {@code path}, or null if none does
+  */
+ private HStoreFile getStoreFile(Path path) {
+   return getStorefiles().stream().filter(candidate -> candidate.getPath().equals(path))
+     .findFirst().orElse(null);
+ }
+
+ /**
+  * Runs the steps that finish a compaction, in order: set the storage policy of the new files,
+  * move them into place, write the compaction WAL record, then replace the compacted store
+  * files with the new ones. Synchronized on this store.
+  * @param cr the compaction request, forwarded to moveCompactedFilesIntoPlace
+  * @param filesToCompact the store files that were compacted and are to be replaced
+  * @param user the user, forwarded to moveCompactedFilesIntoPlace
+  * @param newFiles paths of the files produced by the compaction
+  * @return the store files created from {@code newFiles}
+  * @throws IOException if any of the steps fails
+  */
+ private synchronized List<HStoreFile>
completeCompaction(CompactionRequestImpl cr,
+ Collection<HStoreFile> filesToCompact, User user, List<Path> newFiles)
throws IOException {
+ // TODO check store contains files to compact
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
+ return sfs;
+ }
+
+ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
+ Collection<HStoreFile> filesToCompact, User user, long
compactionStartTime,
+ List<Path> newFiles) throws IOException {
+ List<HStoreFile> sfs = completeCompaction(cr, filesToCompact, user,
newFiles);
+
if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -1577,7 +1627,7 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
for (Path newFile : newFiles) {
assert newFile != null;
HStoreFile sf = moveFileIntoPlace(newFile);
- if (this.getCoprocessorHost() != null) {
+ if (this.getCoprocessorHost() != null && cr != null) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
}
assert sf != null;
@@ -1880,21 +1930,30 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
// Before we do compaction, try to get rid of unneeded files to simplify
things.
removeUnneededFiles();
+ // When compaction offload is enabled, try to hand the compaction to the compaction manager
+ // first; an empty Optional tells the caller that nothing was selected locally.
+ if (region.getRegionServerServices() != null
+   && region.getRegionServerServices().isCompactionOffloadEnabled()) {
+   if (!requestToCompactionManager(forceMajor, priority)) {
+     // if request to cm error, do local compaction or retry
+     return selectCompaction(priority, tracker, user, filesCompacting);
+   } else {
+     // Two arguments need two placeholders, otherwise the store is silently dropped.
+     LOG.debug("request compaction to compaction server, regioninfo:{}, store:{}",
+       this.getRegionInfo(), this);
+   }
+   return Optional.empty();
+ } else {
+   return selectCompaction(priority, tracker, user, filesCompacting);
+ }
+ }
+
+ public Optional<CompactionContext> selectCompaction(int priority,
+ CompactionLifeCycleTracker tracker, User user, List<HStoreFile>
filesCompacting)
+ throws IOException {
final CompactionContext compaction = storeEngine.createCompaction();
CompactionRequestImpl request = null;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
- // First, see if coprocessor would want to override selection.
- if (this.getCoprocessorHost() != null) {
- final List<HStoreFile> candidatesForCoproc =
compaction.preSelect(this.filesCompacting);
- boolean override = getCoprocessorHost().preCompactSelection(this,
- candidatesForCoproc, tracker, user);
- if (override) {
- // Coprocessor is overriding normal file selection.
- compaction.forceSelect(new
CompactionRequestImpl(candidatesForCoproc));
- }
- }
+ preCompactionSelection(compaction, tracker, user);
// Normal case - coprocessor is not overriding file selection.
if (!compaction.hasSelection()) {
@@ -1916,11 +1975,7 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
offPeakCompactionTracker.set(false);
}
}
- if (this.getCoprocessorHost() != null) {
- this.getCoprocessorHost().postCompactSelection(
- this, ImmutableList.copyOf(compaction.getRequest().getFiles()),
tracker,
- compaction.getRequest(), user);
- }
+ postCompactionSelection(compaction, tracker, user);
// Finally, we have the resulting files list. Check if we have any
files at all.
request = compaction.getRequest();
Collection<HStoreFile> selectedFiles = request.getFiles();
@@ -1966,6 +2021,34 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
return Optional.of(compaction);
}
+ /**
+  * Gives the coprocessor host, if there is one, the chance to override compaction file
+  * selection before the normal selection runs.
+  * @param compaction the compaction context the selection is made on
+  * @param tracker the life cycle tracker handed to the coprocessor hook
+  * @param user the user handed to the coprocessor hook
+  * @throws IOException if pre-selection or the coprocessor hook fails
+  */
+ private void preCompactionSelection(CompactionContext compaction,
+   CompactionLifeCycleTracker tracker, User user) throws IOException {
+   // First, see if coprocessor would want to override selection.
+   if (this.getCoprocessorHost() == null) {
+     return;
+   }
+   final List<HStoreFile> coprocCandidates = compaction.preSelect(this.filesCompacting);
+   if (getCoprocessorHost().preCompactSelection(this, coprocCandidates, tracker, user)) {
+     // Coprocessor is overriding normal file selection.
+     compaction.forceSelect(new CompactionRequestImpl(coprocCandidates));
+   }
+ }
+
+ /**
+  * Notifies the coprocessor host, if there is one, of the files selected for compaction.
+  * @param compaction the compaction context holding the selected request
+  * @param tracker the life cycle tracker handed to the coprocessor hook
+  * @param user the user handed to the coprocessor hook
+  * @throws IOException if the coprocessor hook fails
+  */
+ private void postCompactionSelection(CompactionContext compaction,
+   CompactionLifeCycleTracker tracker, User user) throws IOException {
+   if (this.getCoprocessorHost() == null) {
+     return;
+   }
+   // The hook receives an immutable snapshot of the selected files.
+   this.getCoprocessorHost().postCompactSelection(this,
+     ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, compaction.getRequest(),
+     user);
+ }
+
+ /**
+  * Asks the hosting region server to request a compaction of this store from the compaction
+  * manager. The region's RegionServerServices is dereferenced without a null check.
+  * @param forceMajor whether a major compaction is requested
+  * @param priority the priority of the request
+  * @return the result of RegionServerServices#requestCompactRegion
+  */
+ private boolean requestToCompactionManager(boolean forceMajor, int priority) {
+   RegionServerServices rsServices = region.getRegionServerServices();
+   return rsServices.requestCompactRegion(region.getRegionInfo(), storeContext.getFamily(),
+     forceMajor, priority);
+ }
+
/** Adds the files to compacting files. filesCompacting must be locked. */
private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
if (CollectionUtils.isEmpty(filesToAdd)) {
@@ -2917,4 +3000,12 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
mixedRowReadsCount.increment();
}
}
+
+ /** @return the current value of this store's forceMajor flag */
+ public boolean getForceMajor() {
+ return this.forceMajor;
+ }
+
+ /**
+  * Sets this store's forceMajor flag.
+  * @param newForceMajor the new value of the flag
+  */
+ public void setForceMajor(boolean newForceMajor) {
+ this.forceMajor = newForceMajor;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bfb25ab..3910ab4 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -237,6 +239,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
@@ -3762,6 +3766,42 @@ public class RSRpcServices extends AbstractRpcServices
implements
return
builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
}
+
+ /**
+  * Completes, on this region server, a compaction described by the request.
+  * <p>
+  * Looks up the online region and the store named in the request, updates the store's
+  * forceMajor flag when it is currently set, and asks the store to complete the compaction
+  * with the selected and new file names from the request. An IOException from the store is
+  * logged and reported as an unsuccessful result.
+  * @param controller the RPC controller (not used here)
+  * @param request identifies the region, column family, selected files and new files
+  * @return a response whose success flag is true only if the store completed the compaction;
+  *         false if the region is not online, the store is unknown, or completion failed
+  */
+ @Override
+ public CompleteCompactionResponse completeCompaction(RpcController controller,
+   CompleteCompactionRequest request) throws ServiceException {
+   RegionInfo regionInfo = ProtobufUtil.toRegionInfo(request.getRegionInfo());
+   ColumnFamilyDescriptor family = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
+   LOG.debug("Region server receive complete compaction for region: {}, cf: {}",
+     regionInfo.getRegionNameAsString(), family.getNameAsString());
+   boolean success = false;
+   HRegion onlineRegion = regionServer.getOnlineRegion(regionInfo.getRegionName());
+   HStore store = onlineRegion == null ? null : onlineRegion.getStore(family.getName());
+   if (store != null) {
+     // The flag is only ever overwritten while it is set; a cleared flag stays cleared.
+     if (store.getForceMajor()) {
+       store.setForceMajor(request.getNewForceMajor());
+     }
+     List<String> selectedFiles =
+       request.getSelectedFilesList().stream().collect(Collectors.toList());
+     List<String> newFiles = request.getNewFilesList().stream().collect(Collectors.toList());
+     try {
+       // TODO: If we could write HFile directly into the data directory, here the completion
+       // will be easier
+       success = store.completeCompaction(null, selectedFiles, null, newFiles);
+       LOG.debug("Complete compaction result: {} for region: {}, store {}", success,
+         regionInfo.getRegionNameAsString(),
+         store.getColumnFamilyDescriptor().getNameAsString());
+     } catch (IOException e) {
+       LOG.error("Failed to complete compaction for region: {}, store {}",
+         regionInfo.getRegionNameAsString(), store.getColumnFamilyDescriptor().getNameAsString(),
+         e);
+     }
+   }
+   return CompleteCompactionResponse.newBuilder().setSuccess(success).build();
+ }
+
private void executeOpenRegionProcedures(OpenRegionRequest request,
Map<TableName, TableDescriptor> tdCache) {
long masterSystemTime = request.hasMasterSystemTime() ?
request.getMasterSystemTime() : -1;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 7d9d25b..c718f15 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -322,4 +323,9 @@ public interface RegionServerServices extends Server,
MutableOnlineRegions, Favo
* @return {@link ZKPermissionWatcher}
*/
ZKPermissionWatcher getZKPermissionWatcher();
+
+ /**
+  * @return true if compaction offload is currently enabled on this server
+  */
+ boolean isCompactionOffloadEnabled();
+
+ /**
+  * Requests that a compaction of one store of a region be performed on behalf of this server.
+  * @param regionInfo the region to compact
+  * @param cfd the column family (store) to compact
+  * @param major whether a major compaction is requested
+  * @param priority the priority of the request
+  * @return true if the request was sent successfully, false otherwise
+  */
+ boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescriptor
cfd, boolean major,
+ int priority);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
index fb449c6..b7031b4 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
@@ -22,6 +22,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -47,7 +48,9 @@ public class HBasePolicyProvider extends PolicyProvider {
new Service("security.masterregion.protocol.acl",
RegionServerStatusService.BlockingInterface.class),
new Service("security.masterregion.protocol.acl",
-
CompactionServerStatusProtos.CompactionServerStatusService.BlockingInterface.class)
+
CompactionServerStatusProtos.CompactionServerStatusService.BlockingInterface.class),
+ new Service("security.masterregion.protocol.acl",
+ CompactionProtos.CompactionService.BlockingInterface.class)
};
@Override
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 76f2220..c7e172a 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.locking.EntityLock;
@@ -385,4 +386,15 @@ public class MockRegionServerServices implements
RegionServerServices {
public AsyncClusterConnection getAsyncClusterConnection() {
return null;
}
+
+ /** Mock implementation: always reports compaction offload as enabled. */
+ @Override
+ public boolean isCompactionOffloadEnabled() {
+ return true;
+ }
+
+ /** Mock implementation: sends nothing and always reports that the request failed. */
+ @Override
+ public boolean requestCompactRegion(RegionInfo regionInfo,
ColumnFamilyDescriptor cfd,
+ boolean major, int priority) {
+ return false;
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 8755749..37829bd 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -109,6 +109,11 @@ public class DummyAsyncClusterConnection implements
AsyncClusterConnection {
}
@Override
+ public AsyncCompactionServerService getCompactionServerService(ServerName
serverName) {
+ // Dummy connection: no compaction server service is provided.
+ return null;
+ }
+
+ @Override
public NonceGenerator getNonceGenerator() {
return null;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index 1edc414..b5da939 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.compactionserver;
+import static org.junit.Assert.assertNull;
+
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@@ -41,8 +43,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-
@Category({CompactionServerTests.class, MediumTests.class})
public class TestCompactionServer {
@@ -63,6 +63,7 @@ public class TestCompactionServer {
@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
+ TEST_UTIL.getAdmin().switchCompactionOffload(true);
MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
COMPACTION_SERVER =
TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
@@ -86,6 +87,20 @@ public class TestCompactionServer {
TEST_UTIL.deleteTableIfAny(TABLENAME);
}
+
+ /**
+  * Checks that, after a compaction is triggered, the request count seen on the compaction
+  * server becomes positive and matches the total number of requests the master's
+  * CompactionOffloadManager holds for that server.
+  */
+ @Test
+ public void testCompactionServerReport() throws Exception {
+ CompactionOffloadManager compactionOffloadManager =
MASTER.getCompactionOffloadManager();
+ // Wait until the master lists the compaction server among its online servers.
+ TEST_UTIL.waitFor(60000, () ->
!compactionOffloadManager.getOnlineServers().isEmpty()
+ && null !=
compactionOffloadManager.getOnlineServers().get(COMPACTION_SERVER_NAME));
+ // invoke compact
+ TEST_UTIL.compact(TABLENAME, false);
+ // Wait until the server-side counter is non-zero and equals the value known to the master.
+ TEST_UTIL.waitFor(60000,
+ () -> COMPACTION_SERVER.rpcServices.requestCount.sum() > 0
+ && COMPACTION_SERVER.rpcServices.requestCount.sum() ==
compactionOffloadManager
+
.getOnlineServers().get(COMPACTION_SERVER_NAME).getTotalNumberOfRequests());
+ }
+
@Test
public void testCompactionServerExpire() throws Exception {
int initialNum =
TEST_UTIL.getMiniHBaseCluster().getNumLiveCompactionServers();
@@ -98,11 +113,13 @@ public class TestCompactionServer {
CompactionOffloadManager compactionOffloadManager =
MASTER.getCompactionOffloadManager();
TEST_UTIL.waitFor(60000,
- () -> initialNum + 1 ==
compactionOffloadManager.getOnlineServersList().size());
+ () -> initialNum + 1 ==
compactionOffloadManager.getOnlineServersList().size()
+ && null != compactionOffloadManager.getLoad(compactionServerName));
compactionServer.stop("test");
TEST_UTIL.waitFor(60000,
() -> initialNum ==
compactionOffloadManager.getOnlineServersList().size());
+ assertNull(compactionOffloadManager.getLoad(compactionServerName));
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 878f804..db4a5eb 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -139,6 +140,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
@@ -756,4 +759,22 @@ class MockRegionServer implements
AdminProtos.AdminService.BlockingInterface,
public AsyncClusterConnection getAsyncClusterConnection() {
return null;
}
+
+
+ /** Mock implementation: does nothing and returns null. */
+ @Override
+ public CompleteCompactionResponse completeCompaction(RpcController
controller,
+ CompleteCompactionRequest request) throws ServiceException {
+ return null;
+ }
+
+ /** Mock implementation: always reports compaction offload as enabled. */
+ @Override
+ public boolean isCompactionOffloadEnabled() {
+ return true;
+ }
+
+ /** Mock implementation: sends nothing and always reports that the request failed. */
+ @Override
+ public boolean requestCompactRegion(RegionInfo regionInfo,
ColumnFamilyDescriptor cfd,
+ boolean major, int priority) {
+ return false;
+ }
}