HBASE-16584 Backport the new ipc implementation in HBASE-16432 to branch-1
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/094e9a31 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/094e9a31 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/094e9a31 Branch: refs/heads/branch-1 Commit: 094e9a311bec55d0c198bb483b4b1d994c9428e4 Parents: d7666b6 Author: zhangduo <[email protected]> Authored: Fri Mar 10 13:35:02 2017 +0800 Committer: zhangduo <[email protected]> Committed: Thu Mar 16 23:00:30 2017 +0800 ---------------------------------------------------------------------- .../hbase/client/FlushRegionCallable.java | 8 +- .../apache/hadoop/hbase/client/HBaseAdmin.java | 149 +- .../org/apache/hadoop/hbase/client/HTable.java | 24 +- .../hbase/client/MultiServerCallable.java | 1 + .../client/PayloadCarryingServerCallable.java | 4 +- .../RpcRetryingCallerWithReadReplicas.java | 31 +- .../hadoop/hbase/client/ScannerCallable.java | 6 +- .../hadoop/hbase/ipc/AbstractRpcClient.java | 529 +++++-- .../org/apache/hadoop/hbase/ipc/AsyncCall.java | 141 -- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 785 ----------- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 499 ------- .../hbase/ipc/AsyncServerResponseHandler.java | 126 -- .../hadoop/hbase/ipc/BlockingRpcClient.java | 77 + .../hadoop/hbase/ipc/BlockingRpcConnection.java | 730 ++++++++++ .../hbase/ipc/BufferCallBeforeInitHandler.java | 105 ++ .../java/org/apache/hadoop/hbase/ipc/Call.java | 127 +- .../hbase/ipc/CallCancelledException.java | 37 + .../org/apache/hadoop/hbase/ipc/CallEvent.java | 40 + .../hadoop/hbase/ipc/CellBlockBuilder.java | 293 ++++ .../ipc/CellScannerButNoCodecException.java | 31 + .../hbase/ipc/DefaultNettyEventLoopConfig.java | 40 + .../hbase/ipc/DelegatingHBaseRpcController.java | 136 ++ .../DelegatingPayloadCarryingRpcController.java | 60 - .../hbase/ipc/FallbackDisallowedException.java | 38 + .../hadoop/hbase/ipc/HBaseRpcController.java | 108 ++ .../hbase/ipc/HBaseRpcControllerImpl.java | 244 ++++ .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 379 ++--- .../hbase/ipc/MasterCoprocessorRpcChannel.java | 12 +- .../apache/hadoop/hbase/ipc/NettyRpcClient.java | 82 ++ .../hbase/ipc/NettyRpcClientConfigHelper.java | 83 ++ .../hadoop/hbase/ipc/NettyRpcConnection.java | 293 ++++ .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 250 ++++ .../hbase/ipc/PayloadCarryingRpcController.java | 107 -- .../hbase/ipc/RegionCoprocessorRpcChannel.java | 22 +- .../org/apache/hadoop/hbase/ipc/RpcClient.java | 61 +- .../hadoop/hbase/ipc/RpcClientFactory.java | 32 +- .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 1326 ------------------ .../apache/hadoop/hbase/ipc/RpcConnection.java | 260 ++++ .../hadoop/hbase/ipc/RpcControllerFactory.java | 14 +- .../hbase/ipc/TimeLimitedRpcController.java | 142 -- .../security/AbstractHBaseSaslRpcClient.java | 197 +++ .../hbase/security/HBaseSaslRpcClient.java | 256 +--- .../hbase/security/NettyHBaseSaslRpcClient.java | 58 + .../NettyHBaseSaslRpcClientHandler.java | 142 ++ .../hbase/security/SaslChallengeDecoder.java | 112 ++ .../hbase/security/SaslClientHandler.java | 401 ------ .../hbase/security/SaslUnwrapHandler.java | 54 + .../apache/hadoop/hbase/security/SaslUtil.java | 88 +- .../hadoop/hbase/security/SaslWrapHandler.java | 99 ++ .../security/access/AccessControlClient.java | 16 +- .../hbase/zookeeper/MetaTableLocator.java | 8 +- .../hbase/client/TestSnapshotFromAdmin.java | 12 +- .../hadoop/hbase/ipc/TestCellBlockBuilder.java | 195 +++ .../hbase/ipc/TestHBaseRpcControllerImpl.java | 220 +++ .../apache/hadoop/hbase/ipc/TestIPCUtil.java | 184 +-- .../ipc/TestPayloadCarryingRpcController.java | 223 --- .../hbase/security/TestHBaseSaslRpcClient.java | 309 ++++ .../hbase/ipc/IntegrationTestRpcClient.java | 80 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 51 +- .../hadoop/hbase/master/ServerManager.java | 27 +- .../hbase/protobuf/ReplicationProtbufUtil.java | 9 +- .../hbase/regionserver/RSRpcServices.java | 30 +- .../regionserver/wal/WALEditsReplaySink.java | 10 +- .../RegionReplicaReplicationEndpoint.java | 34 +- .../hbase/security/HBaseSaslRpcServer.java | 13 +- .../hbase/TestMetaTableAccessorNoCluster.java | 10 +- .../hadoop/hbase/TestMetaTableLocator.java | 13 +- .../hadoop/hbase/client/TestClientTimeouts.java | 57 +- .../hbase/client/TestHBaseAdminNoCluster.java | 12 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 25 +- .../hbase/client/TestRpcControllerFactory.java | 20 +- .../coprocessor/ProtobufCoprocessorService.java | 38 +- .../hadoop/hbase/ipc/AbstractTestIPC.java | 498 +++---- .../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 306 ---- .../hadoop/hbase/ipc/TestBlockingIPC.java | 58 + .../hbase/ipc/TestGlobalEventLoopGroup.java | 53 - .../org/apache/hadoop/hbase/ipc/TestIPC.java | 170 --- .../apache/hadoop/hbase/ipc/TestNettyIPC.java | 128 ++ .../hadoop/hbase/ipc/TestProtoBufRpc.java | 93 +- .../hbase/ipc/TestProtobufRpcServiceImpl.java | 121 ++ .../hadoop/hbase/ipc/TestRpcClientLeaks.java | 33 +- .../hbase/ipc/TestRpcHandlerException.java | 143 +- .../ipc/protobuf/generated/TestProtos.java | 987 ++++++++++++- .../generated/TestRpcServiceProtos.java | 152 +- .../hadoop/hbase/master/MockRegionServer.java | 16 +- .../TestEndToEndSplitTransaction.java | 12 +- .../hbase/security/AbstractTestSecureIPC.java | 301 ---- .../hbase/security/TestAsyncSecureIPC.java | 33 - .../hbase/security/TestHBaseSaslRpcClient.java | 324 ----- .../hadoop/hbase/security/TestSecureIPC.java | 251 +++- .../TestDelegationTokenWithEncryption.java | 12 +- .../token/TestGenerateDelegationToken.java | 12 +- hbase-server/src/test/protobuf/test.proto | 8 + .../src/test/protobuf/test_rpc_service.proto | 2 + pom.xml | 2 +- 95 files changed, 7393 insertions(+), 6757 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java index 73bdb74..1460c1b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.client; +import com.google.protobuf.ServiceException; + import java.io.IOException; import org.apache.commons.logging.Log; @@ -25,7 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -34,8 +36,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRespons import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.protobuf.ServiceException; - /** * A Callable for flushRegion() RPC. */ @@ -95,7 +95,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker); try { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); return stub.flushRegion(controller, request); } catch (ServiceException se) { http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 5def9a4..6aed027 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -18,9 +18,12 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.io.InterruptedIOException; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -28,12 +31,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -54,13 +57,11 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceNotFoundException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.RegionException; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; -import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; @@ -72,8 +73,8 @@ import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -176,10 +177,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; - /** * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that * this is an HBase-internal class as defined in @@ -347,7 +344,7 @@ public class HBaseAdmin implements Admin { new MasterCallable<AbortProcedureResponse>(getConnection()) { @Override public AbortProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder().setProcId(procId).build(); @@ -441,7 +438,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); @@ -522,7 +519,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<TableName[]>(getConnection()) { @Override public TableName[] call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); GetTableNamesRequest req = RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables); @@ -560,7 +557,7 @@ public class HBaseAdmin implements Admin { HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) { @Override public HTableDescriptor call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); GetTableDescriptorsResponse htds; GetTableDescriptorsRequest req = @@ -758,7 +755,7 @@ public class HBaseAdmin implements Admin { new MasterCallable<CreateTableResponse>(getConnection()) { @Override public CreateTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(desc.getTableName()); CreateTableRequest request = RequestConverter.buildCreateTableRequest( @@ -932,7 +929,7 @@ public class HBaseAdmin implements Admin { new MasterCallable<DeleteTableResponse>(getConnection()) { @Override public DeleteTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); DeleteTableRequest req = @@ -1183,7 +1180,7 @@ public class HBaseAdmin implements Admin { new MasterCallable<EnableTableResponse>(getConnection()) { @Override public EnableTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); @@ -1375,7 +1372,7 @@ public class HBaseAdmin implements Admin { new MasterCallable<DisableTableResponse>(getConnection()) { @Override public DisableTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); @@ -1592,7 +1589,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) { @Override public Pair<Integer, Integer> call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); @@ -1664,7 +1661,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); AddColumnRequest req = RequestConverter.buildAddColumnRequest( @@ -1715,7 +1712,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest( @@ -1766,7 +1763,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest( @@ -1857,7 +1854,7 @@ public class HBaseAdmin implements Admin { CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(sn, encodedRegionName, false); try { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); // TODO: this does not do retries, it should. Set priority and timeout in controller CloseRegionResponse response = admin.closeRegion(controller, request); @@ -1882,7 +1879,7 @@ public class HBaseAdmin implements Admin { public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); // Close the region without updating zk state. ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName(), false); @@ -1894,7 +1891,7 @@ public class HBaseAdmin implements Admin { @Override public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); return ProtobufUtil.getOnlineRegions(controller, admin); } @@ -1954,7 +1951,7 @@ public class HBaseAdmin implements Admin { private void flush(final ServerName sn, final HRegionInfo hri) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(hri.getRegionName()); @@ -2211,7 +2208,7 @@ public class HBaseAdmin implements Admin { private void compact(final ServerName sn, final HRegionInfo hri, final boolean major, final byte [] family) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); @@ -2243,7 +2240,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); // Hard to know the table name, at least check if meta if (isMetaRegion(encodedRegionName)) { @@ -2282,7 +2279,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); // Hard to know the table name, at least check if meta if (isMetaRegion(regionName)) { @@ -2318,7 +2315,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); // Hard to know the table name, at least check if meta if (isMetaRegion(regionName)) { @@ -2350,7 +2347,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); // Hard to know the table name, at least check if meta if (isMetaRegion(regionName)) { @@ -2374,7 +2371,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); SetBalancerRunningRequest req = @@ -2395,7 +2392,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.balance(controller, @@ -2409,7 +2406,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.balance(controller, @@ -2429,7 +2426,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isBalancerEnabled(controller, @@ -2448,7 +2445,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.normalize(controller, @@ -2466,7 +2463,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isNormalizerEnabled(controller, @@ -2484,7 +2481,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); SetNormalizerRunningRequest req = @@ -2506,7 +2503,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.enableCatalogJanitor(controller, @@ -2525,7 +2522,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Integer>(getConnection()) { @Override public Integer call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.runCatalogScan(controller, @@ -2543,7 +2540,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isCatalogJanitorEnabled(controller, @@ -2624,7 +2621,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); try { @@ -2763,7 +2760,7 @@ public class HBaseAdmin implements Admin { Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) { throw new IOException("should not give a splitkey which equals to startkey!"); } - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(hri.getTable()); // TODO: this does not do retries, it should. Set priority and timeout in controller @@ -2791,7 +2788,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(tableName); ModifyTableRequest request = RequestConverter.buildModifyTableRequest( @@ -2909,7 +2906,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(HConstants.HIGH_QOS); master.shutdown(controller, ShutdownRequest.newBuilder().build()); @@ -2929,7 +2926,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); controller.setPriority(HConstants.HIGH_QOS); master.stopMaster(controller, StopMasterRequest.newBuilder().build()); @@ -2953,7 +2950,7 @@ public class HBaseAdmin implements Admin { this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); StopServerRequest request = RequestConverter.buildStopServerRequest( "Called by admin client " + this.connection.toString()); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(HConstants.HIGH_QOS); try { @@ -2974,7 +2971,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) { @Override public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isMasterInMaintenanceMode( controller, IsInMaintenanceModeRequest.newBuilder().build()); @@ -2987,7 +2984,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) { @Override public ClusterStatus call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); return ClusterStatus.convert(master.getClusterStatus(controller, req).getClusterStatus()); @@ -3013,7 +3010,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); // TODO: set priority based on NS? master.createNamespace(controller, @@ -3036,7 +3033,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder(). setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); @@ -3055,7 +3052,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder(). setNamespaceName(name).build()); @@ -3077,7 +3074,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) { @Override public NamespaceDescriptor call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return ProtobufUtil.toNamespaceDescriptor( master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder(). @@ -3097,7 +3094,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) { @Override public NamespaceDescriptor[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); List<HBaseProtos.NamespaceDescriptor> list = master.listNamespaceDescriptors(controller, @@ -3123,7 +3120,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) { @Override public ProcedureInfo[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); List<ProcedureProtos.Procedure> procList = master.listProcedures( controller, ListProceduresRequest.newBuilder().build()).getProcedureList(); @@ -3148,7 +3145,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); List<TableSchema> list = master.listTableDescriptorsByNamespace(controller, @@ -3176,7 +3173,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<TableName[]>(getConnection()) { @Override public TableName[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); List<HBaseProtos.TableName> tableNames = master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest. @@ -3277,7 +3274,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableNames); @@ -3327,7 +3324,7 @@ public class HBaseAdmin implements Admin { FailedLogCloseException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); try { // TODO: this does not do retries, it should. Set priority and timeout in controller @@ -3476,7 +3473,7 @@ public class HBaseAdmin implements Admin { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( regionServerPair.getFirst().getRegionName(), true); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); // TODO: this does not do retries, it should. Set priority and timeout in controller GetRegionInfoResponse response = admin.getRegionInfo(controller, request); return response.getCompactionState(); @@ -3686,7 +3683,7 @@ public class HBaseAdmin implements Admin { done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { @Override public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isSnapshotDone(controller, request); } @@ -3718,7 +3715,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) { @Override public SnapshotResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.snapshot(controller, request); } @@ -3752,7 +3749,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { @Override public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isSnapshotDone(controller, IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); @@ -4009,7 +4006,7 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.execProcedureWithRet(controller, request); } @@ -4045,7 +4042,7 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.execProcedure(controller, request); } @@ -4111,7 +4108,7 @@ public class HBaseAdmin implements Admin { new MasterCallable<IsProcedureDoneResponse>(getConnection()) { @Override public IsProcedureDoneResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isProcedureDone(controller, IsProcedureDoneRequest .newBuilder().setProcedure(desc).build()); @@ -4159,7 +4156,7 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.isRestoreSnapshotDone(controller, request); } @@ -4191,7 +4188,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) { @Override public RestoreSnapshotResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.restoreSnapshot(controller, request); } @@ -4208,7 +4205,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) { @Override public List<SnapshotDescription> call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); return master.getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build()).getSnapshotsList(); @@ -4309,7 +4306,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder(). @@ -4353,7 +4350,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder() .setSnapshot(snapshot).build()); @@ -4406,7 +4403,7 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota)); return null; @@ -4550,7 +4547,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Long>(getConnection()) { @Override public Long call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); MajorCompactionTimestampRequest req = MajorCompactionTimestampRequest.newBuilder() @@ -4565,7 +4562,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Long>(getConnection()) { @Override public Long call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); MajorCompactionTimestampForRegionRequest req = MajorCompactionTimestampForRegionRequest @@ -4633,7 +4630,7 @@ public class HBaseAdmin implements Admin { admin.getConnection()) { @Override public AbortProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController(); + HBaseRpcController controller = admin.getRpcControllerFactory().newController(); controller.setCallTimeout(callTimeout); return master.abortProcedure(controller, request); } @@ -4847,7 +4844,7 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection()) { @Override public List<SecurityCapability> call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setCallTimeout(callTimeout); SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build(); return ProtobufUtil.toSecurityCapabilityList( http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 527dc72..d4fa2e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -61,7 +61,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -755,7 +755,7 @@ public class HTable implements HTableInterface, RegionLocator { tableName, row) { @Override public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest( @@ -851,7 +851,7 @@ public class HTable implements HTableInterface, RegionLocator { public Result call(int callTimeout) throws IOException { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -976,7 +976,7 @@ public class HTable implements HTableInterface, RegionLocator { tableName, delete.getRow()) { @Override public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); @@ -1053,6 +1053,7 @@ public class HTable implements HTableInterface, RegionLocator { rpcControllerFactory) { @Override public MultiResponse call(int callTimeout) throws IOException { + controller.reset(); controller.setPriority(tableName); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { @@ -1105,7 +1106,7 @@ public class HTable implements HTableInterface, RegionLocator { new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) { @Override public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { @@ -1138,7 +1139,7 @@ public class HTable implements HTableInterface, RegionLocator { getName(), increment.getRow()) { @Override public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { @@ -1206,7 +1207,7 @@ public class HTable implements HTableInterface, RegionLocator { new RegionServerCallable<Long>(connection, getName(), row) { @Override public Long call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { @@ -1238,7 +1239,7 @@ public class HTable implements HTableInterface, RegionLocator { new RegionServerCallable<Boolean>(connection, getName(), row) { @Override public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1268,7 +1269,7 @@ public class HTable implements HTableInterface, RegionLocator { new RegionServerCallable<Boolean>(connection, getName(), row) { @Override public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1299,7 +1300,7 @@ public class HTable implements HTableInterface, RegionLocator { new RegionServerCallable<Boolean>(connection, getName(), row) { @Override public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1329,7 +1330,7 @@ public class HTable implements HTableInterface, RegionLocator { new RegionServerCallable<Boolean>(connection, getName(), row) { @Override public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1361,6 +1362,7 @@ public class HTable implements HTableInterface, RegionLocator { rpcControllerFactory) { @Override public MultiResponse call(int callTimeout) throws IOException { + controller.reset(); controller.setPriority(tableName); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 115ba33..738ff6e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -127,6 +127,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse // Controller optionally carries cell data over the proxy/service boundary and also // optionally ferries cell response data back out again. + controller.reset(); if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells)); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java index d94f069..aa3d5c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @InterfaceAudience.Private public abstract class PayloadCarryingServerCallable<T> extends RegionServerCallable<T> implements Cancellable { - protected PayloadCarryingRpcController controller; + protected HBaseRpcController controller; public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, RpcControllerFactory rpcControllerFactory) { http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index d1c40ab..6630457 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -21,6 +21,18 @@ package org.apache.hadoop.hbase.client; +import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -31,27 +43,13 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.protobuf.ServiceException; - - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - /** * Caller that goes to replica if the primary region does no answer within a configurable * timeout. If the timeout is reached, it calls all the secondary replicas, and returns @@ -98,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas { */ class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable { final int id; - private final PayloadCarryingRpcController controller; + private final HBaseRpcController controller; public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, @@ -155,6 +153,7 @@ public class RpcRetryingCallerWithReadReplicas { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); + controller.reset(); controller.setCallTimeout(callTimeout); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index fd884e3..ebac361 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.ScannerResetException; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -104,7 +104,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected boolean isRegionServerRemote = true; private long nextCallSeq = 0; protected RpcControllerFactory controllerFactory; - protected PayloadCarryingRpcController controller; + protected HBaseRpcController controller; /** * @param connection which connection @@ -141,7 +141,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { this.controller = rpcControllerFactory.newController(); } - PayloadCarryingRpcController getController() { + HBaseRpcController getController() { return controller; }
