http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index 2d2edaf..81c34d4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -26,12 +26,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s @@ -45,18 +47,18 @@ import com.google.protobuf.Message; public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ private static final Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class); - private final HConnection connection; + private final ClusterConnection connection; - public MasterCoprocessorRpcChannel(HConnection conn) { + public MasterCoprocessorRpcChannel(ClusterConnection conn) { this.connection = conn; } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, + protected Message callExecService(RpcController controller, Descriptors.MethodDescriptor method, Message request, Message responsePrototype) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Call: "+method.getName()+", "+request.toString()); + if (LOG.isTraceEnabled()) { + LOG.trace("Call: "+method.getName()+", "+request.toString()); } final ClientProtos.CoprocessorServiceCall call = @@ -65,7 +67,10 @@ public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setServiceName(method.getService().getFullName()) .setMethodName(method.getName()) .setRequest(request.toByteString()).build(); - CoprocessorServiceResponse result = ProtobufUtil.execService(connection.getMaster(), call); + + // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller + CoprocessorServiceResponse result = ProtobufUtil.execService(controller, + connection.getMaster(), call); Message response = null; if (result.getValue().hasValue()) { Message.Builder builder = responsePrototype.newBuilderForType();
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 009156e..ea70265 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s @@ -49,28 +51,28 @@ import com.google.protobuf.Message; public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ private static final Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class); - private final HConnection connection; + private final ClusterConnection connection; private final TableName table; private final byte[] row; private byte[] lastRegion; private int operationTimeout; - private RpcRetryingCallerFactory rpcFactory; + private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; - public RegionCoprocessorRpcChannel(HConnection conn, TableName table, byte[] row) { + public RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) { this.connection = conn; this.table = table; this.row = row; - this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null); - this.operationTimeout = conn.getConfiguration().getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.rpcCallerFactory = conn.getRpcRetryingCallerFactory(); + this.rpcControllerFactory = conn.getRpcControllerFactory(); + this.operationTimeout = conn.getConnectionConfiguration().getOperationTimeout(); } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, - Message request, Message responsePrototype) - throws IOException { + protected Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Call: "+method.getName()+", "+request.toString()); } @@ -79,6 +81,9 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ throw new IllegalArgumentException("Missing row property for remote region location"); } + final RpcController rpcController = controller == null + ? rpcControllerFactory.newController() : controller; + final ClientProtos.CoprocessorServiceCall call = ClientProtos.CoprocessorServiceCall.newBuilder() .setRow(ByteStringer.wrap(row)) @@ -87,12 +92,19 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setRequest(request.toByteString()).build(); RegionServerCallable<CoprocessorServiceResponse> callable = new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { - public CoprocessorServiceResponse call(int callTimeout) throws Exception { - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - return ProtobufUtil.execService(getStub(), call, regionName); - } - }; - CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller() + @Override + public CoprocessorServiceResponse call(int callTimeout) throws Exception { + if (rpcController instanceof PayloadCarryingRpcController) { + ((PayloadCarryingRpcController) rpcController).setPriority(tableName); + } + if (rpcController instanceof TimeLimitedRpcController) { + ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout); + } + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + return ProtobufUtil.execService(rpcController, getStub(), call, regionName); + } + }; + CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller() .callWithRetries(callable, operationTimeout); Message response = null; if (result.getValue().hasValue()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java index 01cdd28..057da88 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint @@ -48,8 +49,9 @@ public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel { } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, Message request, - Message responsePrototype) throws IOException { + protected Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Call: " + method.getName() + ", " + request.toString()); } @@ -58,8 +60,10 @@ public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel { .setRow(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY)) .setServiceName(method.getService().getFullName()).setMethodName(method.getName()) .setRequest(request.toByteString()).build(); + + // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller CoprocessorServiceResponse result = - ProtobufUtil.execRegionServerService(connection.getClient(serverName), call); + ProtobufUtil.execRegionServerService(controller, connection.getClient(serverName), call); Message response = null; if (result.getValue().hasValue()) { Message.Builder builder = responsePrototype.newBuilderForType(); http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 1000666..50b3c0c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -163,6 +163,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.Parser; import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; @@ -1612,21 +1613,22 @@ public final class ProtobufUtil { } } - public static CoprocessorServiceResponse execService(final ClientService.BlockingInterface client, - final CoprocessorServiceCall call, final byte[] regionName) throws IOException { + public static CoprocessorServiceResponse execService(final RpcController controller, + final ClientService.BlockingInterface client, final CoprocessorServiceCall call, + final byte[] regionName) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() .setCall(call).setRegion( RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build(); try { CoprocessorServiceResponse response = - client.execService(null, request); + client.execService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); } } - public static CoprocessorServiceResponse execService( + public static CoprocessorServiceResponse execService(final RpcController controller, final MasterService.BlockingInterface client, final CoprocessorServiceCall call) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() @@ -1634,7 +1636,7 @@ public final class ProtobufUtil { RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); try { CoprocessorServiceResponse response = - client.execMasterService(null, request); + client.execMasterService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); @@ -1649,7 +1651,8 @@ public final class ProtobufUtil { * @throws IOException */ public static CoprocessorServiceResponse execRegionServerService( - final ClientService.BlockingInterface client, final CoprocessorServiceCall call) + final RpcController controller, final ClientService.BlockingInterface client, + final CoprocessorServiceCall call) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest @@ -1659,7 +1662,7 @@ public final class ProtobufUtil { RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)) .build(); try { - CoprocessorServiceResponse response = client.execRegionServerService(null, request); + CoprocessorServiceResponse response = client.execRegionServerService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); @@ -1685,13 +1688,13 @@ public final class ProtobufUtil { * @return the retrieved region info * @throws IOException */ - public static HRegionInfo getRegionInfo(final AdminService.BlockingInterface admin, - final byte[] regionName) throws IOException { + public static HRegionInfo getRegionInfo(final RpcController controller, + final AdminService.BlockingInterface admin, final byte[] regionName) throws IOException { try { GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(regionName); GetRegionInfoResponse response = - admin.getRegionInfo(null, request); + admin.getRegionInfo(controller, request); return HRegionInfo.convert(response.getRegionInfo()); } catch (ServiceException se) { throw getRemoteException(se); @@ -1707,12 +1710,13 @@ public final class ProtobufUtil { * @param transitionInZK * @throws IOException */ - public static void closeRegion(final AdminService.BlockingInterface admin, - final ServerName server, final byte[] regionName, final boolean transitionInZK) throws IOException { + public static void closeRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final ServerName server, final byte[] regionName, + final boolean transitionInZK) throws IOException { CloseRegionRequest closeRegionRequest = RequestConverter.buildCloseRegionRequest(server, regionName, transitionInZK); try { - admin.closeRegion(null, closeRegionRequest); + admin.closeRegion(controller, closeRegionRequest); } catch (ServiceException se) { throw getRemoteException(se); } @@ -1728,7 +1732,8 @@ public final class ProtobufUtil { * @return true if the region is closed * @throws IOException */ - public static boolean closeRegion(final AdminService.BlockingInterface admin, + public static boolean closeRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final ServerName server, final byte[] regionName, final int versionOfClosingNode, final ServerName destinationServer, @@ -1737,7 +1742,7 @@ public final class ProtobufUtil { RequestConverter.buildCloseRegionRequest(server, regionName, versionOfClosingNode, destinationServer, transitionInZK); try { - CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest); + CloseRegionResponse response = admin.closeRegion(controller, closeRegionRequest); return ResponseConverter.isClosed(response); } catch (ServiceException se) { throw getRemoteException(se); @@ -1752,14 +1757,14 @@ public final class ProtobufUtil { * @param regionInfo * */ - public static void warmupRegion(final AdminService.BlockingInterface admin, - final HRegionInfo regionInfo) throws IOException { + public static void warmupRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo regionInfo) throws IOException { try { WarmupRegionRequest warmupRegionRequest = RequestConverter.buildWarmupRegionRequest(regionInfo); - admin.warmupRegion(null, warmupRegionRequest); + admin.warmupRegion(controller, warmupRegionRequest); } catch (ServiceException e) { throw getRemoteException(e); } @@ -1771,18 +1776,18 @@ public final class ProtobufUtil { * @param region * @throws IOException */ - public static void openRegion(final AdminService.BlockingInterface admin, - ServerName server, final HRegionInfo region) throws IOException { + public static void openRegion(final RpcController controller, + final AdminService.BlockingInterface admin, ServerName server, final HRegionInfo region) + throws IOException { OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, region, -1, null, null); try { - admin.openRegion(null, request); + admin.openRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } - /** * A helper to get the all the online regions on a region * server using admin protocol. @@ -1792,11 +1797,22 @@ public final class ProtobufUtil { * @throws IOException */ public static List<HRegionInfo> getOnlineRegions(final AdminService.BlockingInterface admin) + throws IOException { + return getOnlineRegions(null, admin); + } + + /** + * A helper to get the all the online regions on a region + * server using admin protocol. + * @return a list of online region info + */ + public static List<HRegionInfo> getOnlineRegions(final RpcController controller, + final AdminService.BlockingInterface admin) throws IOException { GetOnlineRegionRequest request = RequestConverter.buildGetOnlineRegionRequest(); GetOnlineRegionResponse response = null; try { - response = admin.getOnlineRegion(null, request); + response = admin.getOnlineRegion(controller, request); } catch (ServiceException se) { throw getRemoteException(se); } @@ -1820,16 +1836,14 @@ public final class ProtobufUtil { /** * A helper to get the info of a region server using admin protocol. - * - * @param admin * @return the server name - * @throws IOException */ - public static ServerInfo getServerInfo(final AdminService.BlockingInterface admin) + public static ServerInfo getServerInfo(final RpcController controller, + final AdminService.BlockingInterface admin) throws IOException { GetServerInfoRequest request = RequestConverter.buildGetServerInfoRequest(); try { - GetServerInfoResponse response = admin.getServerInfo(null, request); + GetServerInfoResponse response = admin.getServerInfo(controller, request); return response.getServerInfo(); } catch (ServiceException se) { throw getRemoteException(se); @@ -1840,19 +1854,27 @@ public final class ProtobufUtil { * A helper to get the list of files of a column family * on a given region using admin protocol. * - * @param admin - * @param regionName - * @param family * @return the list of store files - * @throws IOException */ public static List<String> getStoreFiles(final AdminService.BlockingInterface admin, final byte[] regionName, final byte[] family) throws IOException { + return getStoreFiles(null, admin, regionName, family); + } + + /** + * A helper to get the list of files of a column family + * on a given region using admin protocol. + * + * @return the list of store files + */ + public static List<String> getStoreFiles(final RpcController controller, + final AdminService.BlockingInterface admin, final byte[] regionName, final byte[] family) + throws IOException { GetStoreFileRequest request = RequestConverter.buildGetStoreFileRequest(regionName, family); try { - GetStoreFileResponse response = admin.getStoreFile(null, request); + GetStoreFileResponse response = admin.getStoreFile(controller, request); return response.getStoreFileList(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -1867,12 +1889,13 @@ public final class ProtobufUtil { * @param splitPoint * @throws IOException */ - public static void split(final AdminService.BlockingInterface admin, - final HRegionInfo hri, byte[] splitPoint) throws IOException { + public static void split(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo hri, byte[] splitPoint) + throws IOException { SplitRegionRequest request = RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint); try { - admin.splitRegion(null, request); + admin.splitRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1888,13 +1911,14 @@ public final class ProtobufUtil { * two adjacent regions * @throws IOException */ - public static void mergeRegions(final AdminService.BlockingInterface admin, + public static void mergeRegions(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo region_a, final HRegionInfo region_b, final boolean forcible) throws IOException { MergeRegionsRequest request = RequestConverter.buildMergeRegionsRequest( region_a.getRegionName(), region_b.getRegionName(),forcible); try { - admin.mergeRegions(null, request); + admin.mergeRegions(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2167,8 +2191,9 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, Permission.Action... actions) throws ServiceException { + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, + Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2177,7 +2202,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2194,9 +2219,9 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, TableName tableName, byte[] f, byte[] q, - Permission.Action... actions) throws ServiceException { + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, TableName tableName, + byte[] f, byte[] q, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2205,7 +2230,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, tableName, f, q, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2218,8 +2243,8 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, String namespace, + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, String namespace, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); @@ -2229,7 +2254,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, namespace, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2242,8 +2267,9 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, Permission.Action... actions) throws ServiceException { + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, + Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2252,7 +2278,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2269,9 +2295,9 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, TableName tableName, byte[] f, byte[] q, - Permission.Action... actions) throws ServiceException { + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, TableName tableName, + byte[] f, byte[] q, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2280,7 +2306,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, tableName, f, q, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2294,8 +2320,8 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, String namespace, + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, String namespace, Permission.Action... actions) throws ServiceException { List<AccessControlProtos.Permission.Action> permActions = Lists.newArrayListWithCapacity(actions.length); @@ -2305,7 +2331,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, namespace, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2316,14 +2342,14 @@ public final class ProtobufUtil { * @param protocol the AccessControlService protocol proxy * @throws ServiceException */ - public static List<UserPermission> getUserPermissions( + public static List<UserPermission> getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = AccessControlProtos.GetUserPermissionsRequest.newBuilder(); builder.setType(AccessControlProtos.Permission.Type.Global); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List<UserPermission> perms = new ArrayList<UserPermission>(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); @@ -2340,7 +2366,7 @@ public final class ProtobufUtil { * @param t optional table name * @throws ServiceException */ - public static List<UserPermission> getUserPermissions( + public static List<UserPermission> getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol, TableName t) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = @@ -2351,7 +2377,7 @@ public final class ProtobufUtil { builder.setType(AccessControlProtos.Permission.Type.Table); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List<UserPermission> perms = new ArrayList<UserPermission>(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); @@ -2368,7 +2394,7 @@ public final class ProtobufUtil { * @param namespace name of the namespace * @throws ServiceException */ - public static List<UserPermission> getUserPermissions( + public static List<UserPermission> getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol, byte[] namespace) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = @@ -2379,7 +2405,7 @@ public final class ProtobufUtil { builder.setType(AccessControlProtos.Permission.Type.Namespace); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List<UserPermission> perms = new ArrayList<UserPermission>(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java index d80e3ae..8be0b22 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -32,11 +32,13 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface; @@ -94,9 +96,12 @@ public class AccessControlClient { public static void grant(final Connection connection, final TableName tableName, final String userName, final byte[] family, final byte[] qual, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, tableName, family, qual, - actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, tableName, + family, qual, actions); } } @@ -110,8 +115,12 @@ public class AccessControlClient { */ public static void grant(final Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, namespace, actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, namespace, + actions); } } @@ -121,8 +130,10 @@ public class AccessControlClient { */ public static void grant(final Connection connection, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, actions); } } @@ -146,9 +157,12 @@ public class AccessControlClient { public static void revoke(final Connection connection, final TableName tableName, final String username, final byte[] family, final byte[] qualifier, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), username, tableName, family, - qualifier, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), username, tableName, + family, qualifier, actions); } } @@ -162,8 +176,11 @@ public class AccessControlClient { */ public static void revoke(final Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), userName, namespace, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, namespace, + actions); } } @@ -173,10 +190,11 @@ public class AccessControlClient { */ public static void revoke(final Connection connection, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), userName, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, actions); } - } /** @@ -188,6 +206,8 @@ public class AccessControlClient { */ public static List<UserPermission> getUserPermissions(Connection connection, String tableRegex) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); List<UserPermission> permList = new ArrayList<UserPermission>(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Admin admin = connection.getAdmin()) { @@ -196,14 +216,16 @@ public class AccessControlClient { AccessControlProtos.AccessControlService.newBlockingStub(service); HTableDescriptor[] htds = null; if (tableRegex == null || tableRegex.isEmpty()) { - permList = ProtobufUtil.getUserPermissions(protocol); + permList = ProtobufUtil.getUserPermissions(controller, protocol); } else if (tableRegex.charAt(0) == '@') { String namespace = tableRegex.substring(1); - permList = ProtobufUtil.getUserPermissions(protocol, Bytes.toBytes(namespace)); + permList = ProtobufUtil.getUserPermissions(controller, protocol, + Bytes.toBytes(namespace)); } else { htds = admin.listTables(Pattern.compile(tableRegex), true); for (HTableDescriptor hd : htds) { - permList.addAll(ProtobufUtil.getUserPermissions(protocol, hd.getTableName())); + permList.addAll(ProtobufUtil.getUserPermissions(controller, protocol, + hd.getTableName())); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 0975c14..eebbe24 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -35,11 +35,14 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -287,7 +290,7 @@ public class MetaTableLocator { } catch (RegionServerStoppedException e) { // Pass -- server name sends us to a server that is dying or already dead. } - return (service != null) && verifyRegionLocation(service, + return (service != null) && verifyRegionLocation(hConnection, service, getMetaRegionLocation(zkw, replicaId), RegionReplicaUtil.getRegionInfoForReplica( HRegionInfo.FIRST_META_REGIONINFO, replicaId).getRegionName()); } @@ -307,17 +310,22 @@ public class MetaTableLocator { // rather than have to pass it in. Its made awkward by the fact that the // HRI is likely a proxy against remote server so the getServerName needs // to be fixed to go to a local method or to a cache before we can do this. - private boolean verifyRegionLocation(AdminService.BlockingInterface hostingServer, - final ServerName address, final byte [] regionName) + private boolean verifyRegionLocation(final Connection connection, + AdminService.BlockingInterface hostingServer, final ServerName address, + final byte [] regionName) throws IOException { if (hostingServer == null) { LOG.info("Passed hostingServer is null"); return false; } Throwable t; + PayloadCarryingRpcController controller = null; + if (connection instanceof ClusterConnection) { + controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + } try { // Try and get regioninfo from the hosting server. - return ProtobufUtil.getRegionInfo(hostingServer, regionName) != null; + return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null; } catch (ConnectException e) { t = e; } catch (RetriesExhaustedException e) { @@ -617,4 +625,4 @@ public class MetaTableLocator { stopped = true; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java index 30060b2..8aa8007 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; @@ -77,26 +79,34 @@ public class TestSnapshotFromAdmin { // setup the conf to match the expected properties conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); conf.setLong("hbase.client.pause", pauseTime); + // mock the master admin to our mock MasterKeepAliveConnection mockMaster = Mockito.mock(MasterKeepAliveConnection.class); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(mockMaster); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); + Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); // set the max wait time for the snapshot to complete SnapshotResponse response = SnapshotResponse.newBuilder() .setExpectedTimeout(maxWaitTime) .build(); Mockito - .when( - mockMaster.snapshot((RpcController) Mockito.isNull(), - Mockito.any(SnapshotRequest.class))).thenReturn(response); + .when( + mockMaster.snapshot((RpcController) Mockito.any(), + Mockito.any(SnapshotRequest.class))).thenReturn(response); // setup the response IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder(); builder.setDone(false); // first five times, we return false, last we get success Mockito.when( - mockMaster.isSnapshotDone((RpcController) Mockito.isNull(), + mockMaster.isSnapshotDone((RpcController) Mockito.any(), Mockito.any(IsSnapshotDoneRequest.class))).thenReturn(builder.build(), builder.build(), - builder.build(), builder.build(), builder.build(), builder.setDone(true).build()); + builder.build(), builder.build(), builder.build(), builder.setDone(true).build()); // setup the admin and run the test Admin admin = new HBaseAdmin(mockConnection); @@ -122,6 +132,13 @@ public class TestSnapshotFromAdmin { .mock(ConnectionManager.HConnectionImplementation.class); Configuration conf = HBaseConfiguration.create(); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); + Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); Admin admin = new HBaseAdmin(mockConnection); SnapshotDescription.Builder builder = SnapshotDescription.newBuilder(); // check that invalid snapshot names fail @@ -141,11 +158,11 @@ public class TestSnapshotFromAdmin { Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(master); SnapshotResponse response = SnapshotResponse.newBuilder().setExpectedTimeout(0).build(); Mockito.when( - master.snapshot((RpcController) Mockito.isNull(), Mockito.any(SnapshotRequest.class))) + master.snapshot((RpcController) Mockito.any(), Mockito.any(SnapshotRequest.class))) .thenReturn(response); IsSnapshotDoneResponse doneResponse = IsSnapshotDoneResponse.newBuilder().setDone(true).build(); Mockito.when( - master.isSnapshotDone((RpcController) Mockito.isNull(), + master.isSnapshotDone((RpcController) Mockito.any(), Mockito.any(IsSnapshotDoneRequest.class))).thenReturn(doneResponse); // make sure that we can use valid names http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index 62f5c66..2a5ae55 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -279,7 +279,7 @@ public class DistributedHBaseCluster extends HBaseCluster { AdminProtos.AdminService.BlockingInterface client = ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName()); - ServerInfo info = ProtobufUtil.getServerInfo(client); + ServerInfo info = ProtobufUtil.getServerInfo(null, client); return ProtobufUtil.toServerName(info.getServerName()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index 158a239..b98f50d 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -42,7 +42,7 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; <%java return; %> </%if> <%java> - ServerInfo serverInfo = ProtobufUtil.getServerInfo(regionServer.getRSRpcServices()); + ServerInfo serverInfo = ProtobufUtil.getServerInfo(null, regionServer.getRSRpcServices()); ServerName serverName = ProtobufUtil.toServerName(serverInfo.getServerName()); List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(regionServer.getRSRpcServices()); MasterAddressTracker masterAddressTracker = regionServer.getMasterAddressTracker(); http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 64a75b9..a9cf0f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -22,7 +22,9 @@ import java.nio.channels.ClosedChannelException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.Pair; @@ -38,7 +40,8 @@ import com.google.protobuf.Message; * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained * RpcServer.Call */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving public class CallRunner { private static final Log LOG = LogFactory.getLog(CallRunner.class); http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index fffe7f3..90bcb47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -32,7 +32,7 @@ import java.net.InetSocketAddress; public abstract class RpcScheduler { /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ - static abstract class Context { + public static abstract class Context { public abstract InetSocketAddress getListenerAddress(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 51de6af..2386ef1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -284,7 +284,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. */ - class Call implements RpcCallContext { + @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) + @InterfaceStability.Evolving + public class Call implements RpcCallContext { protected int id; // the client's call id protected BlockingService service; protected MethodDescriptor md; @@ -363,6 +365,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.header; } + public boolean hasPriority() { + return this.header.hasPriority(); + } + + public int getPriority() { + return this.header.getPriority(); + } + /* * Short string representation without param info because param itself could be huge depends on * the payload of a command http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java index 196320d..d31711e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java @@ -695,7 +695,7 @@ public class RegionPlacementMaintainer { UpdateFavoredNodesResponse updateFavoredNodesResponse = currentRegionServer.updateFavoredNodes(null, request); LOG.info("Region server " + - ProtobufUtil.getServerInfo(currentRegionServer).getServerName() + + ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() + " has updated " + updateFavoredNodesResponse.getResponse() + " / " + singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan"); http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index af6339c..c3c3693 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -161,6 +163,7 @@ public class ServerManager { private final long warningSkew; private final RetryCounterFactory pingRetryCounterFactory; + private final RpcControllerFactory rpcControllerFactory; /** * Set of region servers which are dead but not processed immediately. If one @@ -225,6 +228,9 @@ public class ServerManager { int pingSleepInterval = Math.max(1, master.getConfiguration().getInt( "hbase.master.ping.server.retry.sleep.interval", 100)); this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval); + this.rpcControllerFactory = this.connection == null + ? null + : connection.getRpcControllerFactory(); } /** @@ -792,6 +798,10 @@ public class ServerManager { } } + private PayloadCarryingRpcController newRpcController() { + return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); + } + /** * Sends an CLOSE RPC to the specified server to close the specified region. * <p> @@ -816,7 +826,8 @@ public class ServerManager { region.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - return ProtobufUtil.closeRegion(admin, server, region.getRegionName(), + PayloadCarryingRpcController controller = newRpcController(); + return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(), versionOfClosingNode, dest, transitionInZK); } @@ -838,7 +849,8 @@ public class ServerManager { if (server == null) return; try { AdminService.BlockingInterface admin = getRsAdmin(server); - ProtobufUtil.warmupRegion(admin, region); + PayloadCarryingRpcController controller = newRpcController(); + ProtobufUtil.warmupRegion(controller, admin, region); } catch (IOException e) { LOG.error("Received exception in RPC for warmup server:" + server + "region: " + region + @@ -850,11 +862,12 @@ public class ServerManager { * Contacts a region server and waits up to timeout ms * to close the region. This bypasses the active hmaster. */ - public static void closeRegionSilentlyAndWait(ClusterConnection connection, + public static void closeRegionSilentlyAndWait(ClusterConnection connection, ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException { AdminService.BlockingInterface rs = connection.getAdmin(server); + PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); try { - ProtobufUtil.closeRegion(rs, server, region.getRegionName(), false); + ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false); } catch (IOException e) { LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e); } @@ -862,12 +875,13 @@ public class ServerManager { while (System.currentTimeMillis() < expiration) { try { HRegionInfo rsRegion = - ProtobufUtil.getRegionInfo(rs, region.getRegionName()); + ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName()); if (rsRegion == null) return; } catch (IOException ioe) { if (ioe instanceof NotServingRegionException) // no need to retry again return; - LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(), ioe); + LOG.warn("Exception when retrieving regioninfo from: " + + region.getRegionNameAsString(), ioe); } Thread.sleep(1000); } @@ -902,7 +916,8 @@ public class ServerManager { + region_b.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible); + PayloadCarryingRpcController controller = newRpcController(); + ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible); } /** @@ -911,6 +926,7 @@ public class ServerManager { public boolean isServerReachable(ServerName server) { if (server == null) throw new NullPointerException("Passed server is null"); + RetryCounter retryCounter = pingRetryCounterFactory.create(); while (retryCounter.shouldRetry()) { synchronized (this.onlineServers) { @@ -919,9 +935,10 @@ public class ServerManager { } } try { + PayloadCarryingRpcController controller = newRpcController(); AdminService.BlockingInterface admin = getRsAdmin(server); if (admin != null) { - ServerInfo info = ProtobufUtil.getServerInfo(admin); + ServerInfo info = ProtobufUtil.getServerInfo(controller, admin); return info != null && info.hasServerName() && server.getStartcode() == info.getServerName().getStartCode(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index c2d273b..68f4c13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -694,8 +694,8 @@ public class MiniHBaseCluster extends HBaseCluster { int count = 0; for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { HRegionServer hrs = rst.getRegionServer(); - Region metaRegion = hrs.getOnlineRegion(regionName); - if (metaRegion != null) { + Region region = hrs.getOnlineRegion(regionName); + if (region != null) { index = count; break; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java index 21d5564..28a5786 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java @@ -54,11 +54,11 @@ public class TestGlobalMemStoreSize { private HBaseTestingUtility TEST_UTIL; private MiniHBaseCluster cluster; - + /** * Test the global mem store size in the region server is equal to sum of each * region's mem store size - * @throws Exception + * @throws Exception */ @Test public void testGlobalMemStore() throws Exception { @@ -86,8 +86,8 @@ public class TestGlobalMemStoreSize { for (HRegionServer server : getOnlineRegionServers()) { long globalMemStoreSize = 0; for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { - globalMemStoreSize += + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { + globalMemStoreSize += server.getFromOnlineRegions(regionInfo.getEncodedName()). getMemstoreSize(); } @@ -102,7 +102,7 @@ public class TestGlobalMemStoreSize { ", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize()); for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { Region r = server.getFromOnlineRegions(regionInfo.getEncodedName()); flush(r, server); } @@ -118,7 +118,7 @@ public class TestGlobalMemStoreSize { // If size > 0, see if its because the meta region got edits while // our test was running.... for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { Region r = server.getFromOnlineRegions(regionInfo.getEncodedName()); long l = r.getMemstoreSize(); if (l > 0) { @@ -153,7 +153,7 @@ public class TestGlobalMemStoreSize { private List<HRegionServer> getOnlineRegionServers() { List<HRegionServer> list = new ArrayList<HRegionServer>(); - for (JVMClusterUtil.RegionServerThread rst : + for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) { if (rst.getRegionServer().isOnline()) { list.add(rst.getRegionServer()); http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 25bfde0..3955a4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -39,6 +39,13 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.DelegatingRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -565,5 +572,77 @@ public class TestMetaTableAccessor { meta.close(); } } + + public static class SpyingRpcSchedulerFactory extends SimpleRpcSchedulerFactory { + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + final RpcScheduler delegate = super.create(conf, priority, server); + return new SpyingRpcScheduler(delegate); + } + } + + public static class SpyingRpcScheduler extends DelegatingRpcScheduler { + long numPriorityCalls = 0; + + public SpyingRpcScheduler(RpcScheduler delegate) { + super(delegate); + } + + @Override + public boolean dispatch(CallRunner task) throws IOException, InterruptedException { + int priority = task.getCall().getPriority(); + + if (priority > HConstants.QOS_THRESHOLD) { + numPriorityCalls++; + } + return super.dispatch(task); + } + } + + @Test + public void testMetaUpdatesGoToPriorityQueue() throws Exception { + // This test has to be end-to-end, and do the verification from the server side + Configuration c = UTIL.getConfiguration(); + + c.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SpyingRpcSchedulerFactory.class.getName()); + + // restart so that new config takes place + afterClass(); + beforeClass(); + + TableName tableName = TableName.valueOf("foo"); + try (Admin admin = connection.getAdmin(); + RegionLocator rl = connection.getRegionLocator(tableName)) { + + // create a table and prepare for a manual split + UTIL.createTable(tableName, "cf1"); + + HRegionLocation loc = rl.getAllRegionLocations().get(0); + HRegionInfo parent = loc.getRegionInfo(); + long rid = 1000; + byte[] splitKey = Bytes.toBytes("a"); + HRegionInfo splitA = new HRegionInfo(parent.getTable(), parent.getStartKey(), + splitKey, false, rid); + HRegionInfo splitB = new HRegionInfo(parent.getTable(), splitKey, + parent.getEndKey(), false, rid); + + // find the meta server + MiniHBaseCluster cluster = UTIL.getMiniHBaseCluster(); + int rsIndex = cluster.getServerWithMeta(); + HRegionServer rs; + if (rsIndex >= 0) { + rs = cluster.getRegionServer(rsIndex); + } else { + // it is in master + rs = cluster.getMaster(); + } + SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); + long prevCalls = scheduler.numPriorityCalls; + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); + + assertTrue(prevCalls < scheduler.numPriorityCalls); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java index e2bdc7b..de80a7b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; @@ -179,7 +181,7 @@ public class TestMetaTableLocator { // Mock an ClientProtocol. final ClientProtos.ClientService.BlockingInterface implementation = Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); - + ClusterConnection connection = mockConnection(null, implementation); // If a 'get' is called on mocked interface, throw connection refused. @@ -249,6 +251,10 @@ public class TestMetaTableLocator { (GetRegionInfoRequest)Mockito.any())).thenThrow(connectException); Mockito.when(connection.getAdmin(Mockito.any(ServerName.class))). thenReturn(implementation); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory); ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis()); MetaTableLocator.setMetaLocation(this.watcher, http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 92a4afe..306dfee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -63,6 +63,11 @@ public class HConnectionTestingUtility { if (connection == null) { connection = Mockito.mock(HConnectionImplementation.class); Mockito.when(connection.getConfiguration()).thenReturn(conf); + Mockito.when(connection.getRpcControllerFactory()).thenReturn( + Mockito.mock(RpcControllerFactory.class)); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection); } return connection; @@ -210,4 +215,4 @@ public class HConnectionTestingUtility { return ConnectionManager.CONNECTION_INSTANCES.size(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java index 6b68b5e..2695ec4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java @@ -1256,7 +1256,7 @@ public class TestAdmin1 { try { AdminService.BlockingInterface admin = TEST_UTIL.getHBaseAdmin().getConnection() .getAdmin(regions.get(1).getSecond()); - ProtobufUtil.mergeRegions(admin, regions.get(1).getFirst(), regions.get(2).getFirst(), true); + ProtobufUtil.mergeRegions(null, admin, regions.get(1).getFirst(), regions.get(2).getFirst(), true); } catch (MergeRegionException mm) { gotException = true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 28c354f..d942b63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -696,7 +696,7 @@ public class TestFromClientSide { public void testMaxKeyValueSize() throws Exception { byte [] TABLE = Bytes.toBytes("testMaxKeyValueSize"); Configuration conf = TEST_UTIL.getConfiguration(); - String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY); + String oldMaxSize = conf.get(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY); Table ht = TEST_UTIL.createTable(TABLE, FAMILY); byte[] value = new byte[4 * 1024 * 1024]; Put put = new Put(ROW); @@ -704,7 +704,7 @@ public class TestFromClientSide { ht.put(put); try { TEST_UTIL.getConfiguration().setInt( - TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); + ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); // Create new table so we pick up the change in Configuration. try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { @@ -716,7 +716,7 @@ public class TestFromClientSide { } fail("Inserting a too large KeyValue worked, should throw exception"); } catch(Exception e) {} - conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); + conf.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index c4a7ef8..b788e35 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest; @@ -309,6 +311,14 @@ public class TestHBaseAdminNoCluster { } }); Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin); + RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory); + Mockito.when(rpcControllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration); + Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Admin admin = null; try { http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 10a8f12..ca8273e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -287,7 +287,7 @@ public class TestScannersFromClientSide { private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, int expectedCellCount) throws Exception { ResultScanner scanner = table.getScanner(scan); - + int rowCount = 0; int cellCount = 0; Result r = null; @@ -607,7 +607,7 @@ public class TestScannersFromClientSide { byte[] regionName = hri.getRegionName(); int i = cluster.getServerWith(regionName); HRegionServer rs = cluster.getRegionServer(i); - ProtobufUtil.closeRegion( + ProtobufUtil.closeRegion(null, rs.getRSRpcServices(), rs.getServerName(), regionName, false); long startTime = EnvironmentEdgeManager.currentTime(); long timeOut = 300000; @@ -630,7 +630,7 @@ public class TestScannersFromClientSide { if (ConfigUtil.useZKForAssignment(TEST_UTIL.getConfiguration())) { ZKAssign.createNodeOffline(zkw, hri, loc.getServerName()); } - ProtobufUtil.openRegion(rs.getRSRpcServices(), rs.getServerName(), hri); + ProtobufUtil.openRegion(null, rs.getRSRpcServices(), rs.getServerName(), hri); startTime = EnvironmentEdgeManager.currentTime(); while (true) { if (rs.getOnlineRegion(regionName) != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java new file mode 100644 index 0000000..7c6fa26 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +public class DelegatingRpcScheduler extends RpcScheduler { + protected RpcScheduler delegate; + + public DelegatingRpcScheduler(RpcScheduler delegate) { + this.delegate = delegate; + } + + @Override + public void stop() { + delegate.stop(); + } + @Override + public void start() { + delegate.start(); + } + @Override + public void init(Context context) { + delegate.init(context); + } + @Override + public int getReplicationQueueLength() { + return delegate.getReplicationQueueLength(); + } + + @Override + public int getPriorityQueueLength() { + return delegate.getPriorityQueueLength(); + } + + @Override + public int getGeneralQueueLength() { + return delegate.getGeneralQueueLength(); + } + + @Override + public int getActiveRpcHandlerCount() { + return delegate.getActiveRpcHandlerCount(); + } + + @Override + public boolean dispatch(CallRunner task) throws IOException, InterruptedException { + return delegate.dispatch(task); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 5eb1d57..e3024d0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -193,7 +193,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { if (hri.getTable().equals(table)) { // splitRegion doesn't work if startkey/endkey are null - ProtobufUtil.split(hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); // hard code split + ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); } } @@ -469,9 +469,10 @@ public class TestLoadIncrementalHFilesSplitRecovery { final AtomicInteger countedLqis = new AtomicInteger(); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { + @Override protected List<LoadQueueItem> groupOrSplit( Multimap<ByteBuffer, LoadQueueItem> regionGroups, - final LoadQueueItem item, final HTable htable, + final LoadQueueItem item, final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); if (lqis != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java index 963f36e..00f3dc2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.net.InetAddress; @@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; http://git-wip-us.apache.org/repos/asf/hbase/blob/6a80087f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index edd7b2d..de157b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -349,7 +349,7 @@ public class TestMasterFailover { region = enabledRegions.remove(0); regionsThatShouldBeOnline.add(region); ZKAssign.createNodeOffline(zkw, region, serverName); - ProtobufUtil.openRegion(hrs.getRSRpcServices(), hrs.getServerName(), region); + ProtobufUtil.openRegion(null, hrs.getRSRpcServices(), hrs.getServerName(), region); while (true) { byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); RegionTransition rt = RegionTransition.parseFrom(bytes); @@ -364,7 +364,7 @@ public class TestMasterFailover { region = disabledRegions.remove(0); regionsThatShouldBeOffline.add(region); ZKAssign.createNodeOffline(zkw, region, serverName); - ProtobufUtil.openRegion(hrs.getRSRpcServices(), hrs.getServerName(), region); + ProtobufUtil.openRegion(null, hrs.getRSRpcServices(), hrs.getServerName(), region); while (true) { byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); RegionTransition rt = RegionTransition.parseFrom(bytes); @@ -752,7 +752,7 @@ public class TestMasterFailover { region = enabledRegions.remove(0); regionsThatShouldBeOnline.add(region); ZKAssign.createNodeOffline(zkw, region, deadServerName); - ProtobufUtil.openRegion(hrsDead.getRSRpcServices(), + ProtobufUtil.openRegion(null, hrsDead.getRSRpcServices(), hrsDead.getServerName(), region); while (true) { byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); @@ -768,7 +768,7 @@ public class TestMasterFailover { region = disabledRegions.remove(0); regionsThatShouldBeOffline.add(region); ZKAssign.createNodeOffline(zkw, region, deadServerName); - ProtobufUtil.openRegion(hrsDead.getRSRpcServices(), + ProtobufUtil.openRegion(null, hrsDead.getRSRpcServices(), hrsDead.getServerName(), region); while (true) { byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); @@ -788,7 +788,7 @@ public class TestMasterFailover { region = enabledRegions.remove(0); regionsThatShouldBeOnline.add(region); ZKAssign.createNodeOffline(zkw, region, deadServerName); - ProtobufUtil.openRegion(hrsDead.getRSRpcServices(), + ProtobufUtil.openRegion(null, hrsDead.getRSRpcServices(), hrsDead.getServerName(), region); while (true) { byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); @@ -807,7 +807,7 @@ public class TestMasterFailover { region = disabledRegions.remove(0); regionsThatShouldBeOffline.add(region); ZKAssign.createNodeOffline(zkw, region, deadServerName); - ProtobufUtil.openRegion(hrsDead.getRSRpcServices(), + ProtobufUtil.openRegion(null, hrsDead.getRSRpcServices(), hrsDead.getServerName(), region); while (true) { byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());