HBASE-15295 MutateTableAccess.multiMutate() does not get high priority causing a deadlock
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05200976 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05200976 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05200976 Branch: refs/heads/hbase-12439 Commit: 05200976110135abb60f9b879b9b830671c07141 Parents: cbf9c1e Author: Enis Soztutar <[email protected]> Authored: Wed Mar 23 12:30:41 2016 -0700 Committer: Enis Soztutar <[email protected]> Committed: Mon Mar 28 17:56:32 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HRegionInfo.java | 1 + .../hbase/client/BufferedMutatorImpl.java | 2 +- .../hadoop/hbase/client/ClusterConnection.java | 20 +- .../hbase/client/ConnectionConfiguration.java | 144 ++++++ .../hbase/client/ConnectionImplementation.java | 69 ++- .../apache/hadoop/hbase/client/HBaseAdmin.java | 511 +++++++++++++------ .../org/apache/hadoop/hbase/client/HTable.java | 55 +- .../hadoop/hbase/client/TableConfiguration.java | 144 ------ .../hadoop/hbase/ipc/AbstractRpcClient.java | 2 +- .../hadoop/hbase/ipc/CoprocessorRpcChannel.java | 11 +- .../hbase/ipc/MasterCoprocessorRpcChannel.java | 18 +- .../hbase/ipc/RegionCoprocessorRpcChannel.java | 46 +- .../ipc/RegionServerCoprocessorRpcChannel.java | 10 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 168 +++--- .../security/access/AccessControlClient.java | 46 +- .../hbase/zookeeper/MetaTableLocator.java | 18 +- .../hbase/client/TestSnapshotFromAdmin.java | 31 +- .../hadoop/hbase/DistributedHBaseCluster.java | 4 +- .../hbase/tmpl/regionserver/RSStatusTmpl.jamon | 2 +- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 5 +- .../apache/hadoop/hbase/ipc/RpcScheduler.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 12 +- .../hbase/master/RegionPlacementMaintainer.java | 2 +- .../hadoop/hbase/master/ServerManager.java | 34 +- .../apache/hadoop/hbase/MiniHBaseCluster.java | 4 +- 
.../hadoop/hbase/TestGlobalMemStoreSize.java | 14 +- .../hadoop/hbase/TestMetaTableAccessor.java | 79 +++ .../hadoop/hbase/TestMetaTableLocator.java | 8 +- .../hbase/client/HConnectionTestingUtility.java | 5 + .../apache/hadoop/hbase/client/TestAdmin1.java | 4 +- .../hadoop/hbase/client/TestFromClientSide.java | 6 +- .../hbase/client/TestFromClientSide3.java | 10 +- .../hbase/client/TestHBaseAdminNoCluster.java | 10 + .../client/TestScannersFromClientSide.java | 6 +- .../hbase/ipc/DelegatingRpcScheduler.java | 76 +++ .../TestLoadIncrementalHFilesSplitRecovery.java | 3 +- .../hbase/master/TestClockSkewDetection.java | 20 +- .../regionserver/TestRegionServerNoMaster.java | 2 +- .../hbase/security/access/SecureTestUtil.java | 12 +- .../security/access/TestAccessController.java | 20 +- .../security/access/TestNamespaceCommands.java | 13 +- 41 files changed, 1095 insertions(+), 554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 13ba23d..71f87f7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -202,6 +202,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> { public final static byte[] HIDDEN_START_KEY = Bytes.toBytes("hidden-start-key"); /** HRegionInfo for first meta region */ + // TODO: How come Meta regions still do not have encoded region names? Fix. 
public static final HRegionInfo FIRST_META_REGIONINFO = new HRegionInfo(1L, TableName.META_TABLE_NAME); http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index ef3f7e9..01aaec5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -88,7 +88,7 @@ public class BufferedMutatorImpl implements BufferedMutator { this.pool = params.getPool(); this.listener = params.getListener(); - TableConfiguration tableConf = new TableConfiguration(conf); + ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? 
http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 45589be..d348ffc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -31,11 +31,12 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; -/** Internal methods on HConnection that should not be used by user code. */ +/** Internal methods on Connection that should not be used by user code. */ @InterfaceAudience.Private // NOTE: Although this class is public, this class is meant to be used directly from internal // classes and unit tests only. 
@@ -287,7 +288,22 @@ public interface ClusterConnection extends HConnection { * @return RpcRetryingCallerFactory */ RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf); - + + /** + * @return Connection's RpcRetryingCallerFactory instance + */ + RpcRetryingCallerFactory getRpcRetryingCallerFactory(); + + /** + * @return Connection's RpcControllerFactory instance + */ + RpcControllerFactory getRpcControllerFactory(); + + /** + * @return a ConnectionConfiguration object holding parsed configuration values + */ + ConnectionConfiguration getConnectionConfiguration(); + /** * @return the current statistics tracker associated with this connection */ http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java new file mode 100644 index 0000000..35bebae --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Configuration parameters for the connection. + * Configuration is a heavy weight registry that does a lot of string operations and regex matching. + * Method calls into Configuration account for high CPU usage and have huge performance impact. + * This class caches connection-related configuration values in the ConnectionConfiguration + * object so that expensive conf.getXXX() calls are avoided every time HTable, etc is instantiated. + * see HBASE-12128 + */ +@InterfaceAudience.Private +public class ConnectionConfiguration { + + public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; + public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; + public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; + public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; + + private final long writeBufferSize; + private final int metaOperationTimeout; + private final int operationTimeout; + private final int scannerCaching; + private final long scannerMaxResultSize; + private final int primaryCallTimeoutMicroSecond; + private final int replicaCallTimeoutMicroSecondScan; + private final int retries; + private final int maxKeyValueSize; + + // toggle for async/sync prefetch + private final boolean clientScannerAsyncPrefetch; + + /** + * Constructor + * @param conf Configuration object + */ + ConnectionConfiguration(Configuration conf) { + this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + + this.metaOperationTimeout = conf.getInt( + HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, + 
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + + this.operationTimeout = conf.getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + + this.scannerCaching = conf.getInt( + HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + + this.scannerMaxResultSize = + conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + + this.primaryCallTimeoutMicroSecond = + conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms + + this.replicaCallTimeoutMicroSecondScan = + conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms + + this.retries = conf.getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + + this.clientScannerAsyncPrefetch = conf.getBoolean( + Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); + + this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + } + + /** + * Constructor + * This is for internal testing purpose (using the default value). + * In real usage, we should read the configuration from the Configuration object. 
+ */ + @VisibleForTesting + protected ConnectionConfiguration() { + this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; + this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; + this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; + this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; + this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; + this.primaryCallTimeoutMicroSecond = 10000; + this.replicaCallTimeoutMicroSecondScan = 1000000; + this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; + this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; + this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; + } + + public long getWriteBufferSize() { + return writeBufferSize; + } + + public int getMetaOperationTimeout() { + return metaOperationTimeout; + } + + public int getOperationTimeout() { + return operationTimeout; + } + + public int getScannerCaching() { + return scannerCaching; + } + + public int getPrimaryCallTimeoutMicroSecond() { + return primaryCallTimeoutMicroSecond; + } + + public int getReplicaCallTimeoutMicroSecondScan() { + return replicaCallTimeoutMicroSecondScan; + } + + public int getRetriesNumber() { + return retries; + } + + public int getMaxKeyValueSize() { + return maxKeyValueSize; + } + + public long getScannerMaxResultSize() { + return scannerMaxResultSize; + } + + public boolean isClientScannerAsyncPrefetch() { + return clientScannerAsyncPrefetch; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index fd4dc6d..ecaf18b 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -25,6 +25,26 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nullable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -78,25 +98,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; -import javax.annotation.Nullable; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; 
-import java.util.concurrent.atomic.AtomicInteger; /** * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. @@ -158,7 +159,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // cache the configuration value for tables so that we can avoid calling // the expensive Configuration to fetch the value multiple times. - private final TableConfiguration tableConfig; + private final ConnectionConfiguration connectionConfig; // Client rpc instance. private RpcClient rpcClient; @@ -190,14 +191,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.conf = conf; this.user = user; this.batchPool = pool; - this.tableConfig = new TableConfiguration(conf); + this.connectionConfig = new ConnectionConfiguration(conf); this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); // how many times to try, one more than max *retry* time - this.numTries = tableConfig.getRetriesNumber() + 1; + this.numTries = connectionConfig.getRetriesNumber() + 1; this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @@ -306,7 +307,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException { - return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool); + return new HTable(tableName, this, connectionConfig, + rpcCallerFactory, rpcControllerFactory, pool); } @Override @@ -318,10 +320,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { params.pool(HTable.getDefaultExecutor(getConfiguration())); } if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { - params.writeBufferSize(tableConfig.getWriteBufferSize()); + 
params.writeBufferSize(connectionConfig.getWriteBufferSize()); } if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { - params.maxKeyValueSize(tableConfig.getMaxKeyValueSize()); + params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); } return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); } @@ -2281,4 +2283,19 @@ class ConnectionImplementation implements ClusterConnection, Closeable { public boolean hasCellBlockSupport() { return this.rpcClient.hasCellBlockSupport(); } -} \ No newline at end of file + + @Override + public ConnectionConfiguration getConnectionConfiguration() { + return this.connectionConfig; + } + + @Override + public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { + return this.rpcCallerFactory; + } + + @Override + public RpcControllerFactory getRpcControllerFactory() { + return this.rpcControllerFactory; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index c2a0bb8..c1d07ae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -68,7 +68,9 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import 
org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -217,6 +219,7 @@ public class HBaseAdmin implements Admin { private int operationTimeout; private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; private NonceGenerator ng; @@ -229,6 +232,7 @@ public class HBaseAdmin implements Admin { this.conf = connection.getConfiguration(); this.connection = connection; + // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time. this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, @@ -240,7 +244,8 @@ public class HBaseAdmin implements Admin { this.syncWaitTimeout = this.conf.getInt( "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + this.rpcCallerFactory = connection.getRpcRetryingCallerFactory(); + this.rpcControllerFactory = connection.getRpcControllerFactory(); this.ng = this.connection.getNonceGenerator(); } @@ -266,17 +271,19 @@ public class HBaseAdmin implements Admin { @Override public Future<Boolean> abortProcedureAsync( - final long procId, - final boolean mayInterruptIfRunning) throws IOException { + final long procId, + final boolean mayInterruptIfRunning) throws IOException { Boolean abortProcResponse = executeCallable( new MasterCallable<AbortProcedureResponse>(getConnection()) { - @Override - public AbortProcedureResponse call(int callTimeout) throws ServiceException { - AbortProcedureRequest abortProcRequest = - AbortProcedureRequest.newBuilder().setProcId(procId).build(); - return master.abortProcedure(null,abortProcRequest); - } - }).getIsProcedureAborted(); + @Override + public AbortProcedureResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = 
rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + AbortProcedureRequest abortProcRequest = + AbortProcedureRequest.newBuilder().setProcId(procId).build(); + return master.abortProcedure(controller, abortProcRequest); + } + }).getIsProcedureAborted(); AbortProcedureFuture abortProcFuture = new AbortProcedureFuture(this, procId, abortProcResponse); @@ -342,9 +349,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -376,9 +385,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<TableName[]>(getConnection()) { @Override public TableName[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableNamesRequest req = RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables); - return ProtobufUtil.getTableNameArray(master.getTableNames(null, req) + return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req) .getTableNamesList()); } }); @@ -392,19 +403,23 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException { - return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, operationTimeout); + return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, 
rpcControllerFactory, + operationTimeout); } static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection, - RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout) throws IOException { + RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, + int operationTimeout) throws IOException { if (tableName == null) return null; HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) { @Override public HTableDescriptor call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsResponse htds; GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableName); - htds = master.getTableDescriptors(null, req); + htds = master.getTableDescriptors(controller, req); if (!htds.getTableSchemaList().isEmpty()) { return HTableDescriptor.convert(htds.getTableSchemaList().get(0)); @@ -483,14 +498,17 @@ public class HBaseAdmin implements Admin { } CreateTableResponse response = executeCallable( - new MasterCallable<CreateTableResponse>(getConnection()) { - @Override - public CreateTableResponse call(int callTimeout) throws ServiceException { - CreateTableRequest request = RequestConverter.buildCreateTableRequest( - desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); - return master.createTable(null, request); - } - }); + new MasterCallable<CreateTableResponse>(getConnection()) { + @Override + public CreateTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(desc.getTableName()); + CreateTableRequest request = RequestConverter.buildCreateTableRequest( + desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); + return master.createTable(controller, request); + } + }); 
return new CreateTableFuture(this, desc, splitKeys, response); } @@ -532,14 +550,17 @@ public class HBaseAdmin implements Admin { @Override public Future<Void> deleteTableAsync(final TableName tableName) throws IOException { DeleteTableResponse response = executeCallable( - new MasterCallable<DeleteTableResponse>(getConnection()) { - @Override - public DeleteTableResponse call(int callTimeout) throws ServiceException { - DeleteTableRequest req = - RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.deleteTable(null,req); - } - }); + new MasterCallable<DeleteTableResponse>(getConnection()) { + @Override + public DeleteTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + DeleteTableRequest req = + RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); + return master.deleteTable(controller,req); + } + }); return new DeleteTableFuture(this, tableName, response); } @@ -614,10 +635,13 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) { @Override public TruncateTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); LOG.info("Started truncating " + tableName); TruncateTableRequest req = RequestConverter.buildTruncateTableRequest( tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce()); - return master.truncateTable(null, req); + return master.truncateTable(controller, req); } }); return new TruncateTableFuture(this, tableName, preserveSplits, response); @@ -714,15 +738,19 @@ public class HBaseAdmin implements Admin { public Future<Void> enableTableAsync(final TableName tableName) 
throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); EnableTableResponse response = executeCallable( - new MasterCallable<EnableTableResponse>(getConnection()) { - @Override - public EnableTableResponse call(int callTimeout) throws ServiceException { - LOG.info("Started enable of " + tableName); - EnableTableRequest req = - RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.enableTable(null,req); - } - }); + new MasterCallable<EnableTableResponse>(getConnection()) { + @Override + public EnableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + + LOG.info("Started enable of " + tableName); + EnableTableRequest req = + RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); + return master.enableTable(controller,req); + } + }); return new EnableTableFuture(this, tableName, response); } @@ -776,15 +804,20 @@ public class HBaseAdmin implements Admin { public Future<Void> disableTableAsync(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); DisableTableResponse response = executeCallable( - new MasterCallable<DisableTableResponse>(getConnection()) { - @Override - public DisableTableResponse call(int callTimeout) throws ServiceException { - LOG.info("Started disable of " + tableName); - DisableTableRequest req = - RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.disableTable(null, req); - } - }); + new MasterCallable<DisableTableResponse>(getConnection()) { + @Override + public DisableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + 
controller.setPriority(tableName); + + LOG.info("Started disable of " + tableName); + DisableTableRequest req = + RequestConverter.buildDisableTableRequest( + tableName, ng.getNonceGroup(), ng.newNonce()); + return master.disableTable(controller, req); + } + }); return new DisableTableFuture(this, tableName, response); } @@ -863,9 +896,13 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) { @Override public Pair<Integer, Integer> call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + GetSchemaAlterStatusRequest req = RequestConverter .buildGetSchemaAlterStatusRequest(tableName); - GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req); + GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req); Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(), ret.getTotalRegions()); return pair; @@ -897,10 +934,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) { @Override public AddColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - return master.addColumn(null, req); + return master.addColumn(controller, req); } }); return new AddColumnFamilyFuture(this, tableName, response); @@ -938,10 +979,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) { @Override public DeleteColumnResponse call(int callTimeout) throws ServiceException { + 
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.deleteColumn(null, req); + master.deleteColumn(controller, req); return null; } }); @@ -980,10 +1025,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) { @Override public ModifyColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.modifyColumn(null, req); + master.modifyColumn(controller, req); return null; } }); @@ -1042,7 +1091,10 @@ public class HBaseAdmin implements Admin { CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(sn, encodedRegionName); try { - CloseRegionResponse response = admin.closeRegion(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + // TODO: this does not do retries, it should. Set priority and timeout in controller + CloseRegionResponse response = admin.closeRegion(controller, request); boolean isRegionClosed = response.getClosed(); if (false == isRegionClosed) { LOG.error("Not able to close the region " + encodedRegionName + "."); @@ -1056,14 +1108,17 @@ public class HBaseAdmin implements Admin { @Override public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + // Close the region without updating zk state. 
- ProtobufUtil.closeRegion(admin, sn, hri.getRegionName()); + ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName()); } @Override public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - return ProtobufUtil.getOnlineRegions(admin); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + return ProtobufUtil.getOnlineRegions(controller, admin); } @Override @@ -1088,23 +1143,15 @@ public class HBaseAdmin implements Admin { } HRegionInfo hRegionInfo = regionServerPair.getFirst(); ServerName serverName = regionServerPair.getSecond(); + + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); try { - admin.flushRegion(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - - private void flush(final ServerName sn, final HRegionInfo hri) - throws IOException { - AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - FlushRegionRequest request = - RequestConverter.buildFlushRegionRequest(hri.getRegionName()); - try { - admin.flushRegion(null, request); + // TODO: this does not do retries, it should. 
Set priority and timeout in controller + admin.flushRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1258,11 +1305,13 @@ public class HBaseAdmin implements Admin { private void compact(final ServerName sn, final HRegionInfo hri, final boolean major, final byte [] family) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); try { - admin.compactRegion(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.compactRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1275,10 +1324,17 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(encodedRegionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + try { MoveRegionRequest request = RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); - master.moveRegion(null, request); + master.moveRegion(controller, request); } catch (DeserializationException de) { LOG.error("Could not parse destination server name: " + de); throw new ServiceException(new DoNotRetryIOException(de)); @@ -1288,6 +1344,11 @@ public class HBaseAdmin implements Admin { }); } + private boolean isMetaRegion(final byte[] regionName) { + return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); 
+ } + @Override public void assign(final byte[] regionName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { @@ -1295,9 +1356,16 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + AssignRegionRequest request = RequestConverter.buildAssignRegionRequest(toBeAssigned); - master.assignRegion(null,request); + master.assignRegion(controller,request); return null; } }); @@ -1310,9 +1378,15 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } UnassignRegionRequest request = RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); - master.unassignRegion(null, request); + master.unassignRegion(controller, request); return null; } }); @@ -1324,7 +1398,13 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.offlineRegion(null,RequestConverter.buildOfflineRegionRequest(regionName)); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + 
controller.setPriority(TableName.META_TABLE_NAME); + } + master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName)); return null; } }); @@ -1336,9 +1416,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetBalancerRunningRequest req = RequestConverter.buildSetBalancerRunningRequest(on, synchronous); - return master.setBalancerRunning(null, req).getPrevBalanceValue(); + return master.setBalancerRunning(controller, req).getPrevBalanceValue(); } }); } @@ -1348,7 +1431,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.balance(null, RequestConverter.buildBalanceRequest(false)).getBalancerRan(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(false)).getBalancerRan(); } }); } @@ -1358,7 +1445,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.balance(null, RequestConverter.buildBalanceRequest(force)).getBalancerRan(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(force)).getBalancerRan(); } }); } @@ -1368,8 +1459,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int 
callTimeout) throws ServiceException { - return master.isBalancerEnabled(null, RequestConverter.buildIsBalancerEnabledRequest()) - .getEnabled(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isBalancerEnabled(controller, + RequestConverter.buildIsBalancerEnabledRequest()).getEnabled(); } }); } @@ -1379,7 +1473,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.normalize(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.normalize(controller, RequestConverter.buildNormalizeRequest()).getNormalizerRan(); } }); @@ -1390,7 +1487,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isNormalizerEnabled(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isNormalizerEnabled(controller, RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled(); } }); @@ -1401,9 +1501,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetNormalizerRunningRequest req = RequestConverter.buildSetNormalizerRunningRequest(on); - return master.setNormalizerRunning(null, req).getPrevNormalizerValue(); + return master.setNormalizerRunning(controller, req).getPrevNormalizerValue(); } }); } @@ -1413,7 +1516,10 @@ public class HBaseAdmin 
implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.enableCatalogJanitor(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.enableCatalogJanitor(controller, RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue(); } }); @@ -1424,7 +1530,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Integer>(getConnection()) { @Override public Integer call(int callTimeout) throws ServiceException { - return master.runCatalogScan(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.runCatalogScan(controller, RequestConverter.buildCatalogScanRequest()).getScanResult(); } }); @@ -1435,7 +1544,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isCatalogJanitorEnabled(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isCatalogJanitorEnabled(controller, RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue(); } }); @@ -1480,11 +1592,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + try { DispatchMergingRegionsRequest request = RequestConverter .buildDispatchMergingRegionsRequest(encodedNameOfRegionA, encodedNameOfRegionB, forcible); - master.dispatchMergingRegions(null, request); + 
master.dispatchMergingRegions(controller, request); } catch (DeserializationException de) { LOG.error("Could not parse destination server name: " + de); } @@ -1562,28 +1677,35 @@ public class HBaseAdmin implements Admin { Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) { throw new IOException("should not give a splitkey which equals to startkey!"); } - // TODO: This is not executed via retries + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(hri.getTable()); + + // TODO: this does not do retries, it should. Set priority and timeout in controller AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - ProtobufUtil.split(admin, hri, splitPoint); + ProtobufUtil.split(controller, admin, hri, splitPoint); } @Override public Future<Void> modifyTable(final TableName tableName, final HTableDescriptor htd) - throws IOException { + throws IOException { if (!tableName.equals(htd.getTableName())) { throw new IllegalArgumentException("the specified table name '" + tableName + "' doesn't match with the HTD one: " + htd.getTableName()); } ModifyTableResponse response = executeCallable( - new MasterCallable<ModifyTableResponse>(getConnection()) { - @Override - public ModifyTableResponse call(int callTimeout) throws ServiceException { - ModifyTableRequest request = RequestConverter.buildModifyTableRequest( - tableName, htd, ng.getNonceGroup(), ng.newNonce()); - return master.modifyTable(null, request); - } - }); + new MasterCallable<ModifyTableResponse>(getConnection()) { + @Override + public ModifyTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + + ModifyTableRequest request = RequestConverter.buildModifyTableRequest( + tableName, htd, ng.getNonceGroup(), ng.newNonce()); + return master.modifyTable(controller, request); + } + }); 
return new ModifyTableFuture(this, tableName, response); } @@ -1715,7 +1837,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.shutdown(null,ShutdownRequest.newBuilder().build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.shutdown(controller, ShutdownRequest.newBuilder().build()); return null; } }); @@ -1726,7 +1851,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.stopMaster(null, StopMasterRequest.newBuilder().build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.stopMaster(controller, StopMasterRequest.newBuilder().build()); return null; } }); @@ -1741,8 +1869,12 @@ public class HBaseAdmin implements Admin { this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); StopServerRequest request = RequestConverter.buildStopServerRequest( "Called by admin client " + this.connection.toString()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + controller.setPriority(HConstants.HIGH_QOS); try { - admin.stopServer(null, request); + // TODO: this does not do retries, it should. 
Set priority and timeout in controller + admin.stopServer(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1753,8 +1885,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) { @Override public ClusterStatus call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); - return ClusterStatus.convert(master.getClusterStatus(null, req).getClusterStatus()); + return ClusterStatus.convert(master.getClusterStatus(controller, req).getClusterStatus()); } }); } @@ -1793,18 +1927,21 @@ public class HBaseAdmin implements Admin { @Override public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor) - throws IOException { + throws IOException { CreateNamespaceResponse response = executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) { - @Override - public CreateNamespaceResponse call(int callTimeout) throws Exception { - return master.createNamespace(null, - CreateNamespaceRequest.newBuilder() - .setNamespaceDescriptor(ProtobufUtil - .toProtoNamespaceDescriptor(descriptor)).build() - ); - } - }); + @Override + public CreateNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? 
+ return master.createNamespace(controller, + CreateNamespaceRequest.newBuilder() + .setNamespaceDescriptor(ProtobufUtil + .toProtoNamespaceDescriptor(descriptor)).build() + ); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -1821,15 +1958,18 @@ public class HBaseAdmin implements Admin { @Override public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor) - throws IOException { + throws IOException { ModifyNamespaceResponse response = executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) { - @Override - public ModifyNamespaceResponse call(int callTimeout) throws Exception { - return master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder(). - setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); - } - }); + @Override + public ModifyNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder(). + setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -1846,15 +1986,18 @@ public class HBaseAdmin implements Admin { @Override public Future<Void> deleteNamespaceAsync(final String name) - throws IOException { + throws IOException { DeleteNamespaceResponse response = executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) { - @Override - public DeleteNamespaceResponse call(int callTimeout) throws Exception { - return master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder(). 
- setNamespaceName(name).build()); - } - }); + @Override + public DeleteNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder(). + setNamespaceName(name).build()); + } + }); return new NamespaceFuture(this, name, response.getProcId()) { @Override public String getOperationType() { @@ -1869,8 +2012,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) { @Override public NamespaceDescriptor call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); return ProtobufUtil.toNamespaceDescriptor( - master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder(). + master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder(). setNamespaceName(name).build()).getNamespaceDescriptor()); } }); @@ -1882,9 +2027,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) { @Override public NamespaceDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List<HBaseProtos.NamespaceDescriptor> list = - master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder(). 
- build()).getNamespaceDescriptorList(); + master.listNamespaceDescriptors(controller, + ListNamespaceDescriptorsRequest.newBuilder().build()) + .getNamespaceDescriptorList(); NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; for(int i = 0; i < list.size(); i++) { res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); @@ -1900,8 +2048,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) { @Override public ProcedureInfo[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List<ProcedureProtos.Procedure> procList = master.listProcedures( - null, ListProceduresRequest.newBuilder().build()).getProcedureList(); + controller, ListProceduresRequest.newBuilder().build()).getProcedureList(); ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()]; for (int i = 0; i < procList.size(); i++) { procInfoList[i] = ProcedureInfo.convert(procList.get(i)); @@ -1917,9 +2067,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List<TableSchema> list = - master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest. 
- newBuilder().setNamespaceName(name).build()).getTableSchemaList(); + master.listTableDescriptorsByNamespace(controller, + ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) + .build()).getTableSchemaList(); HTableDescriptor[] res = new HTableDescriptor[list.size()]; for(int i=0; i < list.size(); i++) { @@ -1936,8 +2089,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<TableName[]>(getConnection()) { @Override public TableName[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List<HBaseProtos.TableName> tableNames = - master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest. + master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest. newBuilder().setNamespaceName(name).build()) .getTableNameList(); TableName[] result = new TableName[tableNames.size()]; @@ -2017,9 +2172,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableNames); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -2059,8 +2216,11 @@ public class HBaseAdmin implements Admin { FailedLogCloseException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + try { - return admin.rollWALWriter(null, request); + // TODO: this does 
not do retries, it should. Set priority and timeout in controller + return admin.rollWALWriter(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2142,7 +2302,9 @@ public class HBaseAdmin implements Admin { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( regionServerPair.getFirst().getRegionName(), true); - GetRegionInfoResponse response = admin.getRegionInfo(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + // TODO: this does not do retries, it should. Set priority and timeout in controller + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); return response.getCompactionState(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -2204,7 +2366,9 @@ public class HBaseAdmin implements Admin { done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { @Override public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isSnapshotDone(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, request); } }); } @@ -2224,7 +2388,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) { @Override public SnapshotResponse call(int callTimeout) throws ServiceException { - return master.snapshot(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.snapshot(controller, request); } }); } @@ -2236,7 +2402,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { @Override public IsSnapshotDoneResponse call(int 
callTimeout) throws ServiceException { - return master.isSnapshotDone(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); } }).getDone(); @@ -2370,7 +2538,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - return master.execProcedureWithRet(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedureWithRet(controller, request); } }); @@ -2395,7 +2565,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - return master.execProcedure(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedure(controller, request); } }); @@ -2442,7 +2614,9 @@ public class HBaseAdmin implements Admin { new MasterCallable<IsProcedureDoneResponse>(getConnection()) { @Override public IsProcedureDoneResponse call(int callTimeout) throws ServiceException { - return master.isProcedureDone(null, IsProcedureDoneRequest + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isProcedureDone(controller, IsProcedureDoneRequest .newBuilder().setProcedure(desc).build()); } }).getDone(); @@ -2488,7 +2662,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isRestoreSnapshotDone(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + 
controller.setCallTimeout(callTimeout); + return master.isRestoreSnapshotDone(controller, request); } }); } @@ -2518,7 +2694,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) { @Override public RestoreSnapshotResponse call(int callTimeout) throws ServiceException { - return master.restoreSnapshot(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.restoreSnapshot(controller, request); } }); } @@ -2528,8 +2706,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) { @Override public List<SnapshotDescription> call(int callTimeout) throws ServiceException { - return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build()) - .getSnapshotsList(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.getCompletedSnapshots(controller, + GetCompletedSnapshotsRequest.newBuilder().build()).getSnapshotsList(); } }); } @@ -2587,7 +2767,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.deleteSnapshot(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder(). 
setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build() ); @@ -2619,8 +2801,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - this.master.deleteSnapshot(null, DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot) - .build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder() + .setSnapshot(snapshot).build()); return null; } }); @@ -2651,7 +2835,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - this.master.setQuota(null, QuotaSettings.buildSetQuotaRequestProto(quota)); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota)); return null; } }); @@ -2750,10 +2936,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Long>(getConnection()) { @Override public Long call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampRequest req = MajorCompactionTimestampRequest.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - return master.getLastMajorCompactionTimestamp(null, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp(); } }); } @@ -2763,13 +2951,16 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<Long>(getConnection()) { @Override public Long call(int callTimeout) throws 
ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampForRegionRequest req = MajorCompactionTimestampForRegionRequest .newBuilder() .setRegion( RequestConverter .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build(); - return master.getLastMajorCompactionTimestampForRegion(null, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestampForRegion(controller, req) + .getCompactionTimestamp(); } }); } @@ -2818,6 +3009,7 @@ public class HBaseAdmin implements Admin { CompactType compactType) throws IOException { CompactionState state = CompactionState.NONE; checkTableExists(tableName); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); switch (compactType) { case MOB: try { @@ -2826,7 +3018,7 @@ public class HBaseAdmin implements Admin { GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( info.getRegionName(), true); GetRegionInfoResponse response = this.connection.getAdmin(master) - .getRegionInfo(null, request); + .getRegionInfo(controller, request); state = response.getCompactionState(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -2852,7 +3044,7 @@ public class HBaseAdmin implements Admin { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( pair.getFirst().getRegionName(), true); - GetRegionInfoResponse response = admin.getRegionInfo(null, request); + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); switch (response.getCompactionState()) { case MAJOR_AND_MINOR: return CompactionState.MAJOR_AND_MINOR; @@ -2952,7 +3144,9 @@ public class HBaseAdmin implements Admin { admin.getConnection()) { @Override public AbortProcedureResponse call(int callTimeout) throws ServiceException { - return master.abortProcedure(null, 
request); + PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController(); + controller.setCallTimeout(callTimeout); + return master.abortProcedure(controller, request); } }); } @@ -3366,9 +3560,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection()) { @Override public List<SecurityCapability> call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build(); return ProtobufUtil.toSecurityCapabilityList( - master.getSecurityCapabilities(null, req).getCapabilitiesList()); + master.getSecurityCapabilities(controller, req).getCapabilitiesList()); } }); } catch (IOException e) { @@ -3414,4 +3610,7 @@ public class HBaseAdmin implements Admin { HConstants.EMPTY_END_ROW, false, 0); } + private RpcControllerFactory getRpcControllerFactory() { + return rpcControllerFactory; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05200976/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 33fd94e..befc671 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -105,7 +105,7 @@ public class HTable implements HTableInterface { protected ClusterConnection connection; private final TableName tableName; private volatile Configuration configuration; - private TableConfiguration tableConfiguration; + private ConnectionConfiguration connConfiguration; protected BufferedMutatorImpl mutator; private boolean autoFlush = true; private boolean 
closed = false; @@ -154,7 +154,7 @@ public class HTable implements HTableInterface { */ @InterfaceAudience.Private protected HTable(TableName tableName, final ClusterConnection connection, - final TableConfiguration tableConfig, + final ConnectionConfiguration tableConfig, final RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, final ExecutorService pool) throws IOException { @@ -165,7 +165,7 @@ public class HTable implements HTableInterface { this.cleanupConnectionOnClose = false; this.connection = connection; this.configuration = connection.getConfiguration(); - this.tableConfiguration = tableConfig; + this.connConfiguration = tableConfig; this.pool = pool; if (pool == null) { this.pool = getDefaultExecutor(this.configuration); @@ -188,7 +188,7 @@ public class HTable implements HTableInterface { protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { connection = conn; tableName = params.getTableName(); - tableConfiguration = new TableConfiguration(connection.getConfiguration()); + connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; // used from tests, don't trust the connection is real @@ -206,14 +206,14 @@ public class HTable implements HTableInterface { * setup this HTable's parameter based on the passed configuration */ private void finishSetup() throws IOException { - if (tableConfiguration == null) { - tableConfiguration = new TableConfiguration(configuration); + if (connConfiguration == null) { + connConfiguration = new ConnectionConfiguration(configuration); } this.operationTimeout = tableName.isSystemTable() ? 
- tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); - this.scannerCaching = tableConfiguration.getScannerCaching(); - this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); } @@ -265,23 +265,14 @@ public class HTable implements HTableInterface { */ @Override public HTableDescriptor getTableDescriptor() throws IOException { - HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, - rpcCallerFactory, operationTimeout); + HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, + rpcControllerFactory, operationTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } return null; } - private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException { - RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(); - try { - return caller.callWithRetries(callable, operationTimeout); - } finally { - callable.close(); - } - } - /** * Get the corresponding start keys and regions for an arbitrary range of * keys. 
@@ -354,34 +345,34 @@ public class HTable implements HTableInterface { Boolean async = scan.isAsyncPrefetch(); if (async == null) { - async = tableConfiguration.isClientScannerAsyncPrefetch(); + async = connConfiguration.isClientScannerAsyncPrefetch(); } if (scan.isReversed()) { if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { if (async) { return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } } @@ -454,9 +445,9 @@ public class HTable implements HTableInterface { // Call that takes into account the replica RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( rpcControllerFactory, tableName, this.connection, get, pool, - 
tableConfiguration.getRetriesNumber(), + connConfiguration.getRetriesNumber(), operationTimeout, - tableConfiguration.getPrimaryCallTimeoutMicroSecond()); + connConfiguration.getPrimaryCallTimeoutMicroSecond()); return callable.call(); } @@ -1039,7 +1030,7 @@ public class HTable implements HTableInterface { // validate for well-formedness public void validatePut(final Put put) throws IllegalArgumentException { - validatePut(put, tableConfiguration.getMaxKeyValueSize()); + validatePut(put, connConfiguration.getMaxKeyValueSize()); } // validate for well-formedness @@ -1092,7 +1083,7 @@ public class HTable implements HTableInterface { @Override public long getWriteBufferSize() { if (mutator == null) { - return tableConfiguration.getWriteBufferSize(); + return connConfiguration.getWriteBufferSize(); } else { return mutator.getWriteBufferSize(); } @@ -1344,8 +1335,8 @@ public class HTable implements HTableInterface { this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( new BufferedMutatorParams(tableName) .pool(pool) - .writeBufferSize(tableConfiguration.getWriteBufferSize()) - .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize()) + .writeBufferSize(connConfiguration.getWriteBufferSize()) + .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); } return mutator;
