Repository: hbase Updated Branches: refs/heads/hbase-10070 d02bb538d -> 97b7df274
HBASE-11214. Fixes for scans on a replicated table Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97b7df27 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97b7df27 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97b7df27 Branch: refs/heads/hbase-10070 Commit: 97b7df274c27ae1075d0782ad31cf284362eaf48 Parents: d02bb53 Author: Devaraj Das <[email protected]> Authored: Thu May 22 10:01:50 2014 -0700 Committer: Devaraj Das <[email protected]> Committed: Thu May 22 10:01:50 2014 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/ClientScanner.java | 71 +------------------- .../hadoop/hbase/client/ClientSmallScanner.java | 57 ++-------------- .../org/apache/hadoop/hbase/client/HTable.java | 2 +- .../hbase/client/ReversedClientScanner.java | 19 +----- .../client/ScannerCallableWithReplicas.java | 12 ++-- 5 files changed, 15 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 3bdce36..14a5608 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -48,8 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes; * If there are multiple regions in a table, this scanner will iterate * through them all. 
*/ -@InterfaceAudience.Public -@InterfaceStability.Stable +@InterfaceAudience.Private public class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); protected Scan scan; @@ -99,74 +98,6 @@ public class ClientScanner extends AbstractClientScanner { new RpcRetryingCallerFactory(conf), pool, primaryOperationTimeout); } - /** - * Create a new ClientScanner for the specified table. - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf - * @param scan - * @param tableName - * @param connection - * @param rpcFactory - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, null, 0); - } - - /** - * Create a new ClientScanner for the specified table. A ClusterConnection will be - * retrieved using the passed Configuration. - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, - final byte [] tableName) throws IOException { - this(conf, scan, TableName.valueOf(tableName)); - } - - - /** - * Create a new ClientScanner for the specified table - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), null, 0); - } - - /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf), - null, 0); - } - /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start * row maybe changed changed. http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index ca2f431..20a3b9b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -48,8 +48,7 @@ import com.google.protobuf.ServiceException; * * For small scan, it will get better performance than {@link ClientScanner} */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ClientSmallScanner extends ClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private ScannerCallableWithReplicas smallScanCallable = null; @@ -58,36 +57,6 @@ public class ClientSmallScanner extends ClientScanner { private byte[] 
skipRowOfFirstResult = null; /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. - * @param conf - * @param scan - * @param tableName - * @param connection - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf)); - } - - /** * Create a new ClientSmallScanner for the specified table. Note that the passed * {@link Scan} 's start row maybe changed. * @param conf @@ -101,24 +70,8 @@ public class ClientSmallScanner extends ClientScanner { public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, ExecutorService pool, int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, pool, primaryOperationTimeout); - } - - /** - * Create a new ShortClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed changed. - * - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @param connection Connection identifying the cluster - * @param rpcFactory - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory); + super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool, + primaryOperationTimeout); } @Override @@ -196,6 +149,7 @@ public class ClientSmallScanner extends ClientScanner { @Override public Result[] call() throws IOException { + if (this.closed) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -217,9 +171,6 @@ public class ClientSmallScanner extends ClientScanner { public ScannerCallable getScannerCallableForReplica(int id) { return new SmallScannerCallable(id, this.getCaching()); } - - @Override - public void setClose(){} } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index d759757..c80a1f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -129,7 +129,7 @@ public class HTable implements HTableInterface { protected long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; - private ExecutorService pool; // For Multi + private ExecutorService pool; // For Multi & Scan private boolean closed; private int operationTimeout; private int retries; 
http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index c1940ae..0677d57 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -36,26 +36,12 @@ import org.apache.hadoop.hbase.util.Bytes; /** * A reversed client scanner which support backward scanning */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ReversedClientScanner extends ClientScanner { private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); // A byte array in which all elements are the max byte, and it is used to // construct closest front row static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); - /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed. - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ - public ReversedClientScanner(Configuration conf, Scan scan, - TableName tableName, ClusterConnection connection) throws IOException { - super(conf, scan, tableName, connection); - } /** * Create a new ReversibleClientScanner for the specified table Note that the @@ -71,7 +57,8 @@ public class ReversedClientScanner extends ClientScanner { public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, ClusterConnection connection, ExecutorService pool, int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, pool, primaryOperationTimeout); + super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool, + primaryOperationTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 14a3646..b79d6fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -132,9 +132,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, currentScannerCallable.getRow()); + // allocate a boundedcompletion pool of some multiple of number of replicas. 
- // We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close" - // RPCs for unneeded replica scans using the same pool) + // We want to accomodate some RPCs for redundant replica scans (but are still in progress) BoundedCompletionService<Pair<Result[], ScannerCallable>> cs = new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5); @@ -151,7 +151,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { if (f != null) { Pair<Result[], ScannerCallable> r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } return r == null ? null : r.getFirst(); //great we got a response } @@ -175,7 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { Future<Pair<Result[], ScannerCallable>> f = cs.take(); Pair<Result[], ScannerCallable> r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } return r == null ? 
null : r.getFirst(); // great we got an answer } catch (ExecutionException e) { @@ -204,7 +204,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { } private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, - AtomicBoolean done, BoundedCompletionService<Pair<Result[], ScannerCallable>> cs) { + AtomicBoolean done, ExecutorService pool) { if (done.compareAndSet(false, true)) { if (currentScannerCallable != scanner) replicaSwitched.set(true); currentScannerCallable = scanner; @@ -226,7 +226,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { // the table is closed (when the awaitTermination of the underlying pool is called) s.setClose(); RetryingRPC r = new RetryingRPC(s); - cs.submit(r); + pool.submit(r); } // now clear outstandingCallables since we scheduled a close for all the contained scanners outstandingCallables.clear();
