http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java deleted file mode 100644 index 1113cfd..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. - */ - -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import com.google.common.annotations.VisibleForTesting; - -/** - * - * Configuration is a heavy weight registry that does a lot of string operations and regex matching. - * Method calls into Configuration account for high CPU usage and have huge performance impact. - * This class caches the value in the TableConfiguration object to improve performance. - * see HBASE-12128 - * - */ [email protected] -public class TableConfiguration { - - public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; - public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; - public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; - public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; - - private final long writeBufferSize; - private final int metaOperationTimeout; - private final int operationTimeout; - private final int scannerCaching; - private final long scannerMaxResultSize; - private final int primaryCallTimeoutMicroSecond; - private final int replicaCallTimeoutMicroSecondScan; - private final int retries; - private final int maxKeyValueSize; - - // toggle for async/sync prefetch - private final boolean clientScannerAsyncPrefetch; - - /** - * Constructor - * @param conf Configuration object - */ - TableConfiguration(Configuration conf) { - this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); - - this.metaOperationTimeout = conf.getInt( - HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - - this.operationTimeout = conf.getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - - this.scannerCaching = conf.getInt( - HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - - this.scannerMaxResultSize = - conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - - this.primaryCallTimeoutMicroSecond = - conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms - - this.replicaCallTimeoutMicroSecondScan = - conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms - - this.retries = conf.getInt( - HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - - this.clientScannerAsyncPrefetch = conf.getBoolean( - Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); - - this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); - } - - /** - * Constructor - * This is for internal testing purpose (using the default value). - * In real usage, we should read the configuration from the Configuration object. - */ - @VisibleForTesting - protected TableConfiguration() { - this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; - this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; - this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; - this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; - this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; - this.primaryCallTimeoutMicroSecond = 10000; - this.replicaCallTimeoutMicroSecondScan = 1000000; - this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; - this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; - this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; - } - - public long getWriteBufferSize() { - return writeBufferSize; - } - - public int getMetaOperationTimeout() { - return metaOperationTimeout; - } - - public int getOperationTimeout() { - return operationTimeout; - } - - public int getScannerCaching() { - return scannerCaching; - } - - public int getPrimaryCallTimeoutMicroSecond() { - return primaryCallTimeoutMicroSecond; - } - - public int getReplicaCallTimeoutMicroSecondScan() { - return replicaCallTimeoutMicroSecondScan; - } - - public int getRetriesNumber() { - return retries; - } - - public int getMaxKeyValueSize() { - return maxKeyValueSize; - } - - public long getScannerMaxResultSize() { - return scannerMaxResultSize; - } - - public boolean isClientScannerAsyncPrefetch() { - return clientScannerAsyncPrefetch; - } - -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index a53fb70..ec6332a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -318,7 +318,7 @@ public abstract class AbstractRpcClient implements RpcClient { public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { PayloadCarryingRpcController pcrc; - if (controller != null) { + if (controller != null && controller instanceof PayloadCarryingRpcController) { pcrc = (PayloadCarryingRpcController) controller; if (!pcrc.hasCallTimeout()) { pcrc.setCallTimeout(channelOperationTimeout); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java index e60fbd6..b1d54a4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java @@ -36,7 +36,7 @@ import com.google.protobuf.ServiceException; /** * Base class which provides clients with an RPC connection to - * call coprocessor endpoint {@link com.google.protobuf.Service}s. + * call coprocessor endpoint {@link com.google.protobuf.Service}s. * Note that clients should not use this class directly, except through * {@link org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(byte[])}. */ @@ -53,7 +53,7 @@ public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcCh RpcCallback<Message> callback) { Message response = null; try { - response = callExecService(method, request, responsePrototype); + response = callExecService(controller, method, request, responsePrototype); } catch (IOException ioe) { LOG.warn("Call failed on IOException", ioe); ResponseConverter.setControllerException(controller, ioe); @@ -70,12 +70,13 @@ public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcCh Message request, Message responsePrototype) throws ServiceException { try { - return callExecService(method, request, responsePrototype); + return callExecService(controller, method, request, responsePrototype); } catch (IOException ioe) { throw new ServiceException("Error calling method "+method.getFullName(), ioe); } } - protected abstract Message callExecService(Descriptors.MethodDescriptor method, - Message request, Message responsePrototype) throws IOException; + protected abstract Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index 98a74ef..6e59972 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -24,7 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s @@ -45,18 +46,18 @@ import com.google.protobuf.Message; public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ private static final Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class); - private final HConnection connection; + private final ClusterConnection connection; - public MasterCoprocessorRpcChannel(HConnection conn) { + public MasterCoprocessorRpcChannel(ClusterConnection conn) { this.connection = conn; } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, + protected Message callExecService(RpcController controller, Descriptors.MethodDescriptor method, Message request, Message responsePrototype) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Call: "+method.getName()+", "+request.toString()); + if (LOG.isTraceEnabled()) { + LOG.trace("Call: "+method.getName()+", "+request.toString()); } final ClientProtos.CoprocessorServiceCall call = @@ -65,7 +66,10 @@ public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setServiceName(method.getService().getFullName()) .setMethodName(method.getName()) .setRequest(request.toByteString()).build(); - CoprocessorServiceResponse result = ProtobufUtil.execService(connection.getMaster(), call); + + // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller + CoprocessorServiceResponse result = ProtobufUtil.execService(controller, + connection.getMaster(), call); Message response = null; if (result.getValue().hasValue()) { Message.Builder builder = responsePrototype.newBuilderForType(); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 3fcfceb..321dd62 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -22,10 +22,9 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s @@ -49,28 +49,28 @@ import com.google.protobuf.Message; public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ private static final Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class); - private final HConnection connection; + private final ClusterConnection connection; private final TableName table; private final byte[] row; private byte[] lastRegion; private int operationTimeout; - private RpcRetryingCallerFactory rpcFactory; + private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; - public RegionCoprocessorRpcChannel(HConnection conn, TableName table, byte[] row) { + public RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) { this.connection = conn; this.table = table; this.row = row; - this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null); - this.operationTimeout = conn.getConfiguration().getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.rpcCallerFactory = conn.getRpcRetryingCallerFactory(); + this.rpcControllerFactory = conn.getRpcControllerFactory(); + this.operationTimeout = conn.getConnectionConfiguration().getOperationTimeout(); } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, - Message request, Message responsePrototype) - throws IOException { + protected Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Call: "+method.getName()+", "+request.toString()); } @@ -79,6 +79,9 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ throw new IllegalArgumentException("Missing row property for remote region location"); } + final RpcController rpcController = controller == null + ? rpcControllerFactory.newController() : controller; + final ClientProtos.CoprocessorServiceCall call = ClientProtos.CoprocessorServiceCall.newBuilder() .setRow(ByteStringer.wrap(row)) @@ -87,12 +90,19 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setRequest(request.toByteString()).build(); RegionServerCallable<CoprocessorServiceResponse> callable = new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { - public CoprocessorServiceResponse call(int callTimeout) throws Exception { - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - return ProtobufUtil.execService(getStub(), call, regionName); - } - }; - CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller() + @Override + public CoprocessorServiceResponse call(int callTimeout) throws Exception { + if (rpcController instanceof PayloadCarryingRpcController) { + ((PayloadCarryingRpcController) rpcController).setPriority(tableName); + } + if (rpcController instanceof TimeLimitedRpcController) { + ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout); + } + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + return ProtobufUtil.execService(rpcController, getStub(), call, regionName); + } + }; + CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller() .callWithRetries(callable, operationTimeout); Message response = null; if (result.getValue().hasValue()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java index 3f0a5d9..24d2de4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint @@ -47,8 +48,9 @@ public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel { } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, Message request, - Message responsePrototype) throws IOException { + protected Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Call: " + method.getName() + ", " + request.toString()); } @@ -57,8 +59,10 @@ public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel { .setRow(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY)) .setServiceName(method.getService().getFullName()).setMethodName(method.getName()) .setRequest(request.toByteString()).build(); + + // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller CoprocessorServiceResponse result = - ProtobufUtil.execRegionServerService(connection.getClient(serverName), call); + ProtobufUtil.execRegionServerService(controller, connection.getClient(serverName), call); Message response = null; if (result.getValue().hasValue()) { Message.Builder builder = responsePrototype.newBuilderForType(); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 2636777..f9fa21c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -173,6 +173,19 @@ import org.apache.hadoop.security.token.Token; import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier .RegionSpecifierType.REGION_NAME; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; /** * Protobufs utility. @@ -1575,21 +1588,22 @@ public final class ProtobufUtil { } } - public static CoprocessorServiceResponse execService(final ClientService.BlockingInterface client, - final CoprocessorServiceCall call, final byte[] regionName) throws IOException { + public static CoprocessorServiceResponse execService(final RpcController controller, + final ClientService.BlockingInterface client, final CoprocessorServiceCall call, + final byte[] regionName) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() .setCall(call).setRegion( RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build(); try { CoprocessorServiceResponse response = - client.execService(null, request); + client.execService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); } } - public static CoprocessorServiceResponse execService( + public static CoprocessorServiceResponse execService(final RpcController controller, final MasterService.BlockingInterface client, final CoprocessorServiceCall call) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() @@ -1597,7 +1611,7 @@ public final class ProtobufUtil { RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); try { CoprocessorServiceResponse response = - client.execMasterService(null, request); + client.execMasterService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); @@ -1612,7 +1626,8 @@ public final class ProtobufUtil { * @throws IOException */ public static CoprocessorServiceResponse execRegionServerService( - final ClientService.BlockingInterface client, final CoprocessorServiceCall call) + final RpcController controller, final ClientService.BlockingInterface client, + final CoprocessorServiceCall call) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest @@ -1622,7 +1637,7 @@ public final class ProtobufUtil { RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)) .build(); try { - CoprocessorServiceResponse response = client.execRegionServerService(null, request); + CoprocessorServiceResponse response = client.execRegionServerService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); @@ -1648,13 +1663,13 @@ public final class ProtobufUtil { * @return the retrieved region info * @throws IOException */ - public static HRegionInfo getRegionInfo(final AdminService.BlockingInterface admin, - final byte[] regionName) throws IOException { + public static HRegionInfo getRegionInfo(final RpcController controller, + final AdminService.BlockingInterface admin, final byte[] regionName) throws IOException { try { GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(regionName); GetRegionInfoResponse response = - admin.getRegionInfo(null, request); + admin.getRegionInfo(controller, request); return HRegionInfo.convert(response.getRegionInfo()); } catch (ServiceException se) { throw getRemoteException(se); @@ -1669,12 +1684,13 @@ public final class ProtobufUtil { * @param regionName * @throws IOException */ - public static void closeRegion(final AdminService.BlockingInterface admin, - final ServerName server, final byte[] regionName) throws IOException { + public static void closeRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final ServerName server, final byte[] regionName) + throws IOException { CloseRegionRequest closeRegionRequest = RequestConverter.buildCloseRegionRequest(server, regionName); try { - admin.closeRegion(null, closeRegionRequest); + admin.closeRegion(controller, closeRegionRequest); } catch (ServiceException se) { throw getRemoteException(se); } @@ -1689,14 +1705,15 @@ public final class ProtobufUtil { * @return true if the region is closed * @throws IOException */ - public static boolean closeRegion(final AdminService.BlockingInterface admin, + public static boolean closeRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final ServerName server, final byte[] regionName, final ServerName destinationServer) throws IOException { CloseRegionRequest closeRegionRequest = RequestConverter.buildCloseRegionRequest(server, regionName, destinationServer); try { - CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest); + CloseRegionResponse response = admin.closeRegion(controller, closeRegionRequest); return ResponseConverter.isClosed(response); } catch (ServiceException se) { throw getRemoteException(se); @@ -1711,14 +1728,14 @@ public final class ProtobufUtil { * @param regionInfo * */ - public static void warmupRegion(final AdminService.BlockingInterface admin, - final HRegionInfo regionInfo) throws IOException { + public static void warmupRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo regionInfo) throws IOException { try { WarmupRegionRequest warmupRegionRequest = RequestConverter.buildWarmupRegionRequest(regionInfo); - admin.warmupRegion(null, warmupRegionRequest); + admin.warmupRegion(controller, warmupRegionRequest); } catch (ServiceException e) { throw getRemoteException(e); } @@ -1730,18 +1747,18 @@ public final class ProtobufUtil { * @param region * @throws IOException */ - public static void openRegion(final AdminService.BlockingInterface admin, - ServerName server, final HRegionInfo region) throws IOException { + public static void openRegion(final RpcController controller, + final AdminService.BlockingInterface admin, ServerName server, final HRegionInfo region) + throws IOException { OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, region, null, null); try { - admin.openRegion(null, request); + admin.openRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } - /** * A helper to get the all the online regions on a region * server using admin protocol. @@ -1751,11 +1768,22 @@ public final class ProtobufUtil { * @throws IOException */ public static List<HRegionInfo> getOnlineRegions(final AdminService.BlockingInterface admin) + throws IOException { + return getOnlineRegions(null, admin); + } + + /** + * A helper to get the all the online regions on a region + * server using admin protocol. + * @return a list of online region info + */ + public static List<HRegionInfo> getOnlineRegions(final RpcController controller, + final AdminService.BlockingInterface admin) throws IOException { GetOnlineRegionRequest request = RequestConverter.buildGetOnlineRegionRequest(); GetOnlineRegionResponse response = null; try { - response = admin.getOnlineRegion(null, request); + response = admin.getOnlineRegion(controller, request); } catch (ServiceException se) { throw getRemoteException(se); } @@ -1779,16 +1807,14 @@ public final class ProtobufUtil { /** * A helper to get the info of a region server using admin protocol. - * - * @param admin * @return the server name - * @throws IOException */ - public static ServerInfo getServerInfo(final AdminService.BlockingInterface admin) + public static ServerInfo getServerInfo(final RpcController controller, + final AdminService.BlockingInterface admin) throws IOException { GetServerInfoRequest request = RequestConverter.buildGetServerInfoRequest(); try { - GetServerInfoResponse response = admin.getServerInfo(null, request); + GetServerInfoResponse response = admin.getServerInfo(controller, request); return response.getServerInfo(); } catch (ServiceException se) { throw getRemoteException(se); @@ -1799,19 +1825,27 @@ public final class ProtobufUtil { * A helper to get the list of files of a column family * on a given region using admin protocol. * - * @param admin - * @param regionName - * @param family * @return the list of store files - * @throws IOException */ public static List<String> getStoreFiles(final AdminService.BlockingInterface admin, final byte[] regionName, final byte[] family) throws IOException { + return getStoreFiles(null, admin, regionName, family); + } + + /** + * A helper to get the list of files of a column family + * on a given region using admin protocol. + * + * @return the list of store files + */ + public static List<String> getStoreFiles(final RpcController controller, + final AdminService.BlockingInterface admin, final byte[] regionName, final byte[] family) + throws IOException { GetStoreFileRequest request = RequestConverter.buildGetStoreFileRequest(regionName, family); try { - GetStoreFileResponse response = admin.getStoreFile(null, request); + GetStoreFileResponse response = admin.getStoreFile(controller, request); return response.getStoreFileList(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -1826,12 +1860,13 @@ public final class ProtobufUtil { * @param splitPoint * @throws IOException */ - public static void split(final AdminService.BlockingInterface admin, - final HRegionInfo hri, byte[] splitPoint) throws IOException { + public static void split(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo hri, byte[] splitPoint) + throws IOException { SplitRegionRequest request = RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint); try { - admin.splitRegion(null, request); + admin.splitRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1848,7 +1883,8 @@ public final class ProtobufUtil { * @param user effective user * @throws IOException */ - public static void mergeRegions(final AdminService.BlockingInterface admin, + public static void mergeRegions(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo region_a, final HRegionInfo region_b, final boolean forcible, final User user) throws IOException { final MergeRegionsRequest request = RequestConverter.buildMergeRegionsRequest( @@ -1858,7 +1894,7 @@ public final class ProtobufUtil { user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { - admin.mergeRegions(null, request); + admin.mergeRegions(controller, request); return null; } }); @@ -1869,7 +1905,7 @@ public final class ProtobufUtil { } } else { try { - admin.mergeRegions(null, request); + admin.mergeRegions(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2144,8 +2180,9 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, Permission.Action... actions) throws ServiceException { + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, + Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2154,7 +2191,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2171,9 +2208,9 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, TableName tableName, byte[] f, byte[] q, - Permission.Action... actions) throws ServiceException { + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, TableName tableName, + byte[] f, byte[] q, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2182,7 +2219,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, tableName, f, q, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2195,8 +2232,8 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, String namespace, + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, String namespace, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); @@ -2206,7 +2243,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, namespace, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2219,8 +2256,9 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, Permission.Action... actions) throws ServiceException { + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, + Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2229,7 +2267,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2246,9 +2284,9 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, TableName tableName, byte[] f, byte[] q, - Permission.Action... actions) throws ServiceException { + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, TableName tableName, + byte[] f, byte[] q, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2257,7 +2295,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, tableName, f, q, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2271,8 +2309,8 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, String namespace, + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, String namespace, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); @@ -2282,7 +2320,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, namespace, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2293,14 +2331,14 @@ public final class ProtobufUtil { * @param protocol the AccessControlService protocol proxy * @throws ServiceException */ - public static List<UserPermission> getUserPermissions( + public static List<UserPermission> getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = AccessControlProtos.GetUserPermissionsRequest.newBuilder(); builder.setType(AccessControlProtos.Permission.Type.Global); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List<UserPermission> perms = new ArrayList<UserPermission>(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); @@ -2317,7 +2355,7 @@ public final class ProtobufUtil { * @param t optional table name * @throws ServiceException */ - public static List<UserPermission> getUserPermissions( + public static List<UserPermission> getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol, TableName t) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = @@ -2328,7 +2366,7 @@ public final class ProtobufUtil { builder.setType(AccessControlProtos.Permission.Type.Table); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List<UserPermission> perms = new ArrayList<UserPermission>(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); @@ -2345,7 +2383,7 @@ public final class ProtobufUtil { * @param namespace name of the namespace * @throws ServiceException */ - public static List<UserPermission> getUserPermissions( + public static List<UserPermission> getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol, byte[] namespace) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = @@ -2356,7 +2394,7 @@ public final class ProtobufUtil { builder.setType(AccessControlProtos.Permission.Type.Namespace); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List<UserPermission> perms = new ArrayList<UserPermission>(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java index c50abc1..25ac01f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -31,10 +31,12 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface; @@ -92,9 +94,12 @@ public class AccessControlClient { public static void grant(Connection connection, final TableName tableName, final String userName, final byte[] family, final byte[] qual, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, tableName, family, qual, - actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, tableName, + family, qual, actions); } } @@ -108,8 +113,12 @@ public class AccessControlClient { */ public static void grant(Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, namespace, actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, namespace, + actions); } } @@ -119,8 +128,10 @@ public class AccessControlClient { */ public static void grant(Connection connection, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, actions); } } @@ -144,9 +155,12 @@ public class AccessControlClient { public static void revoke(Connection connection, final TableName tableName, final String username, final byte[] family, final byte[] qualifier, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), username, tableName, family, - qualifier, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), username, tableName, + family, qualifier, actions); } } @@ -160,8 +174,11 @@ public class AccessControlClient { */ public static void revoke(Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), userName, namespace, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, namespace, + actions); } } @@ -171,10 +188,11 @@ public class AccessControlClient { */ public static void revoke(Connection connection, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), userName, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, actions); } - } /** @@ -188,6 +206,8 @@ public class AccessControlClient { */ public static List<UserPermission> getUserPermissions(Connection connection, String tableRegex) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); List<UserPermission> permList = new ArrayList<UserPermission>(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Admin admin = connection.getAdmin()) { @@ -196,14 +216,16 @@ public class AccessControlClient { AccessControlProtos.AccessControlService.newBlockingStub(service); HTableDescriptor[] htds = null; if (tableRegex == null || tableRegex.isEmpty()) { - permList = ProtobufUtil.getUserPermissions(protocol); + permList = ProtobufUtil.getUserPermissions(controller, protocol); } else if (tableRegex.charAt(0) == '@') { String namespace = tableRegex.substring(1); - permList = ProtobufUtil.getUserPermissions(protocol, Bytes.toBytes(namespace)); + permList = ProtobufUtil.getUserPermissions(controller, protocol, + Bytes.toBytes(namespace)); } else { htds = admin.listTables(Pattern.compile(tableRegex), true); for (HTableDescriptor hd : htds) { - permList.addAll(ProtobufUtil.getUserPermissions(protocol, hd.getTableName())); + permList.addAll(ProtobufUtil.getUserPermissions(controller, protocol, + hd.getTableName())); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 0b844a2..0b53f95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -36,11 +36,14 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -287,7 +290,7 @@ public class MetaTableLocator { } catch (RegionServerStoppedException e) { // Pass -- server name sends us to a server that is dying or already dead. } - return (service != null) && verifyRegionLocation(service, + return (service != null) && verifyRegionLocation(hConnection, service, getMetaRegionLocation(zkw, replicaId), RegionReplicaUtil.getRegionInfoForReplica( HRegionInfo.FIRST_META_REGIONINFO, replicaId).getRegionName()); } @@ -307,17 +310,22 @@ public class MetaTableLocator { // rather than have to pass it in. Its made awkward by the fact that the // HRI is likely a proxy against remote server so the getServerName needs // to be fixed to go to a local method or to a cache before we can do this. - private boolean verifyRegionLocation(AdminService.BlockingInterface hostingServer, - final ServerName address, final byte [] regionName) + private boolean verifyRegionLocation(final Connection connection, + AdminService.BlockingInterface hostingServer, final ServerName address, + final byte [] regionName) throws IOException { if (hostingServer == null) { LOG.info("Passed hostingServer is null"); return false; } Throwable t; + PayloadCarryingRpcController controller = null; + if (connection instanceof ClusterConnection) { + controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + } try { // Try and get regioninfo from the hosting server. - return ProtobufUtil.getRegionInfo(hostingServer, regionName) != null; + return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null; } catch (ConnectException e) { t = e; } catch (RetriesExhaustedException e) { @@ -594,7 +602,7 @@ public class MetaTableLocator { ServerName sn = null; while (true) { sn = getMetaRegionLocation(zkw, replicaId); - if (sn != null || (System.currentTimeMillis() - startTime) + if (sn != null || (System.currentTimeMillis() - startTime) > timeout - HConstants.SOCKET_RETRY_WAIT_MS) { break; } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java index 1b039bd..4d55c33 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java @@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; @@ -78,26 +80,34 @@ public class TestSnapshotFromAdmin { // setup the conf to match the expected properties conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); conf.setLong("hbase.client.pause", pauseTime); + // mock the master admin to our mock MasterKeepAliveConnection mockMaster = Mockito.mock(MasterKeepAliveConnection.class); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(mockMaster); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); + Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); // set the max wait time for the snapshot to complete SnapshotResponse response = SnapshotResponse.newBuilder() .setExpectedTimeout(maxWaitTime) .build(); Mockito - .when( - mockMaster.snapshot((RpcController) Mockito.isNull(), - Mockito.any(SnapshotRequest.class))).thenReturn(response); + .when( + mockMaster.snapshot((RpcController) Mockito.any(), + Mockito.any(SnapshotRequest.class))).thenReturn(response); // setup the response IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder(); builder.setDone(false); // first five times, we return false, last we get success Mockito.when( - mockMaster.isSnapshotDone((RpcController) Mockito.isNull(), + mockMaster.isSnapshotDone((RpcController) Mockito.any(), Mockito.any(IsSnapshotDoneRequest.class))).thenReturn(builder.build(), builder.build(), - builder.build(), builder.build(), builder.build(), builder.setDone(true).build()); + builder.build(), builder.build(), builder.build(), builder.setDone(true).build()); // setup the admin and run the test Admin admin = new HBaseAdmin(mockConnection); @@ -123,6 +133,13 @@ public class TestSnapshotFromAdmin { .mock(ConnectionImplementation.class); Configuration conf = HBaseConfiguration.create(); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); + Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); Admin admin = new HBaseAdmin(mockConnection); SnapshotDescription.Builder builder = SnapshotDescription.newBuilder(); // check that invalid snapshot names fail @@ -142,11 +159,11 @@ public class TestSnapshotFromAdmin { Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(master); SnapshotResponse response = SnapshotResponse.newBuilder().setExpectedTimeout(0).build(); Mockito.when( - master.snapshot((RpcController) Mockito.isNull(), Mockito.any(SnapshotRequest.class))) + master.snapshot((RpcController) Mockito.any(), Mockito.any(SnapshotRequest.class))) .thenReturn(response); IsSnapshotDoneResponse doneResponse = IsSnapshotDoneResponse.newBuilder().setDone(true).build(); Mockito.when( - master.isSnapshotDone((RpcController) Mockito.isNull(), + master.isSnapshotDone((RpcController) Mockito.any(), Mockito.any(IsSnapshotDoneRequest.class))).thenReturn(doneResponse); // make sure that we can use valid names http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index 6f3baa0..31ca996 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -279,7 +279,7 @@ public class DistributedHBaseCluster extends HBaseCluster { AdminProtos.AdminService.BlockingInterface client = ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName()); - ServerInfo info = ProtobufUtil.getServerInfo(client); + ServerInfo info = ProtobufUtil.getServerInfo(null, client); return ProtobufUtil.toServerName(info.getServerName()); } @@ -433,7 +433,7 @@ public class DistributedHBaseCluster extends HBaseCluster { Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator()); toStart.addAll(initial.getServers()); toKill.addAll(current.getServers()); - + ServerName master = initial.getMaster(); for (ServerName server : current.getServers()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index 158a239..b98f50d 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -42,7 +42,7 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; <%java return; %> </%if> <%java> - ServerInfo serverInfo = ProtobufUtil.getServerInfo(regionServer.getRSRpcServices()); + ServerInfo serverInfo = ProtobufUtil.getServerInfo(null, regionServer.getRSRpcServices()); ServerName serverName = ProtobufUtil.toServerName(serverInfo.getServerName()); List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(regionServer.getRSRpcServices()); MasterAddressTracker masterAddressTracker = regionServer.getMasterAddressTracker(); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 64a75b9..a9cf0f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -22,7 +22,9 @@ import java.nio.channels.ClosedChannelException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.Pair; @@ -38,7 +40,8 @@ import com.google.protobuf.Message; * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained * RpcServer.Call */ [email protected] [email protected]({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) [email protected] public class CallRunner { private static final Log LOG = LogFactory.getLog(CallRunner.class); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 2414e3d..91c152b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -37,7 +37,7 @@ public abstract class RpcScheduler { "hbase.ipc.server.priority.max.callqueue.length"; /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ - static abstract class Context { + public static abstract class Context { public abstract InetSocketAddress getListenerAddress(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 2c6084a..f0aed2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -289,7 +289,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. */ - class Call implements RpcCallContext { + @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) + @InterfaceStability.Evolving + public class Call implements RpcCallContext { protected int id; // the client's call id protected BlockingService service; protected MethodDescriptor md; @@ -369,6 +371,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.header; } + public boolean hasPriority() { + return this.header.hasPriority(); + } + + public int getPriority() { + return this.header.getPriority(); + } + /* * Short string representation without param info because param itself could be huge depends on * the payload of a command http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java index 196320d..d31711e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java @@ -695,7 +695,7 @@ public class RegionPlacementMaintainer { UpdateFavoredNodesResponse updateFavoredNodesResponse = currentRegionServer.updateFavoredNodes(null, request); LOG.info("Region server " + - ProtobufUtil.getServerInfo(currentRegionServer).getServerName() + + ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() + " has updated " + updateFavoredNodesResponse.getResponse() + " / " + singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan"); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 341d51c..dabef71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -158,6 +160,7 @@ public class ServerManager { private final long warningSkew; private final RetryCounterFactory pingRetryCounterFactory; + private final RpcControllerFactory rpcControllerFactory; /** * Set of region servers which are dead but not processed immediately. If one @@ -222,6 +225,9 @@ public class ServerManager { int pingSleepInterval = Math.max(1, master.getConfiguration().getInt( "hbase.master.ping.server.retry.sleep.interval", 100)); this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval); + this.rpcControllerFactory = this.connection == null + ? null + : connection.getRpcControllerFactory(); } /** @@ -784,6 +790,10 @@ public class ServerManager { } } + private PayloadCarryingRpcController newRpcController() { + return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); + } + /** * Sends an CLOSE RPC to the specified server to close the specified region. * <p> @@ -804,8 +814,8 @@ public class ServerManager { region.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - return ProtobufUtil.closeRegion(admin, server, region.getRegionName(), - dest); + PayloadCarryingRpcController controller = newRpcController(); + return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(), dest); } public boolean sendRegionClose(ServerName server, @@ -826,7 +836,8 @@ public class ServerManager { if (server == null) return; try { AdminService.BlockingInterface admin = getRsAdmin(server); - ProtobufUtil.warmupRegion(admin, region); + PayloadCarryingRpcController controller = newRpcController(); + ProtobufUtil.warmupRegion(controller, admin, region); } catch (IOException e) { LOG.error("Received exception in RPC for warmup server:" + server + "region: " + region + @@ -838,11 +849,12 @@ public class ServerManager { * Contacts a region server and waits up to timeout ms * to close the region. This bypasses the active hmaster. */ - public static void closeRegionSilentlyAndWait(ClusterConnection connection, + public static void closeRegionSilentlyAndWait(ClusterConnection connection, ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException { AdminService.BlockingInterface rs = connection.getAdmin(server); + PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); try { - ProtobufUtil.closeRegion(rs, server, region.getRegionName()); + ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName()); } catch (IOException e) { LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e); } @@ -850,12 +862,13 @@ public class ServerManager { while (System.currentTimeMillis() < expiration) { try { HRegionInfo rsRegion = - ProtobufUtil.getRegionInfo(rs, region.getRegionName()); + ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName()); if (rsRegion == null) return; } catch (IOException ioe) { if (ioe instanceof NotServingRegionException) // no need to retry again return; - LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(), ioe); + LOG.warn("Exception when retrieving regioninfo from: " + + region.getRegionNameAsString(), ioe); } Thread.sleep(1000); } @@ -890,7 +903,8 @@ public class ServerManager { + region_b.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible, user); + PayloadCarryingRpcController controller = newRpcController(); + ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user); } /** @@ -899,12 +913,14 @@ public class ServerManager { public boolean isServerReachable(ServerName server) { if (server == null) throw new NullPointerException("Passed server is null"); + RetryCounter retryCounter = pingRetryCounterFactory.create(); while (retryCounter.shouldRetry()) { try { + PayloadCarryingRpcController controller = newRpcController(); AdminService.BlockingInterface admin = getRsAdmin(server); if (admin != null) { - ServerInfo info = ProtobufUtil.getServerInfo(admin); + ServerInfo info = ProtobufUtil.getServerInfo(controller, admin); return info != null && info.hasServerName() && server.getStartcode() == info.getServerName().getStartCode(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 3bd3c3f..002bdb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -693,8 +693,8 @@ public class MiniHBaseCluster extends HBaseCluster { int count = 0; for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { HRegionServer hrs = rst.getRegionServer(); - Region metaRegion = hrs.getOnlineRegion(regionName); - if (metaRegion != null) { + Region region = hrs.getOnlineRegion(regionName); + if (region != null) { index = count; break; } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java index e460535..8ac89da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java @@ -55,11 +55,11 @@ public class TestGlobalMemStoreSize { private HBaseTestingUtility TEST_UTIL; private MiniHBaseCluster cluster; - + /** * Test the global mem store size in the region server is equal to sum of each * region's mem store size - * @throws Exception + * @throws Exception */ @Test public void testGlobalMemStore() throws Exception { @@ -87,8 +87,8 @@ public class TestGlobalMemStoreSize { for (HRegionServer server : getOnlineRegionServers()) { long globalMemStoreSize = 0; for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { - globalMemStoreSize += + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { + globalMemStoreSize += server.getFromOnlineRegions(regionInfo.getEncodedName()). getMemstoreSize(); } @@ -103,7 +103,7 @@ public class TestGlobalMemStoreSize { ", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize()); for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { Region r = server.getFromOnlineRegions(regionInfo.getEncodedName()); flush(r, server); } @@ -119,7 +119,7 @@ public class TestGlobalMemStoreSize { // If size > 0, see if its because the meta region got edits while // our test was running.... for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { Region r = server.getFromOnlineRegions(regionInfo.getEncodedName()); long l = r.getMemstoreSize(); if (l > 0) { @@ -154,7 +154,7 @@ public class TestGlobalMemStoreSize { private List<HRegionServer> getOnlineRegionServers() { List<HRegionServer> list = new ArrayList<HRegionServer>(); - for (JVMClusterUtil.RegionServerThread rst : + for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) { if (rst.getRegionServer().isOnline()) { list.add(rst.getRegionServer()); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 7e934c0..8b84452 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -44,6 +44,13 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.DelegatingRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; @@ -618,5 +625,77 @@ public class TestMetaTableAccessor { meta.close(); } } + + public static class SpyingRpcSchedulerFactory extends SimpleRpcSchedulerFactory { + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + final RpcScheduler delegate = super.create(conf, priority, server); + return new SpyingRpcScheduler(delegate); + } + } + + public static class SpyingRpcScheduler extends DelegatingRpcScheduler { + long numPriorityCalls = 0; + + public SpyingRpcScheduler(RpcScheduler delegate) { + super(delegate); + } + + @Override + public boolean dispatch(CallRunner task) throws IOException, InterruptedException { + int priority = task.getCall().getPriority(); + + if (priority > HConstants.QOS_THRESHOLD) { + numPriorityCalls++; + } + return super.dispatch(task); + } + } + + @Test + public void testMetaUpdatesGoToPriorityQueue() throws Exception { + // This test has to be end-to-end, and do the verification from the server side + Configuration c = UTIL.getConfiguration(); + + c.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SpyingRpcSchedulerFactory.class.getName()); + + // restart so that new config takes place + afterClass(); + beforeClass(); + + TableName tableName = TableName.valueOf("foo"); + try (Admin admin = connection.getAdmin(); + RegionLocator rl = connection.getRegionLocator(tableName)) { + + // create a table and prepare for a manual split + UTIL.createTable(tableName, "cf1"); + + HRegionLocation loc = rl.getAllRegionLocations().get(0); + HRegionInfo parent = loc.getRegionInfo(); + long rid = 1000; + byte[] splitKey = Bytes.toBytes("a"); + HRegionInfo splitA = new HRegionInfo(parent.getTable(), parent.getStartKey(), + splitKey, false, rid); + HRegionInfo splitB = new HRegionInfo(parent.getTable(), splitKey, + parent.getEndKey(), false, rid); + + // find the meta server + MiniHBaseCluster cluster = UTIL.getMiniHBaseCluster(); + int rsIndex = cluster.getServerWithMeta(); + HRegionServer rs; + if (rsIndex >= 0) { + rs = cluster.getRegionServer(rsIndex); + } else { + // it is in master + rs = cluster.getMaster(); + } + SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); + long prevCalls = scheduler.numPriorityCalls; + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); + + assertTrue(prevCalls < scheduler.numPriorityCalls); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java index 9943749..ba6e1d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; @@ -180,7 +182,7 @@ public class TestMetaTableLocator { // Mock an ClientProtocol. final ClientProtos.ClientService.BlockingInterface implementation = Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); - + ClusterConnection connection = mockConnection(null, implementation); // If a 'get' is called on mocked interface, throw connection refused. @@ -250,6 +252,10 @@ public class TestMetaTableLocator { (GetRegionInfoRequest)Mockito.any())).thenThrow(connectException); Mockito.when(connection.getAdmin(Mockito.any(ServerName.class))). thenReturn(implementation); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory); ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis()); MetaTableLocator.setMetaLocation(this.watcher, http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 24c0c32..dc1ecf1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -54,6 +54,11 @@ public class HConnectionTestingUtility { throws ZooKeeperConnectionException { ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class); Mockito.when(connection.getConfiguration()).thenReturn(conf); + Mockito.when(connection.getRpcControllerFactory()).thenReturn( + Mockito.mock(RpcControllerFactory.class)); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); return connection; } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java index df8f4f6..10dbed0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java @@ -1254,8 +1254,8 @@ public class TestAdmin1 { try { AdminService.BlockingInterface admin = TEST_UTIL.getHBaseAdmin().getConnection() .getAdmin(regions.get(1).getSecond()); - ProtobufUtil.mergeRegions(admin, regions.get(1).getFirst(), regions.get(2).getFirst(), true, - null); + ProtobufUtil.mergeRegions(null, admin, regions.get(1).getFirst(), regions.get(2).getFirst(), + true, null); } catch (MergeRegionException mm) { gotException = true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 3b88184..520f210 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -604,7 +604,7 @@ public class TestFromClientSide { public void testMaxKeyValueSize() throws Exception { TableName TABLE = TableName.valueOf("testMaxKeyValueSize"); Configuration conf = TEST_UTIL.getConfiguration(); - String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY); + String oldMaxSize = conf.get(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY); Table ht = TEST_UTIL.createTable(TABLE, FAMILY); byte[] value = new byte[4 * 1024 * 1024]; Put put = new Put(ROW); @@ -612,7 +612,7 @@ public class TestFromClientSide { ht.put(put); try { TEST_UTIL.getConfiguration().setInt( - TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); + ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); // Create new table so we pick up the change in Configuration. try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { @@ -624,7 +624,7 @@ public class TestFromClientSide { } fail("Inserting a too large KeyValue worked, should throw exception"); } catch(Exception e) {} - conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); + conf.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 5995191..ddd5fa3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -270,20 +270,20 @@ public class TestFromClientSide3 { // create an empty Put Put put1 = new Put(ROW); actions.add(put1); - + Put put2 = new Put(ANOTHERROW); put2.addColumn(FAMILY, QUALIFIER, VALUE); actions.add(put2); - + table.batch(actions, results); fail("Empty Put should have failed the batch call"); } catch (IllegalArgumentException iae) { - + } finally { table.close(); } } - + @Test public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { // Test with a single region table. @@ -401,7 +401,7 @@ public class TestFromClientSide3 { @Test public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { Table table = TEST_UTIL.createTable( - TableName.valueOf("testHTableExistsMethodMultipleRegionsMultipleGets"), + TableName.valueOf("testHTableExistsMethodMultipleRegionsMultipleGets"), new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255); Put put = new Put(ROW); put.addColumn(FAMILY, QUALIFIER, VALUE);
