This is an automated email from the ASF dual-hosted git repository.
meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 04b293ccf0c HBASE-29763 Implement co-processor host for client-meta
service (#7533)
04b293ccf0c is described below
commit 04b293ccf0c8359873c1cf42082b8d0b8fb06e61
Author: Balazs Meszaros <[email protected]>
AuthorDate: Fri Dec 12 08:51:01 2025 +0100
HBASE-29763 Implement co-processor host for client-meta service (#7533)
Added hbase.coprocessor.clientmeta.classes configuration property for
loading client-meta co-processors.
Signed-off-by: Wellington Chevreuil <[email protected]>
---
.../apache/hadoop/hbase/HBaseRpcServicesBase.java | 110 +++++++++--
.../hbase/coprocessor/ClientMetaCoprocessor.java | 32 ++++
.../ClientMetaCoprocessorEnvironment.java | 29 +++
.../coprocessor/ClientMetaCoprocessorHost.java | 212 +++++++++++++++++++++
.../hbase/coprocessor/ClientMetaObserver.java | 153 +++++++++++++++
.../hadoop/hbase/coprocessor/CoprocessorHost.java | 2 +
.../coprocessor/TestClientMetaCoprocessor.java | 208 ++++++++++++++++++++
7 files changed, 728 insertions(+), 18 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java
index b2a0e780362..d6d27780883 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java
@@ -17,17 +17,21 @@
*/
package org.apache.hadoop.hbase;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.coprocessor.ClientMetaCoprocessorHost;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -110,6 +114,8 @@ public abstract class HBaseRpcServicesBase<S extends
HBaseServerBase<?>>
protected final PriorityFunction priority;
+ private ClientMetaCoprocessorHost clientMetaCoprocessorHost;
+
private AccessChecker accessChecker;
private ZKPermissionWatcher zkPermissionWatcher;
@@ -158,6 +164,8 @@ public abstract class HBaseRpcServicesBase<S extends
HBaseServerBase<?>>
// Set our address, however we need the final port that was given to
rpcServer
isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
+
+ clientMetaCoprocessorHost = new ClientMetaCoprocessorHost(conf);
}
protected abstract boolean defaultReservoirEnabled();
@@ -199,6 +207,12 @@ public abstract class HBaseRpcServicesBase<S extends
HBaseServerBase<?>>
}
}
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public ClientMetaCoprocessorHost getClientMetaCoprocessorHost() {
+ return clientMetaCoprocessorHost;
+ }
+
public AccessChecker getAccessChecker() {
return accessChecker;
}
@@ -261,15 +275,36 @@ public abstract class HBaseRpcServicesBase<S extends
HBaseServerBase<?>>
@Override
public GetClusterIdResponse getClusterId(RpcController controller,
GetClusterIdRequest request)
throws ServiceException {
- return
GetClusterIdResponse.newBuilder().setClusterId(server.getClusterId()).build();
+ try {
+ clientMetaCoprocessorHost.preGetClusterId();
+
+ String clusterId = server.getClusterId();
+ String clusterIdReply =
clientMetaCoprocessorHost.postGetClusterId(clusterId);
+
+ return
GetClusterIdResponse.newBuilder().setClusterId(clusterIdReply).build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
}
@Override
public GetActiveMasterResponse getActiveMaster(RpcController controller,
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder builder =
GetActiveMasterResponse.newBuilder();
- server.getActiveMaster()
- .ifPresent(name ->
builder.setServerName(ProtobufUtil.toServerName(name)));
+
+ try {
+ clientMetaCoprocessorHost.preGetActiveMaster();
+
+ ServerName serverName = server.getActiveMaster().orElse(null);
+ ServerName serverNameReply =
clientMetaCoprocessorHost.postGetActiveMaster(serverName);
+
+ if (serverNameReply != null) {
+ builder.setServerName(ProtobufUtil.toServerName(serverNameReply));
+ }
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+
return builder.build();
}
@@ -277,12 +312,25 @@ public abstract class HBaseRpcServicesBase<S extends
HBaseServerBase<?>>
public GetMastersResponse getMasters(RpcController controller,
GetMastersRequest request)
throws ServiceException {
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
- server.getActiveMaster()
- .ifPresent(activeMaster ->
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
-
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
- server.getBackupMasters()
- .forEach(backupMaster ->
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
-
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
+
+ try {
+ clientMetaCoprocessorHost.preGetMasters();
+
+ Map<ServerName, Boolean> serverNames = new LinkedHashMap<>();
+
+ server.getActiveMaster().ifPresent(serverName ->
serverNames.put(serverName, Boolean.TRUE));
+ server.getBackupMasters().forEach(serverName ->
serverNames.put(serverName, Boolean.FALSE));
+
+ Map<ServerName, Boolean> serverNamesReply =
+ clientMetaCoprocessorHost.postGetMasters(serverNames);
+
+ serverNamesReply
+ .forEach((serverName, active) ->
builder.addMasterServers(GetMastersResponseEntry
+
.newBuilder().setServerName(ProtobufUtil.toServerName(serverName)).setIsActive(active)));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+
return builder.build();
}
@@ -290,22 +338,46 @@ public abstract class HBaseRpcServicesBase<S extends
HBaseServerBase<?>>
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController
controller,
GetMetaRegionLocationsRequest request) throws ServiceException {
GetMetaRegionLocationsResponse.Builder builder =
GetMetaRegionLocationsResponse.newBuilder();
- server.getMetaLocations()
- .forEach(location ->
builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
+
+ try {
+ clientMetaCoprocessorHost.preGetMetaLocations();
+
+ List<HRegionLocation> metaLocations = server.getMetaLocations();
+ List<HRegionLocation> metaLocationsReply =
+ clientMetaCoprocessorHost.postGetMetaLocations(metaLocations);
+
+ metaLocationsReply
+ .forEach(location ->
builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+
return builder.build();
}
@Override
public final GetBootstrapNodesResponse getBootstrapNodes(RpcController
controller,
GetBootstrapNodesRequest request) throws ServiceException {
- int maxNodeCount =
server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
- DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
- ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
- sample.add(server.getBootstrapNodes());
-
GetBootstrapNodesResponse.Builder builder =
GetBootstrapNodesResponse.newBuilder();
- sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
- .forEach(builder::addServerName);
+
+ try {
+ clientMetaCoprocessorHost.preGetBootstrapNodes();
+
+ int maxNodeCount =
server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
+ DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
+ ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
+ sample.add(server.getBootstrapNodes());
+
+ List<ServerName> bootstrapNodes = sample.getSamplingResult();
+ List<ServerName> bootstrapNodesReply =
+ clientMetaCoprocessorHost.postGetBootstrapNodes(bootstrapNodes);
+
+ bootstrapNodesReply
+ .forEach(serverName ->
builder.addServerName(ProtobufUtil.toServerName(serverName)));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+
return builder.build();
}
@@ -316,6 +388,8 @@ public abstract class HBaseRpcServicesBase<S extends
HBaseServerBase<?>>
try {
requirePermission("updateConfiguration", Permission.Action.ADMIN);
this.server.updateConfiguration();
+
+ clientMetaCoprocessorHost = new
ClientMetaCoprocessorHost(getConfiguration());
} catch (Exception e) {
throw new ServiceException(e);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessor.java
new file mode 100644
index 00000000000..bbf963e3239
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ClientMetaCoprocessor extends Coprocessor {
+ default Optional<ClientMetaObserver> getClientMetaObserver() {
+ return Optional.empty();
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorEnvironment.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorEnvironment.java
new file mode 100644
index 00000000000..ea9e8d190ab
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorEnvironment.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ClientMetaCoprocessorEnvironment
+ extends CoprocessorEnvironment<ClientMetaCoprocessor> {
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorHost.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorHost.java
new file mode 100644
index 00000000000..87e5f7586fd
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorHost.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class ClientMetaCoprocessorHost
+ extends CoprocessorHost<ClientMetaCoprocessor,
ClientMetaCoprocessorEnvironment> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ClientMetaCoprocessorHost.class);
+
+ private static class ClientMetaEnvironment extends
BaseEnvironment<ClientMetaCoprocessor>
+ implements ClientMetaCoprocessorEnvironment {
+
+ public ClientMetaEnvironment(ClientMetaCoprocessor impl, int priority, int
seq,
+ Configuration conf) {
+ super(impl, priority, seq, conf);
+ }
+ }
+
+ public ClientMetaCoprocessorHost(Configuration conf) {
+ // RPCServer cannot be aborted, so we don't pass Abortable down here.
+ super(null);
+ this.conf = conf;
+ boolean coprocessorsEnabled =
+ conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
DEFAULT_COPROCESSORS_ENABLED);
+ LOG.trace("System coprocessor loading is {}", (coprocessorsEnabled ?
"enabled" : "disabled"));
+ loadSystemCoprocessors(conf, CLIENT_META_COPROCESSOR_CONF_KEY);
+ }
+
+ @Override
+ public ClientMetaCoprocessorEnvironment
createEnvironment(ClientMetaCoprocessor instance,
+ int priority, int sequence, Configuration conf) {
+ return new ClientMetaEnvironment(instance, priority, sequence, conf);
+ }
+
+ @Override
+ public ClientMetaCoprocessor checkAndGetInstance(Class<?> implClass)
+ throws InstantiationException, IllegalAccessException {
+ try {
+ if (ClientMetaCoprocessor.class.isAssignableFrom(implClass)) {
+ return
implClass.asSubclass(ClientMetaCoprocessor.class).getDeclaredConstructor()
+ .newInstance();
+ } else {
+ LOG.error("{} is not of type ClientMetaCoprocessor. Check the
configuration of {}",
+ implClass.getName(), CLIENT_META_COPROCESSOR_CONF_KEY);
+ return null;
+ }
+ } catch (NoSuchMethodException | InvocationTargetException e) {
+ throw (InstantiationException) new
InstantiationException(implClass.getName()).initCause(e);
+ }
+ }
+
+ private final ObserverGetter<ClientMetaCoprocessor, ClientMetaObserver>
clientMetaObserverGetter =
+ ClientMetaCoprocessor::getClientMetaObserver;
+
+ public void preGetClusterId() throws IOException {
+ execOperation(coprocEnvironments.isEmpty()
+ ? null
+ : new
ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+ @Override
+ protected void call(ClientMetaObserver observer) throws IOException {
+ observer.preGetClusterId(this);
+ }
+ });
+ }
+
+ public String postGetClusterId(String clusterId) throws IOException {
+ if (coprocEnvironments.isEmpty()) {
+ return clusterId;
+ }
+
+ return execOperationWithResult(new
ObserverOperationWithResult<ClientMetaObserver, String>(
+ clientMetaObserverGetter, clusterId) {
+ @Override
+ protected String call(ClientMetaObserver observer) throws IOException {
+ return observer.postGetClusterId(this, getResult());
+ }
+ });
+ }
+
+ public void preGetActiveMaster() throws IOException {
+ execOperation(coprocEnvironments.isEmpty()
+ ? null
+ : new
ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+ @Override
+ protected void call(ClientMetaObserver observer) throws IOException {
+ observer.preGetActiveMaster(this);
+ }
+ });
+ }
+
+ public ServerName postGetActiveMaster(ServerName serverName) throws
IOException {
+ if (coprocEnvironments.isEmpty()) {
+ return serverName;
+ }
+
+ return execOperationWithResult(new
ObserverOperationWithResult<ClientMetaObserver, ServerName>(
+ clientMetaObserverGetter, serverName) {
+ @Override
+ protected ServerName call(ClientMetaObserver observer) throws
IOException {
+ return observer.postGetActiveMaster(this, getResult());
+ }
+ });
+ }
+
+ public void preGetMasters() throws IOException {
+ execOperation(coprocEnvironments.isEmpty()
+ ? null
+ : new
ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+ @Override
+ protected void call(ClientMetaObserver observer) throws IOException {
+ observer.preGetMasters(this);
+ }
+ });
+ }
+
+ public Map<ServerName, Boolean> postGetMasters(Map<ServerName, Boolean>
serverNames)
+ throws IOException {
+ if (coprocEnvironments.isEmpty()) {
+ return serverNames;
+ }
+
+ return execOperationWithResult(
+ new ObserverOperationWithResult<ClientMetaObserver, Map<ServerName,
Boolean>>(
+ clientMetaObserverGetter, serverNames) {
+ @Override
+ protected Map<ServerName, Boolean> call(ClientMetaObserver observer)
throws IOException {
+ return observer.postGetMasters(this, getResult());
+ }
+ });
+ }
+
+ public void preGetBootstrapNodes() throws IOException {
+ execOperation(coprocEnvironments.isEmpty()
+ ? null
+ : new
ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+ @Override
+ protected void call(ClientMetaObserver observer) throws IOException {
+ observer.preGetBootstrapNodes(this);
+ }
+ });
+ }
+
+ public List<ServerName> postGetBootstrapNodes(List<ServerName>
bootstrapNodes)
+ throws IOException {
+ if (coprocEnvironments.isEmpty()) {
+ return bootstrapNodes;
+ }
+
+ return execOperationWithResult(
+ new ObserverOperationWithResult<ClientMetaObserver, List<ServerName>>(
+ clientMetaObserverGetter, bootstrapNodes) {
+ @Override
+ protected List<ServerName> call(ClientMetaObserver observer) throws
IOException {
+ return observer.postGetBootstrapNodes(this, getResult());
+ }
+ });
+ }
+
+ public void preGetMetaLocations() throws IOException {
+ execOperation(coprocEnvironments.isEmpty()
+ ? null
+ : new
ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+ @Override
+ protected void call(ClientMetaObserver observer) throws IOException {
+ observer.preGetMetaLocations(this);
+ }
+ });
+ }
+
+ public List<HRegionLocation> postGetMetaLocations(List<HRegionLocation>
metaLocations)
+ throws IOException {
+ if (coprocEnvironments.isEmpty()) {
+ return metaLocations;
+ }
+
+ return execOperationWithResult(
+ new ObserverOperationWithResult<ClientMetaObserver,
List<HRegionLocation>>(
+ clientMetaObserverGetter, metaLocations) {
+ @Override
+ protected List<HRegionLocation> call(ClientMetaObserver observer)
throws IOException {
+ return observer.postGetMetaLocations(this, getResult());
+ }
+ });
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaObserver.java
new file mode 100644
index 00000000000..59930d3b4b7
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaObserver.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Defines coprocessor hooks for interacting with operations of ClientMetaService, which is
+ * responsible for ZooKeeperless HBase service discovery. <br>
+ * <br>
+ * Since most implementations will be interested in only a subset of hooks,
this class uses
+ * 'default' functions to avoid having to add unnecessary overrides. Where a default body is
+ * non-empty, it exists only to satisfy the compiler by returning a value of the expected
+ * (non-void) type, and is written so that the default acts as a no-op. Implementations are
+ * therefore advised not to call these 'default' methods from their overrides. <br>
+ * <br>
+ * <h3>Exception Handling</h3> For all functions, exception handling is done
as follows:
+ * <ul>
+ * <li>Exceptions of type {@link IOException} are reported back to client.</li>
+ * <li>For any other kind of exception:
+ * <ul>
+ * <li>If the configuration {@link CoprocessorHost#ABORT_ON_ERROR_KEY} is set
to true, then the
+ * server aborts.</li>
+ * <li>Otherwise, coprocessor is removed from the server and
+ * {@link org.apache.hadoop.hbase.DoNotRetryIOException} is returned to the
client.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ClientMetaObserver {
+ /**
+ * Called before getting the cluster ID.
+ * @param ctx the environment to interact with the framework
+ */
+ default void
preGetClusterId(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+ throws IOException {
+ }
+
+ /**
+ * Called after we got the cluster ID.
+ * @param ctx the environment to interact with the framework
+ * @param clusterId the actual cluster ID
+ * @return the cluster ID which is returned to the client.
+ */
+ default String
postGetClusterId(ObserverContext<ClientMetaCoprocessorEnvironment> ctx,
+ String clusterId) throws IOException {
+ return clusterId;
+ }
+
+ /**
+ * Called before getting the active master.
+ * @param ctx the environment to interact with the framework
+ */
+ default void
preGetActiveMaster(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+ throws IOException {
+ }
+
+ /**
+ * Called after we got the active master.
+ * @param ctx the environment to interact with the framework
+ * @param serverName the actual active master address. It can be {@code
null} if there is no
+ * active master.
+ * @return the active master address which is returned to the client. It can
be {@code null}.
+ */
+ default ServerName
postGetActiveMaster(ObserverContext<ClientMetaCoprocessorEnvironment> ctx,
+ ServerName serverName) throws IOException {
+ return serverName;
+ }
+
+ /**
+ * Called before getting the master servers.
+ * @param ctx the environment to interact with the framework
+ */
+ default void preGetMasters(ObserverContext<ClientMetaCoprocessorEnvironment>
ctx)
+ throws IOException {
+ }
+
+ /**
+ * Called after we got the master servers.
+ * @param ctx the environment to interact with the framework
+ * @param serverNames the actual master servers addresses and active statuses
+ * @return the master servers addresses and active statuses which are
returned to the client.
+ */
+ default Map<ServerName, Boolean> postGetMasters(
+ ObserverContext<ClientMetaCoprocessorEnvironment> ctx, Map<ServerName,
Boolean> serverNames)
+ throws IOException {
+ return serverNames;
+ }
+
+ /**
+ * Called before getting bootstrap nodes.
+ * @param ctx the environment to interact with the framework
+ */
+ default void
preGetBootstrapNodes(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+ throws IOException {
+ }
+
+ /**
+ * Called after we got bootstrap nodes.
+ * @param ctx the environment to interact with the framework
+ * @param bootstrapNodes the actual bootstrap nodes
+ * @return the bootstrap nodes which are returned to the client.
+ */
+ default List<ServerName> postGetBootstrapNodes(
+ ObserverContext<ClientMetaCoprocessorEnvironment> ctx, List<ServerName>
bootstrapNodes)
+ throws IOException {
+ return bootstrapNodes;
+ }
+
+ /**
+ * Called before getting the meta region locations.
+ * @param ctx the environment to interact with the framework
+ */
+ default void
preGetMetaLocations(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+ throws IOException {
+ }
+
+ /**
+ * Called after we got the meta region locations.
+ * @param ctx the environment to interact with the framework
+ * @param metaLocations the actual meta region locations
+ * @return the meta region locations which are returned to the client.
+ */
+ default List<HRegionLocation> postGetMetaLocations(
+ ObserverContext<ClientMetaCoprocessorEnvironment> ctx,
List<HRegionLocation> metaLocations)
+ throws IOException {
+ return metaLocations;
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 137fe3b061d..c201b29881c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -61,6 +61,8 @@ public abstract class CoprocessorHost<C extends Coprocessor,
E extends Coprocess
public static final String USER_REGION_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.user.region.classes";
public static final String MASTER_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.master.classes";
+ public static final String CLIENT_META_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.clientmeta.classes";
public static final String WAL_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.wal.classes";
public static final String RPC_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.rpc.classes";
public static final String ABORT_ON_ERROR_KEY =
"hbase.coprocessor.abortonerror";
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClientMetaCoprocessor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClientMetaCoprocessor.java
new file mode 100644
index 00000000000..3b0b31c710f
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClientMetaCoprocessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.ConnectionRegistry;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests invocation of the {@link ClientMetaObserver} interface hooks when a client calls the
+ * connection registry operations (cluster ID, active master, meta locations) on the master.
+ */
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestClientMetaCoprocessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestClientMetaCoprocessor.class);
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestClientMetaCoprocessor.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static final ServerName SERVER_NAME =
ServerName.valueOf("localhost", 1234, 12345);
+
+ public static class TestCoprocessor implements ClientMetaCoprocessor {
+ protected final ClientMetaObserver observer;
+
+ public TestCoprocessor() {
+ observer = mock(ClientMetaObserver.class);
+ resetMock();
+ }
+
+ protected void resetMock() {
+ reset(observer);
+
+ try {
+ doAnswer(answer -> answer.getArgument(1, String.class)).when(observer)
+ .postGetClusterId(any(), any());
+ doAnswer(answer -> {
+ return answer.getArgument(1, ServerName.class);
+ }).when(observer).postGetActiveMaster(any(), any());
+ doAnswer(answer -> answer.getArgument(1,
Map.class)).when(observer).postGetMasters(any(),
+ any());
+ doAnswer(answer -> answer.getArgument(1, List.class)).when(observer)
+ .postGetBootstrapNodes(any(), any());
+ doAnswer(answer -> answer.getArgument(1, List.class)).when(observer)
+ .postGetMetaLocations(any(), any());
+ } catch (IOException e) {
+ throw new IllegalStateException("Could not setup observer mock.", e);
+ }
+ }
+
+ @Override
+ public Optional<ClientMetaObserver> getClientMetaObserver() {
+ return Optional.of(observer);
+ }
+ }
+
+ private static TestCoprocessor getCoprocessor() {
+ HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+ MasterRpcServices masterRpcServices = master.getMasterRpcServices();
+
+ return
masterRpcServices.getClientMetaCoprocessorHost().findCoprocessor(TestCoprocessor.class);
+ }
+
+ private static ClientMetaObserver getObserverMock() {
+ return getCoprocessor().getClientMetaObserver().get();
+ }
+
+ private static void resetObserverMock() {
+ getCoprocessor().resetMock();
+ }
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(CoprocessorHost.CLIENT_META_COPROCESSOR_CONF_KEY,
TestCoprocessor.class.getName());
+
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setupBefore() {
+ resetObserverMock();
+ }
+
+ @Test
+ public void testGetClusterId() throws Exception {
+ ClientMetaObserver observer = getObserverMock();
+
+ try (AsyncConnectionImpl asyncConnection = (AsyncConnectionImpl)
ConnectionFactory
+ .createAsyncConnection(UTIL.getConfiguration()).get()) {
+ ConnectionRegistry connectionRegistry =
asyncConnection.getConnectionRegistry();
+
+ doReturn("cluster-id").when(observer).postGetClusterId(any(), any());
+ clearInvocations(observer);
+
+ String clusterId = connectionRegistry.getClusterId().get();
+ assertEquals("cluster-id", clusterId);
+
+ verify(observer).preGetClusterId(any());
+ verify(observer).postGetClusterId(any(), any());
+ verifyNoMoreInteractions(observer);
+ }
+ }
+
+ @Test
+ public void testGetActiveMaster() throws Exception {
+ ClientMetaObserver observer = getObserverMock();
+
+ try (AsyncConnectionImpl asyncConnection = (AsyncConnectionImpl)
ConnectionFactory
+ .createAsyncConnection(UTIL.getConfiguration()).get()) {
+ ConnectionRegistry connectionRegistry =
asyncConnection.getConnectionRegistry();
+
+ doReturn(SERVER_NAME).when(observer).postGetActiveMaster(any(), any());
+ clearInvocations(observer);
+
+ ServerName activeMaster = connectionRegistry.getActiveMaster().get();
+ assertEquals(SERVER_NAME, activeMaster);
+
+ verify(observer).preGetActiveMaster(any());
+ verify(observer).postGetActiveMaster(any(), any());
+ verifyNoMoreInteractions(observer);
+ }
+ }
+
+ @Test
+ public void testGetMetaRegionLocations() throws Exception {
+ ClientMetaObserver observer = getObserverMock();
+
+ try (AsyncConnectionImpl asyncConnection = (AsyncConnectionImpl)
ConnectionFactory
+ .createAsyncConnection(UTIL.getConfiguration()).get()) {
+ ConnectionRegistry connectionRegistry =
asyncConnection.getConnectionRegistry();
+
+ HRegionLocation metaRegionLocation =
+ new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO,
SERVER_NAME);
+
doReturn(List.of(metaRegionLocation)).when(observer).postGetMetaLocations(any(),
any());
+ clearInvocations(observer);
+
+ RegionLocations regionLocations =
connectionRegistry.getMetaRegionLocations().get();
+ HRegionLocation actualMetaRegionLocation =
regionLocations.getDefaultRegionLocation();
+
+ assertEquals(metaRegionLocation, actualMetaRegionLocation);
+
+ verify(observer).preGetMetaLocations(any());
+ verify(observer).postGetMetaLocations(any(), any());
+ verifyNoMoreInteractions(observer);
+ }
+ }
+}