This is an automated email from the ASF dual-hosted git repository.

meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 04b293ccf0c HBASE-29763 Implement co-processor host for client-meta service (#7533)
04b293ccf0c is described below

commit 04b293ccf0c8359873c1cf42082b8d0b8fb06e61
Author: Balazs Meszaros <[email protected]>
AuthorDate: Fri Dec 12 08:51:01 2025 +0100

    HBASE-29763 Implement co-processor host for client-meta service (#7533)
    
    Added hbase.coprocessor.clientmeta.classes configuration property for loading client-meta co-processors.
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
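    
    For illustration only (not part of this commit): a minimal sketch of how a client-meta
    coprocessor might be wired up once this change is in place. The interfaces and the
    hbase.coprocessor.clientmeta.classes key come from the diff below; the package and class
    names here are hypothetical. The coprocessor would be loaded by listing its class name
    under hbase.coprocessor.clientmeta.classes (for example in hbase-site.xml).
    
        package org.example.coprocessor; // hypothetical package
        
        import java.io.IOException;
        import java.util.Optional;
        import org.apache.hadoop.hbase.coprocessor.ClientMetaCoprocessor;
        import org.apache.hadoop.hbase.coprocessor.ClientMetaCoprocessorEnvironment;
        import org.apache.hadoop.hbase.coprocessor.ClientMetaObserver;
        import org.apache.hadoop.hbase.coprocessor.ObserverContext;
        
        // Sketch: a coprocessor that exposes itself as a ClientMetaObserver and
        // overrides a single post-hook, passing the value through unchanged.
        public class PassThroughClientMetaCoprocessor
          implements ClientMetaCoprocessor, ClientMetaObserver {
        
          @Override
          public Optional<ClientMetaObserver> getClientMetaObserver() {
            return Optional.of(this);
          }
        
          @Override
          public String postGetClusterId(ObserverContext<ClientMetaCoprocessorEnvironment> ctx,
            String clusterId) throws IOException {
            // A real implementation could audit or rewrite the cluster ID here;
            // returning it unchanged keeps the default client-visible behaviour.
            return clusterId;
          }
        }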
---
 .../apache/hadoop/hbase/HBaseRpcServicesBase.java  | 110 +++++++++--
 .../hbase/coprocessor/ClientMetaCoprocessor.java   |  32 ++++
 .../ClientMetaCoprocessorEnvironment.java          |  29 +++
 .../coprocessor/ClientMetaCoprocessorHost.java     | 212 +++++++++++++++++++++
 .../hbase/coprocessor/ClientMetaObserver.java      | 153 +++++++++++++++
 .../hadoop/hbase/coprocessor/CoprocessorHost.java  |   2 +
 .../coprocessor/TestClientMetaCoprocessor.java     | 208 ++++++++++++++++++++
 7 files changed, 728 insertions(+), 18 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java
index b2a0e780362..d6d27780883 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java
@@ -17,17 +17,21 @@
  */
 package org.apache.hadoop.hbase;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.coprocessor.ClientMetaCoprocessorHost;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -110,6 +114,8 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
 
   protected final PriorityFunction priority;
 
+  private ClientMetaCoprocessorHost clientMetaCoprocessorHost;
+
   private AccessChecker accessChecker;
 
   private ZKPermissionWatcher zkPermissionWatcher;
@@ -158,6 +164,8 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
     // Set our address, however we need the final port that was given to rpcServer
     isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
     rpcServer.setErrorHandler(this);
+
+    clientMetaCoprocessorHost = new ClientMetaCoprocessorHost(conf);
   }
 
   protected abstract boolean defaultReservoirEnabled();
@@ -199,6 +207,12 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
     }
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public ClientMetaCoprocessorHost getClientMetaCoprocessorHost() {
+    return clientMetaCoprocessorHost;
+  }
+
   public AccessChecker getAccessChecker() {
     return accessChecker;
   }
@@ -261,15 +275,36 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
   @Override
   public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
     throws ServiceException {
-    return GetClusterIdResponse.newBuilder().setClusterId(server.getClusterId()).build();
+    try {
+      clientMetaCoprocessorHost.preGetClusterId();
+
+      String clusterId = server.getClusterId();
+      String clusterIdReply = clientMetaCoprocessorHost.postGetClusterId(clusterId);
+
+      return GetClusterIdResponse.newBuilder().setClusterId(clusterIdReply).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
   }
 
   @Override
   public GetActiveMasterResponse getActiveMaster(RpcController controller,
     GetActiveMasterRequest request) throws ServiceException {
     GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
-    server.getActiveMaster()
-      .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
+
+    try {
+      clientMetaCoprocessorHost.preGetActiveMaster();
+
+      ServerName serverName = server.getActiveMaster().orElse(null);
+      ServerName serverNameReply = clientMetaCoprocessorHost.postGetActiveMaster(serverName);
+
+      if (serverNameReply != null) {
+        builder.setServerName(ProtobufUtil.toServerName(serverNameReply));
+      }
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+
     return builder.build();
   }
 
@@ -277,12 +312,25 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
   public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
     throws ServiceException {
     GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
-    server.getActiveMaster()
-      .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
-        .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
-    server.getBackupMasters()
-      .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
-        .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
+
+    try {
+      clientMetaCoprocessorHost.preGetMasters();
+
+      Map<ServerName, Boolean> serverNames = new LinkedHashMap<>();
+
+      server.getActiveMaster().ifPresent(serverName -> serverNames.put(serverName, Boolean.TRUE));
+      server.getBackupMasters().forEach(serverName -> serverNames.put(serverName, Boolean.FALSE));
+
+      Map<ServerName, Boolean> serverNamesReply =
+        clientMetaCoprocessorHost.postGetMasters(serverNames);
+
+      serverNamesReply
+        .forEach((serverName, active) -> builder.addMasterServers(GetMastersResponseEntry
+          .newBuilder().setServerName(ProtobufUtil.toServerName(serverName)).setIsActive(active)));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+
     return builder.build();
   }
 
@@ -290,22 +338,46 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
   public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
     GetMetaRegionLocationsRequest request) throws ServiceException {
     GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
-    server.getMetaLocations()
-      .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
+
+    try {
+      clientMetaCoprocessorHost.preGetMetaLocations();
+
+      List<HRegionLocation> metaLocations = server.getMetaLocations();
+      List<HRegionLocation> metaLocationsReply =
+        clientMetaCoprocessorHost.postGetMetaLocations(metaLocations);
+
+      metaLocationsReply
+        .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+
     return builder.build();
   }
 
   @Override
   public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
     GetBootstrapNodesRequest request) throws ServiceException {
-    int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
-      DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
-    ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
-    sample.add(server.getBootstrapNodes());
-
     GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
-    sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
-      .forEach(builder::addServerName);
+
+    try {
+      clientMetaCoprocessorHost.preGetBootstrapNodes();
+
+      int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
+        DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
+      ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
+      sample.add(server.getBootstrapNodes());
+
+      List<ServerName> bootstrapNodes = sample.getSamplingResult();
+      List<ServerName> bootstrapNodesReply =
+        clientMetaCoprocessorHost.postGetBootstrapNodes(bootstrapNodes);
+
+      bootstrapNodesReply
+        .forEach(serverName -> builder.addServerName(ProtobufUtil.toServerName(serverName)));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+
     return builder.build();
   }
 
@@ -316,6 +388,8 @@ public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
     try {
       requirePermission("updateConfiguration", Permission.Action.ADMIN);
       this.server.updateConfiguration();
+
+      clientMetaCoprocessorHost = new ClientMetaCoprocessorHost(getConfiguration());
     } catch (Exception e) {
       throw new ServiceException(e);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessor.java
new file mode 100644
index 00000000000..bbf963e3239
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ClientMetaCoprocessor extends Coprocessor {
+  default Optional<ClientMetaObserver> getClientMetaObserver() {
+    return Optional.empty();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorEnvironment.java
new file mode 100644
index 00000000000..ea9e8d190ab
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorEnvironment.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ClientMetaCoprocessorEnvironment
+  extends CoprocessorEnvironment<ClientMetaCoprocessor> {
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorHost.java
new file mode 100644
index 00000000000..87e5f7586fd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaCoprocessorHost.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class ClientMetaCoprocessorHost
+  extends CoprocessorHost<ClientMetaCoprocessor, ClientMetaCoprocessorEnvironment> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ClientMetaCoprocessorHost.class);
+
+  private static class ClientMetaEnvironment extends BaseEnvironment<ClientMetaCoprocessor>
+    implements ClientMetaCoprocessorEnvironment {
+
+    public ClientMetaEnvironment(ClientMetaCoprocessor impl, int priority, int seq,
+      Configuration conf) {
+      super(impl, priority, seq, conf);
+    }
+  }
+
+  public ClientMetaCoprocessorHost(Configuration conf) {
+    // RPCServer cannot be aborted, so we don't pass Abortable down here.
+    super(null);
+    this.conf = conf;
+    boolean coprocessorsEnabled =
+      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
+    LOG.trace("System coprocessor loading is {}", (coprocessorsEnabled ? "enabled" : "disabled"));
+    loadSystemCoprocessors(conf, CLIENT_META_COPROCESSOR_CONF_KEY);
+  }
+
+  @Override
+  public ClientMetaCoprocessorEnvironment createEnvironment(ClientMetaCoprocessor instance,
+    int priority, int sequence, Configuration conf) {
+    return new ClientMetaEnvironment(instance, priority, sequence, conf);
+  }
+
+  @Override
+  public ClientMetaCoprocessor checkAndGetInstance(Class<?> implClass)
+    throws InstantiationException, IllegalAccessException {
+    try {
+      if (ClientMetaCoprocessor.class.isAssignableFrom(implClass)) {
+        return implClass.asSubclass(ClientMetaCoprocessor.class).getDeclaredConstructor()
+          .newInstance();
+      } else {
+        LOG.error("{} is not of type ClientMetaCoprocessor. Check the configuration of {}",
+          implClass.getName(), CLIENT_META_COPROCESSOR_CONF_KEY);
+        return null;
+      }
+    } catch (NoSuchMethodException | InvocationTargetException e) {
+      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
+    }
+  }
+
+  private final ObserverGetter<ClientMetaCoprocessor, ClientMetaObserver> clientMetaObserverGetter =
+    ClientMetaCoprocessor::getClientMetaObserver;
+
+  public void preGetClusterId() throws IOException {
+    execOperation(coprocEnvironments.isEmpty()
+      ? null
+      : new ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+        @Override
+        protected void call(ClientMetaObserver observer) throws IOException {
+          observer.preGetClusterId(this);
+        }
+      });
+  }
+
+  public String postGetClusterId(String clusterId) throws IOException {
+    if (coprocEnvironments.isEmpty()) {
+      return clusterId;
+    }
+
+    return execOperationWithResult(new ObserverOperationWithResult<ClientMetaObserver, String>(
+      clientMetaObserverGetter, clusterId) {
+      @Override
+      protected String call(ClientMetaObserver observer) throws IOException {
+        return observer.postGetClusterId(this, getResult());
+      }
+    });
+  }
+
+  public void preGetActiveMaster() throws IOException {
+    execOperation(coprocEnvironments.isEmpty()
+      ? null
+      : new ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+        @Override
+        protected void call(ClientMetaObserver observer) throws IOException {
+          observer.preGetActiveMaster(this);
+        }
+      });
+  }
+
+  public ServerName postGetActiveMaster(ServerName serverName) throws IOException {
+    if (coprocEnvironments.isEmpty()) {
+      return serverName;
+    }
+
+    return execOperationWithResult(new ObserverOperationWithResult<ClientMetaObserver, ServerName>(
+      clientMetaObserverGetter, serverName) {
+      @Override
+      protected ServerName call(ClientMetaObserver observer) throws IOException {
+        return observer.postGetActiveMaster(this, getResult());
+      }
+    });
+  }
+
+  public void preGetMasters() throws IOException {
+    execOperation(coprocEnvironments.isEmpty()
+      ? null
+      : new ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+        @Override
+        protected void call(ClientMetaObserver observer) throws IOException {
+          observer.preGetMasters(this);
+        }
+      });
+  }
+
+  public Map<ServerName, Boolean> postGetMasters(Map<ServerName, Boolean> serverNames)
+    throws IOException {
+    if (coprocEnvironments.isEmpty()) {
+      return serverNames;
+    }
+
+    return execOperationWithResult(
+      new ObserverOperationWithResult<ClientMetaObserver, Map<ServerName, Boolean>>(
+        clientMetaObserverGetter, serverNames) {
+        @Override
+        protected Map<ServerName, Boolean> call(ClientMetaObserver observer) throws IOException {
+          return observer.postGetMasters(this, getResult());
+        }
+      });
+  }
+
+  public void preGetBootstrapNodes() throws IOException {
+    execOperation(coprocEnvironments.isEmpty()
+      ? null
+      : new ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+        @Override
+        protected void call(ClientMetaObserver observer) throws IOException {
+          observer.preGetBootstrapNodes(this);
+        }
+      });
+  }
+
+  public List<ServerName> postGetBootstrapNodes(List<ServerName> bootstrapNodes)
+    throws IOException {
+    if (coprocEnvironments.isEmpty()) {
+      return bootstrapNodes;
+    }
+
+    return execOperationWithResult(
+      new ObserverOperationWithResult<ClientMetaObserver, List<ServerName>>(
+        clientMetaObserverGetter, bootstrapNodes) {
+        @Override
+        protected List<ServerName> call(ClientMetaObserver observer) throws IOException {
+          return observer.postGetBootstrapNodes(this, getResult());
+        }
+      });
+  }
+
+  public void preGetMetaLocations() throws IOException {
+    execOperation(coprocEnvironments.isEmpty()
+      ? null
+      : new ObserverOperationWithoutResult<ClientMetaObserver>(clientMetaObserverGetter) {
+        @Override
+        protected void call(ClientMetaObserver observer) throws IOException {
+          observer.preGetMetaLocations(this);
+        }
+      });
+  }
+
+  public List<HRegionLocation> postGetMetaLocations(List<HRegionLocation> metaLocations)
+    throws IOException {
+    if (coprocEnvironments.isEmpty()) {
+      return metaLocations;
+    }
+
+    return execOperationWithResult(
+      new ObserverOperationWithResult<ClientMetaObserver, List<HRegionLocation>>(
+        clientMetaObserverGetter, metaLocations) {
+        @Override
+        protected List<HRegionLocation> call(ClientMetaObserver observer) throws IOException {
+          return observer.postGetMetaLocations(this, getResult());
+        }
+      });
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaObserver.java
new file mode 100644
index 00000000000..59930d3b4b7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ClientMetaObserver.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Defines coprocessor hooks for intercepting operations of ClientMetaService, which is
+ * responsible for ZooKeeper-less HBase service discovery. <br>
+ * <br>
+ * Since most implementations will be interested in only a subset of hooks, this class uses
+ * 'default' methods to avoid having to add unnecessary overrides. Where these defaults are
+ * non-empty, it is simply to satisfy the compiler by returning a value of the expected
+ * (non-void) type; they are written so that they act as no-ops. Our suggestion to
+ * implementers is therefore not to call these 'default' methods from overrides. <br>
+ * <br>
+ * <h3>Exception Handling</h3> For all functions, exception handling is done as follows:
+ * <ul>
+ * <li>Exceptions of type {@link IOException} are reported back to the client.</li>
+ * <li>For any other kind of exception:
+ * <ul>
+ * <li>If the configuration {@link CoprocessorHost#ABORT_ON_ERROR_KEY} is set to true, then the
+ * server aborts.</li>
+ * <li>Otherwise, the coprocessor is removed from the server and
+ * {@link org.apache.hadoop.hbase.DoNotRetryIOException} is returned to the client.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ClientMetaObserver {
+  /**
+   * Called before getting the cluster ID.
+   * @param ctx the environment to interact with the framework
+   */
+  default void preGetClusterId(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
+
+  /**
+   * Called after we got the cluster ID.
+   * @param ctx       the environment to interact with the framework
+   * @param clusterId the actual cluster ID
+   * @return the cluster ID which is returned to the client.
+   */
+  default String postGetClusterId(ObserverContext<ClientMetaCoprocessorEnvironment> ctx,
+    String clusterId) throws IOException {
+    return clusterId;
+  }
+
+  /**
+   * Called before getting the active master.
+   * @param ctx the environment to interact with the framework
+   */
+  default void preGetActiveMaster(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
+
+  /**
+   * Called after we got the active master.
+   * @param ctx        the environment to interact with the framework
+   * @param serverName the actual active master address. It can be {@code null} if there is no
+   *                   active master.
+   * @return the active master address which is returned to the client. It can be {@code null}.
+   */
+  default ServerName postGetActiveMaster(ObserverContext<ClientMetaCoprocessorEnvironment> ctx,
+    ServerName serverName) throws IOException {
+    return serverName;
+  }
+
+  /**
+   * Called before getting the master servers.
+   * @param ctx the environment to interact with the framework
+   */
+  default void preGetMasters(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
+
+  /**
+   * Called after we got the master servers.
+   * @param ctx         the environment to interact with the framework
+   * @param serverNames the actual master server addresses and their active statuses
+   * @return the master server addresses and active statuses which are returned to the client.
+   */
+  default Map<ServerName, Boolean> postGetMasters(
+    ObserverContext<ClientMetaCoprocessorEnvironment> ctx, Map<ServerName, Boolean> serverNames)
+    throws IOException {
+    return serverNames;
+  }
+
+  /**
+   * Called before getting bootstrap nodes.
+   * @param ctx the environment to interact with the framework
+   */
+  default void preGetBootstrapNodes(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
+
+  /**
+   * Called after we got bootstrap nodes.
+   * @param ctx            the environment to interact with the framework
+   * @param bootstrapNodes the actual bootstrap nodes
+   * @return the bootstrap nodes which are returned to the client.
+   */
+  default List<ServerName> postGetBootstrapNodes(
+    ObserverContext<ClientMetaCoprocessorEnvironment> ctx, List<ServerName> bootstrapNodes)
+    throws IOException {
+    return bootstrapNodes;
+  }
+
+  /**
+   * Called before getting the meta region locations.
+   * @param ctx the environment to interact with the framework
+   */
+  default void preGetMetaLocations(ObserverContext<ClientMetaCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
+
+  /**
+   * Called after we got the meta region locations.
+   * @param ctx           the environment to interact with the framework
+   * @param metaLocations the actual meta region locations
+   * @return the meta region locations which are returned to the client.
+   */
+  default List<HRegionLocation> postGetMetaLocations(
+    ObserverContext<ClientMetaCoprocessorEnvironment> ctx, List<HRegionLocation> metaLocations)
+    throws IOException {
+    return metaLocations;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 137fe3b061d..c201b29881c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -61,6 +61,8 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
   public static final String USER_REGION_COPROCESSOR_CONF_KEY =
     "hbase.coprocessor.user.region.classes";
   public static final String MASTER_COPROCESSOR_CONF_KEY = "hbase.coprocessor.master.classes";
+  public static final String CLIENT_META_COPROCESSOR_CONF_KEY =
+    "hbase.coprocessor.clientmeta.classes";
   public static final String WAL_COPROCESSOR_CONF_KEY = "hbase.coprocessor.wal.classes";
   public static final String RPC_COPROCESSOR_CONF_KEY = "hbase.coprocessor.rpc.classes";
   public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClientMetaCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClientMetaCoprocessor.java
new file mode 100644
index 00000000000..3b0b31c710f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClientMetaCoprocessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.ConnectionRegistry;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests invocation of the {@link ClientMetaObserver} interface hooks at all appropriate times
+ * during client-meta service operations.
+ */
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestClientMetaCoprocessor {
+  private static final Logger LOG = LoggerFactory.getLogger(TestClientMetaCoprocessor.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestClientMetaCoprocessor.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static final ServerName SERVER_NAME = ServerName.valueOf("localhost", 1234, 12345);
+
+  public static class TestCoprocessor implements ClientMetaCoprocessor {
+    protected final ClientMetaObserver observer;
+
+    public TestCoprocessor() {
+      observer = mock(ClientMetaObserver.class);
+      resetMock();
+    }
+
+    protected void resetMock() {
+      reset(observer);
+
+      try {
+        doAnswer(answer -> answer.getArgument(1, String.class)).when(observer)
+          .postGetClusterId(any(), any());
+        doAnswer(answer -> {
+          return answer.getArgument(1, ServerName.class);
+        }).when(observer).postGetActiveMaster(any(), any());
+        doAnswer(answer -> answer.getArgument(1, Map.class)).when(observer).postGetMasters(any(),
+          any());
+        doAnswer(answer -> answer.getArgument(1, List.class)).when(observer)
+          .postGetBootstrapNodes(any(), any());
+        doAnswer(answer -> answer.getArgument(1, List.class)).when(observer)
+          .postGetMetaLocations(any(), any());
+      } catch (IOException e) {
+        throw new IllegalStateException("Could not setup observer mock.", e);
+      }
+    }
+
+    @Override
+    public Optional<ClientMetaObserver> getClientMetaObserver() {
+      return Optional.of(observer);
+    }
+  }
+
+  private static TestCoprocessor getCoprocessor() {
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    MasterRpcServices masterRpcServices = master.getMasterRpcServices();
+
+    return masterRpcServices.getClientMetaCoprocessorHost().findCoprocessor(TestCoprocessor.class);
+  }
+
+  private static ClientMetaObserver getObserverMock() {
+    return getCoprocessor().getClientMetaObserver().get();
+  }
+
+  private static void resetObserverMock() {
+    getCoprocessor().resetMock();
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(CoprocessorHost.CLIENT_META_COPROCESSOR_CONF_KEY, TestCoprocessor.class.getName());
+
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setupBefore() {
+    resetObserverMock();
+  }
+
+  @Test
+  public void testGetClusterId() throws Exception {
+    ClientMetaObserver observer = getObserverMock();
+
+    try (AsyncConnectionImpl asyncConnection = (AsyncConnectionImpl) ConnectionFactory
+      .createAsyncConnection(UTIL.getConfiguration()).get()) {
+      ConnectionRegistry connectionRegistry = asyncConnection.getConnectionRegistry();
+
+      doReturn("cluster-id").when(observer).postGetClusterId(any(), any());
+      clearInvocations(observer);
+
+      String clusterId = connectionRegistry.getClusterId().get();
+      assertEquals("cluster-id", clusterId);
+
+      verify(observer).preGetClusterId(any());
+      verify(observer).postGetClusterId(any(), any());
+      verifyNoMoreInteractions(observer);
+    }
+  }
+
+  @Test
+  public void testGetActiveMaster() throws Exception {
+    ClientMetaObserver observer = getObserverMock();
+
+    try (AsyncConnectionImpl asyncConnection = (AsyncConnectionImpl) ConnectionFactory
+      .createAsyncConnection(UTIL.getConfiguration()).get()) {
+      ConnectionRegistry connectionRegistry = asyncConnection.getConnectionRegistry();
+
+      doReturn(SERVER_NAME).when(observer).postGetActiveMaster(any(), any());
+      clearInvocations(observer);
+
+      ServerName activeMaster = connectionRegistry.getActiveMaster().get();
+      assertEquals(SERVER_NAME, activeMaster);
+
+      verify(observer).preGetActiveMaster(any());
+      verify(observer).postGetActiveMaster(any(), any());
+      verifyNoMoreInteractions(observer);
+    }
+  }
+
+  @Test
+  public void testGetMetaRegionLocations() throws Exception {
+    ClientMetaObserver observer = getObserverMock();
+
+    try (AsyncConnectionImpl asyncConnection = (AsyncConnectionImpl) ConnectionFactory
+      .createAsyncConnection(UTIL.getConfiguration()).get()) {
+      ConnectionRegistry connectionRegistry = asyncConnection.getConnectionRegistry();
+
+      HRegionLocation metaRegionLocation =
+        new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, SERVER_NAME);
+      doReturn(List.of(metaRegionLocation)).when(observer).postGetMetaLocations(any(), any());
+      clearInvocations(observer);
+
+      RegionLocations regionLocations = connectionRegistry.getMetaRegionLocations().get();
+      HRegionLocation actualMetaRegionLocation = regionLocations.getDefaultRegionLocation();
+
+      assertEquals(metaRegionLocation, actualMetaRegionLocation);
+
+      verify(observer).preGetMetaLocations(any());
+      verify(observer).postGetMetaLocations(any(), any());
+      verifyNoMoreInteractions(observer);
+    }
+  }
+}

