HBASE-12126 Region server coprocessor endpoint (Virag Kothari)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/884b049c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/884b049c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/884b049c Branch: refs/heads/0.98 Commit: 884b049ce127122a2d64de3f82340aa05f9c7158 Parents: a0d0fe9 Author: Andrew Purtell <[email protected]> Authored: Fri Oct 10 12:07:07 2014 -0700 Committer: Andrew Purtell <[email protected]> Committed: Fri Oct 10 14:02:33 2014 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/HBaseAdmin.java | 29 + .../ipc/RegionServerCoprocessorRpcChannel.java | 76 ++ .../hadoop/hbase/protobuf/ProtobufUtil.java | 25 + .../hbase/client/TestClientNoCluster.java | 6 + .../hbase/protobuf/generated/ClientProtos.java | 87 +- hbase-protocol/src/main/protobuf/Client.proto | 3 + .../SingletonCoprocessorService.java | 34 + .../hbase/regionserver/HRegionServer.java | 110 +- .../RegionServerCoprocessorHost.java | 8 + .../regionserver/RegionServerServices.java | 10 + .../hadoop/hbase/MockRegionServerServices.java | 8 + .../TestRegionServerCoprocessorEndpoint.java | 108 ++ .../DummyRegionServerEndpointProtos.java | 1151 ++++++++++++++++++ .../hadoop/hbase/master/MockRegionServer.java | 16 + .../protobuf/DummyRegionServerEndpoint.proto | 33 + 15 files changed, 1680 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index f91a6b4..db93bab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.client; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -3374,4 +3376,31 @@ public class HBaseAdmin implements Abortable, Closeable { public CoprocessorRpcChannel coprocessorService() { return new MasterCoprocessorRpcChannel(connection); } + + /** + * Creates and returns a {@link com.google.protobuf.RpcChannel} instance + * connected to the passed region server. + * + * <p> + * The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published + * coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations: + * </p> + * + * <div style="background-color: #cccccc; padding: 2px"> + * <blockquote><pre> + * CoprocessorRpcChannel channel = myAdmin.coprocessorService(serverName); + * MyService.BlockingInterface service = MyService.newBlockingStub(channel); + * MyCallRequest request = MyCallRequest.newBuilder() + * ... 
+ * .build(); + * MyCallResponse response = service.myCall(null, request); + * </pre></blockquote></div> + * + * @param sn the server name to which the endpoint call is made + * @return A RegionServerCoprocessorRpcChannel instance + */ + public CoprocessorRpcChannel coprocessorService(ServerName sn) { + return new RegionServerCoprocessorRpcChannel(connection, sn); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java new file mode 100644 index 0000000..6a174e5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import com.google.protobuf.HBaseZeroCopyByteString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; + + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; + +/** + * Provides clients with an RPC connection to call coprocessor endpoint + * {@link com.google.protobuf.Service}s against a given region server. An instance of this class may + * be obtained by calling {@link org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName)}, + * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to + * call the endpoint methods.
+ * @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName) + */ [email protected] +public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel { + private static Log LOG = LogFactory.getLog(RegionServerCoprocessorRpcChannel.class); + private final HConnection connection; + private final ServerName serverName; + + public RegionServerCoprocessorRpcChannel(HConnection conn, ServerName serverName) { + this.connection = conn; + this.serverName = serverName; + } + + @Override + protected Message callExecService(Descriptors.MethodDescriptor method, Message request, + Message responsePrototype) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + method.getName() + ", " + request.toString()); + } + + final ClientProtos.CoprocessorServiceCall call = + ClientProtos.CoprocessorServiceCall.newBuilder() + .setRow(HBaseZeroCopyByteString.wrap(HConstants.EMPTY_BYTE_ARRAY)) + .setServiceName(method.getService().getFullName()).setMethodName(method.getName()) + .setRequest(request.toByteString()).build(); + CoprocessorServiceResponse result = + ProtobufUtil.execRegionServerService(connection.getClient(serverName), call); + Message response = null; + if (result.getValue().hasValue()) { + response = + responsePrototype.newBuilderForType().mergeFrom(result.getValue().getValue()).build(); + } else { + response = responsePrototype.getDefaultInstanceForType(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Result is value=" + response); + } + return response; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 747d710..5385b29 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1623,6 +1623,31 @@ public final class ProtobufUtil { } } + /** + * Make a region server endpoint call + * @param client + * @param call + * @return + * @throws IOException + */ + public static CoprocessorServiceResponse execRegionServerService( + final ClientService.BlockingInterface client, final CoprocessorServiceCall call) + throws IOException { + CoprocessorServiceRequest request = + CoprocessorServiceRequest + .newBuilder() + .setCall(call) + .setRegion( + RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)) + .build(); + try { + CoprocessorServiceResponse response = client.execRegionServerService(null, request); + return response; + } catch (ServiceException se) { + throw getRemoteException(se); + } + } + @SuppressWarnings("unchecked") public static <T extends Service> T newServiceStub(Class<T> service, RpcChannel channel) throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index fa0819d..8bffd7f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -495,6 +495,12 @@ public class TestClientNoCluster extends Configured implements Tool { this.multiInvocationsCount.decrementAndGet(); } } + + @Override + public CoprocessorServiceResponse execRegionServerService(RpcController controller, + CoprocessorServiceRequest request) throws ServiceException { + throw new NotImplementedException(); + } } static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index ab6a68c..5e23bb5 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -29835,6 +29835,14 @@ public final class ClientProtos { com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done); /** + * <code>rpc ExecRegionServerService(.CoprocessorServiceRequest) returns (.CoprocessorServiceResponse);</code> + */ + public abstract void execRegionServerService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done); + + /** * <code>rpc Multi(.MultiRequest) returns (.MultiResponse);</code> */ public abstract void multi( @@ -29888,6 +29896,14 @@ public final class ClientProtos { } @java.lang.Override + public void execRegionServerService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done) { + impl.execRegionServerService(controller, request, done); + } + + @java.lang.Override public void multi( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, @@ -29928,6 +29944,8 @@ public final class ClientProtos { case 4: return impl.execService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request); case 5: + return impl.execRegionServerService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request); + case 6: return impl.multi(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request); default: throw new java.lang.AssertionError("Can't get here."); @@ -29954,6 +29972,8 @@ public final class ClientProtos { case 4: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance(); case 5: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance(); + case 6: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -29980,6 +30000,8 @@ public final class 
ClientProtos { case 4: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(); case 5: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(); + case 6: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -30030,6 +30052,14 @@ public final class ClientProtos { com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done); /** + * <code>rpc ExecRegionServerService(.CoprocessorServiceRequest) returns (.CoprocessorServiceResponse);</code> + */ + public abstract void execRegionServerService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done); + + /** * <code>rpc Multi(.MultiRequest) returns (.MultiResponse);</code> */ public abstract void multi( @@ -30085,6 +30115,11 @@ public final class ClientProtos { done)); return; case 5: + this.execRegionServerService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request, + com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse>specializeCallback( + done)); + return; + case 6: this.multi(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request, com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse>specializeCallback( done)); @@ -30114,6 +30149,8 @@ public final class ClientProtos { case 4: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance(); case 5: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance(); + case 6: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -30140,6 +30177,8 @@ public final class ClientProtos { case 4: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(); case 5: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(); + case 6: return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); @@ -30237,12 +30276,27 @@ public final class ClientProtos { org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance())); } + public void execRegionServerService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done) { + channel.callMethod( + getDescriptor().getMethods().get(5), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.class, + 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance())); + } + public void multi( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request, com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse> done) { channel.callMethod( - getDescriptor().getMethods().get(5), + getDescriptor().getMethods().get(6), controller, request, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(), @@ -30284,6 +30338,11 @@ public final class ClientProtos { org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request) throws com.google.protobuf.ServiceException; + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execRegionServerService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request) + throws com.google.protobuf.ServiceException; + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse multi( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request) @@ -30357,12 +30416,24 @@ public final class ClientProtos { } + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execRegionServerService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request) + throws com.google.protobuf.ServiceException { + return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse) channel.callBlockingMethod( + getDescriptor().getMethods().get(5), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance()); + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse multi( com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request) throws com.google.protobuf.ServiceException { return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse) channel.callBlockingMethod( - getDescriptor().getMethods().get(5), + getDescriptor().getMethods().get(6), controller, request, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance()); @@ -30620,17 +30691,19 @@ public final class ClientProtos { "egionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondi" + "tion\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse\022" + "/\n\022regionActionResult\030\001 \003(\0132\023.RegionActi" + - "onResult\022\021\n\tprocessed\030\002 \001(\0102\261\002\n\rClientSe" + + "onResult\022\021\n\tprocessed\030\002 \001(\0102\205\003\n\rClientSe" + "rvice\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022" + ")\n\006Mutate\022\016.MutateRequest\032\017.MutateRespon" + "se\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResponse\022" + ">\n\rBulkLoadHFile\022\025.BulkLoadHFileRequest\032" + "\026.BulkLoadHFileResponse\022F\n\013ExecService\022\032" + ".CoprocessorServiceRequest\032\033.Coprocessor", - "ServiceResponse\022&\n\005Multi\022\r.MultiRequest\032" + - "\016.MultiResponseBB\n*org.apache.hadoop.hba" + - "se.protobuf.generatedB\014ClientProtosH\001\210\001\001" + - "\240\001\001" + "ServiceResponse\022R\n\027ExecRegionServerServi" + + 
"ce\022\032.CoprocessorServiceRequest\032\033.Coproce" + + "ssorServiceResponse\022&\n\005Multi\022\r.MultiRequ" + + "est\032\016.MultiResponseBB\n*org.apache.hadoop" + + ".hbase.protobuf.generatedB\014ClientProtosH" + + "\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-protocol/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 537c1d4..3b5627b 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -397,6 +397,9 @@ service ClientService { rpc ExecService(CoprocessorServiceRequest) returns(CoprocessorServiceResponse); + + rpc ExecRegionServerService(CoprocessorServiceRequest) + returns(CoprocessorServiceResponse); rpc Multi(MultiRequest) returns(MultiResponse); http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java new file mode 100644 index 0000000..88db6b6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import com.google.protobuf.Service; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; + +/** + * Coprocessor endpoints registered once per server and providing protobuf services should implement + * this interface and return the {@link Service} instance via {@link #getService()}. 
+ */ [email protected](HBaseInterfaceAudience.COPROC) [email protected] +public interface SingletonCoprocessorService { + Service getService(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7639b48..3a003da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -53,6 +53,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; @@ -105,6 +106,7 @@ import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; +import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.filter.ByteArrayComparable; @@ -165,6 +167,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; @@ -248,8 +251,11 @@ import org.cliffc.high_scale_lib.Counter; import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; @@ -462,6 +468,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa /** The nonce manager chore. */ private Chore nonceManagerChore; + private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap(); + /** * The server name the Master sees us as. Its made from the hostname the * master passes us, port, and server startcode. 
Gets set after registration @@ -646,6 +654,25 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.rsInfo.setInfoPort(putUpWebUI()); } + @Override + public boolean registerService(Service instance) { + /* + * No stacking of instances is allowed for a single service name + */ + Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType(); + if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) { + LOG.error("Coprocessor service " + serviceDesc.getFullName() + + " already registered, rejecting request from " + instance); + return false; + } + + coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance); + if (LOG.isDebugEnabled()) { + LOG.debug("Registered regionserver coprocessor service: service=" + serviceDesc.getFullName()); + } + return true; + } + /** * @return list of blocking services and their security info classes that this server supports */ @@ -659,7 +686,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa AdminProtos.AdminService.BlockingInterface.class)); return bssi; } - + /** * Run test on configured codecs to make sure supporting libs are in place. * @param c @@ -1948,22 +1975,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } } - /** - * @return Return the object that implements the replication - * source service. - */ - ReplicationSourceService getReplicationSourceService() { - return replicationSourceHandler; - } - - /** - * @return Return the object that implements the replication - * sink service. - */ - ReplicationSinkService getReplicationSinkService() { - return replicationSinkHandler; - } - @Override public boolean reportRegionTransition(TransitionCode code, HRegionInfo... hris) { return reportRegionTransition(code, HConstants.NO_SEQNUM, hris); @@ -3399,6 +3410,70 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } return result; } + + @Override + public CoprocessorServiceResponse execRegionServerService(final RpcController controller, + final CoprocessorServiceRequest serviceRequest) throws ServiceException { + try { + ServerRpcController execController = new ServerRpcController(); + CoprocessorServiceCall call = serviceRequest.getCall(); + String serviceName = call.getServiceName(); + String methodName = call.getMethodName(); + if (!coprocessorServiceHandlers.containsKey(serviceName)) { + throw new UnknownProtocolException(null, + "No registered coprocessor service found for name " + serviceName); + } + Service service = coprocessorServiceHandlers.get(serviceName); + Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType(); + Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); + if (methodDesc == null) { + throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName + + " called on service " + serviceName); + } + Message request = + service.getRequestPrototype(methodDesc).newBuilderForType().mergeFrom(call.getRequest()) + .build(); + final Message.Builder responseBuilder = + service.getResponsePrototype(methodDesc).newBuilderForType(); + service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() { + @Override + public void run(Message message) { + if (message != null) { + responseBuilder.mergeFrom(message); + } + } + }); + Message execResult = responseBuilder.build(); + if (execController.getFailedOn() != null) { + throw execController.getFailedOn(); + } + 
ClientProtos.CoprocessorServiceResponse.Builder builder = + ClientProtos.CoprocessorServiceResponse.newBuilder(); + builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, + HConstants.EMPTY_BYTE_ARRAY)); + builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName()) + .setValue(execResult.toByteString())); + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * @return Return the object that implements the replication + * source service. + */ + public ReplicationSourceService getReplicationSourceService() { + return replicationSourceHandler; + } + + /** + * @return Return the object that implements the replication + * sink service. + */ + public ReplicationSinkService getReplicationSinkService() { + return replicationSinkHandler; + } /** * Execute multiple actions on a table: get, mutate, and/or execCoprocessor @@ -4803,4 +4878,5 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa public CacheConfig getCacheConfig() { return this.cacheConfig; } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 54552c6..f91642b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -31,9 +31,11 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.MetaMutationAnnotation; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; +import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving @@ -209,6 +211,12 @@ public class RegionServerCoprocessorHost extends final Configuration conf, final RegionServerServices services) { super(impl, priority, seq, conf); this.regionServerServices = services; + for (Class c : implClass.getInterfaces()) { + if (SingletonCoprocessorService.class.isAssignableFrom(c)) { + this.regionServerServices.registerService(((SingletonCoprocessorService) impl).getService()); + break; + } + } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index a8a4e68..7fbc5d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -30,6 +30,8 @@ import 
org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.zookeeper.KeeperException; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -132,4 +134,12 @@ public interface RegionServerServices * @return The RegionServer's NonceManager */ public ServerNonceManager getNonceManager(); + + /** + * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to + * be available for handling endpoint calls against this region server + * @param service the {@code Service} subclass instance to expose as a coprocessor endpoint + * @return {@code true} if the registration was successful, {@code false} otherwise + */ + boolean registerService(Service service); } http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index e399ad5..47ed875 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import com.google.protobuf.Message; +import com.google.protobuf.Service; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.catalog.CatalogTracker; @@ -240,4 +242,10 @@ class MockRegionServerServices implements RegionServerServices { HRegionInfo... hris) { return false; } + + @Override + public boolean registerService(Service service) { + // TODO Auto-generated method stub + return false; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/884b049c/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java new file mode 100644 index 0000000..d21ea6f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +@Category(MediumTests.class) +public class TestRegionServerCoprocessorEndpoint { + private static HBaseTestingUtility TEST_UTIL = null; + private static Configuration CONF = null; + private static final String DUMMY_VALUE = "val"; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + CONF = TEST_UTIL.getConfiguration(); + CONF.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + DummyRegionServerEndpoint.class.getName()); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testEndpoint() throws Exception { + final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + final ServerRpcController controller = new ServerRpcController(); + final BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse> rpcCallback = + new BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>(); + DummyRegionServerEndpointProtos.DummyService service = + ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class, + new HBaseAdmin(CONF).coprocessorService(serverName)); + service.dummyCall(controller, + DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback); + assertEquals(DUMMY_VALUE, rpcCallback.get().getValue()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + } + + static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService { + + @Override + public Service getService() { + return this; + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void dummyCall(RpcController controller, DummyRequest request, + RpcCallback<DummyResponse> callback) { + callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build()); + } + } +}
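----------------------------------------------------------------------
For reference, a minimal client-side sketch of calling a region-server-scoped endpoint through the new HBaseAdmin#coprocessorService(ServerName) channel, modeled on the DummyService classes exercised by the test above and on the usage shown in the HBaseAdmin javadoc. The class name, the standalone main(), and the choice of the first server from the cluster status are illustrative assumptions, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;

public class RegionServerEndpointClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      // Pick a region server to call; here simply the first one reported by the cluster status.
      ServerName server = admin.getClusterStatus().getServers().iterator().next();
      // The channel is scoped to the whole region server, so no region or row key is involved.
      CoprocessorRpcChannel channel = admin.coprocessorService(server);
      // Standard protobuf blocking stub over the coprocessor channel.
      DummyService.BlockingInterface stub = DummyService.newBlockingStub(channel);
      DummyResponse response = stub.dummyCall(null, DummyRequest.getDefaultInstance());
      System.out.println("Endpoint returned: " + response.getValue());
    } finally {
      admin.close();
    }
  }
}
----------------------------------------------------------------------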

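----------------------------------------------------------------------
The DummyRegionServerEndpoint.proto listed in the diffstat is not reproduced in this excerpt. The sketch below shows what such an endpoint service definition looks like, inferred from the generated DummyRegionServerEndpointProtos classes the test relies on; the file actually committed may differ in detail.

// Hypothetical reconstruction for illustration only; not the committed file.
option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
option java_outer_classname = "DummyRegionServerEndpointProtos";
// Generic services must be enabled so protoc emits the abstract Service class
// that the coprocessor endpoint implements and registers with the region server.
option java_generic_services = true;
option java_generate_equals_and_hash = true;

message DummyRequest {
}

message DummyResponse {
  required string value = 1;
}

service DummyService {
  rpc dummyCall(DummyRequest) returns(DummyResponse);
}
----------------------------------------------------------------------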