HBASE-19251 Merge RawAsyncTable and AsyncTable
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bc3542c0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bc3542c0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bc3542c0 Branch: refs/heads/branch-2 Commit: bc3542c0fb33dd4e4d0f279bf742d9f642f9504e Parents: e063b23 Author: zhangduo <zhang...@apache.org> Authored: Thu Nov 16 14:36:28 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Nov 16 14:37:51 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/AsyncMetaTableAccessor.java | 35 +- .../client/AdvancedScanResultConsumer.java | 121 ++++ .../apache/hadoop/hbase/client/AsyncAdmin.java | 17 +- .../client/AsyncBufferedMutatorBuilderImpl.java | 4 +- .../hbase/client/AsyncBufferedMutatorImpl.java | 6 +- .../hadoop/hbase/client/AsyncClientScanner.java | 4 +- .../hadoop/hbase/client/AsyncConnection.java | 29 +- .../hbase/client/AsyncConnectionImpl.java | 90 +-- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 7 +- .../hbase/client/AsyncNonMetaRegionLocator.java | 85 ++- .../client/AsyncRpcRetryingCallerFactory.java | 4 +- .../AsyncScanSingleRegionRpcRetryingCaller.java | 38 +- .../AsyncSingleRequestRpcRetryingCaller.java | 4 +- .../apache/hadoop/hbase/client/AsyncTable.java | 570 ++++++++++++++++++- .../hadoop/hbase/client/AsyncTableBase.java | 414 -------------- .../hadoop/hbase/client/AsyncTableBuilder.java | 26 +- .../hbase/client/AsyncTableBuilderBase.java | 21 +- .../hadoop/hbase/client/AsyncTableImpl.java | 83 ++- .../hbase/client/AsyncTableResultScanner.java | 11 +- .../hadoop/hbase/client/ConnectionUtils.java | 23 +- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 26 +- .../hadoop/hbase/client/RawAsyncTable.java | 263 --------- .../hadoop/hbase/client/RawAsyncTableImpl.java | 69 ++- .../hbase/client/RawScanResultConsumer.java | 137 ----- .../hadoop/hbase/client/ScanResultConsumer.java | 28 +- .../hbase/client/ScanResultConsumerBase.java | 48 ++ .../hadoop/hbase/client/ServiceCaller.java | 61 ++ .../coprocessor/AsyncAggregationClient.java | 31 +- .../client/TestAsyncAggregationClient.java | 4 +- .../client/example/AsyncClientExample.java | 2 +- .../hbase/client/example/HttpProxyExample.java | 43 +- .../hadoop/hbase/PerformanceEvaluation.java | 20 +- .../client/AbstractTestAsyncTableScan.java | 55 +- .../client/BufferingScanResultConsumer.java | 89 +++ .../client/SimpleRawScanResultConsumer.java | 84 --- .../hbase/client/TestAsyncBufferMutator.java | 2 +- .../hbase/client/TestAsyncClusterAdminApi.java | 4 +- .../hbase/client/TestAsyncRegionAdminApi.java | 36 +- ...TestAsyncSingleRequestRpcRetryingCaller.java | 12 +- .../hadoop/hbase/client/TestAsyncTable.java | 24 +- .../hbase/client/TestAsyncTableAdminApi.java | 128 ++--- .../hbase/client/TestAsyncTableBatch.java | 68 +-- .../client/TestAsyncTableGetMultiThreaded.java | 18 +- .../hbase/client/TestAsyncTableNoncedRetry.java | 4 +- .../hadoop/hbase/client/TestAsyncTableScan.java | 7 +- .../hbase/client/TestAsyncTableScanAll.java | 20 +- .../hbase/client/TestAsyncTableScanMetrics.java | 4 +- .../client/TestAsyncTableScanRenewLease.java | 6 +- .../hbase/client/TestAsyncTableScanner.java | 20 +- ...stAsyncTableScannerCloseWhileSuspending.java | 2 +- .../hbase/client/TestRawAsyncScanCursor.java | 8 +- .../TestRawAsyncTableLimitedScanWithFilter.java | 4 +- .../client/TestRawAsyncTablePartialScan.java | 8 +- .../hbase/client/TestRawAsyncTableScan.java | 8 +- 54 files changed, 1480 insertions(+), 1455 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java index 6f41bd0..4c1d602 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java @@ -38,10 +38,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.MetaTableAccessor.CollectingVisitor; import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; import org.apache.hadoop.hbase.MetaTableAccessor.Visitor; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.RawAsyncTable; -import org.apache.hadoop.hbase.client.RawScanResultConsumer; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; @@ -72,14 +72,15 @@ public class AsyncMetaTableAccessor { private static final Pattern SERVER_COLUMN_PATTERN = Pattern .compile("^server(_[0-9a-fA-F]{4})?$"); - public static CompletableFuture<Boolean> tableExists(RawAsyncTable metaTable, TableName tableName) { + public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable, + TableName tableName) { if (tableName.equals(META_TABLE_NAME)) { return CompletableFuture.completedFuture(true); } return getTableState(metaTable, tableName).thenApply(Optional::isPresent); } - public static CompletableFuture<Optional<TableState>> getTableState(RawAsyncTable metaTable, + public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable, TableName tableName) { CompletableFuture<Optional<TableState>> future = new CompletableFuture<>(); Get get = new Get(tableName.getName()).addColumn(getTableFamily(), getStateColumn()); @@ -110,7 +111,7 @@ public class AsyncMetaTableAccessor { * @return HRegionLocation for the given region */ public static CompletableFuture<Optional<HRegionLocation>> getRegionLocation( - RawAsyncTable metaTable, byte[] regionName) { + AsyncTable<?> metaTable, byte[] regionName) { CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); try { RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); @@ -139,7 +140,7 @@ public class AsyncMetaTableAccessor { * @return HRegionLocation for the given region */ public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName( - RawAsyncTable metaTable, byte[] encodedRegionName) { + AsyncTable<?> metaTable, byte[] encodedRegionName) { CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)) .whenComplete( @@ -193,7 +194,7 @@ public class AsyncMetaTableAccessor { * {@link CompletableFuture}. 
*/ public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations( - RawAsyncTable metaTable, final Optional<TableName> tableName) { + AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) { CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); getTableRegionsAndLocations(metaTable, tableName, true).whenComplete( (locations, err) -> { @@ -220,7 +221,7 @@ public class AsyncMetaTableAccessor { * {@link CompletableFuture}. */ private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations( - RawAsyncTable metaTable, final Optional<TableName> tableName, + AsyncTable<AdvancedScanResultConsumer> metaTable, final Optional<TableName> tableName, final boolean excludeOfflinedSplitParents) { CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>(); if (tableName.filter((t) -> t.equals(TableName.META_TABLE_NAME)).isPresent()) { @@ -252,7 +253,7 @@ public class AsyncMetaTableAccessor { } for (HRegionLocation loc : current.get().getRegionLocations()) { if (loc != null) { - this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegionInfo(), loc + this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc .getServerName())); } } @@ -276,7 +277,7 @@ public class AsyncMetaTableAccessor { * @param type scanned part of meta * @param visitor Visitor invoked against each row */ - private static CompletableFuture<Void> scanMeta(RawAsyncTable metaTable, + private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName, QueryType type, final Visitor visitor) { return scanMeta(metaTable, getTableStartRowForMeta(tableName, type), getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor); @@ -291,8 +292,9 @@ public class AsyncMetaTableAccessor { * @param maxRows maximum rows to return * @param visitor Visitor invoked against each row */ - private static CompletableFuture<Void> scanMeta(RawAsyncTable metaTable, Optional<byte[]> startRow, - Optional<byte[]> stopRow, QueryType type, int maxRows, final Visitor visitor) { + private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, + Optional<byte[]> startRow, Optional<byte[]> stopRow, QueryType type, int maxRows, + final Visitor visitor) { int rowUpperLimit = maxRows > 0 ? 
maxRows : Integer.MAX_VALUE; Scan scan = getMetaScan(metaTable, rowUpperLimit); for (byte[] family : type.getFamilies()) { @@ -308,11 +310,11 @@ public class AsyncMetaTableAccessor { } CompletableFuture<Void> future = new CompletableFuture<Void>(); - metaTable.scan(scan, new MetaTableRawScanResultConsumer(rowUpperLimit, visitor, future)); + metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future)); return future; } - private static final class MetaTableRawScanResultConsumer implements RawScanResultConsumer { + private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer { private int currentRowCount; @@ -322,7 +324,8 @@ public class AsyncMetaTableAccessor { private final CompletableFuture<Void> future; - MetaTableRawScanResultConsumer(int rowUpperLimit, Visitor visitor, CompletableFuture<Void> future) { + MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor, + CompletableFuture<Void> future) { this.rowUpperLimit = rowUpperLimit; this.visitor = visitor; this.future = future; @@ -359,7 +362,7 @@ public class AsyncMetaTableAccessor { } } - private static Scan getMetaScan(RawAsyncTable metaTable, int rowUpperLimit) { + private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) { Scan scan = new Scan(); int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING, HConstants.DEFAULT_HBASE_META_SCANNER_CACHING); http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java new file mode 100644 index 0000000..10933ab --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.Optional; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * This is the low level API for asynchronous scan. + * <p> + * All results that match the given scan object will be passed to this class by calling + * {@link #onNext(Result[], ScanController)}. {@link #onComplete()} means the scan is finished, and + * {@link #onError(Throwable)} means we hit an unrecoverable error and the scan is terminated. + * {@link #onHeartbeat(ScanController)} means the RS is still working but we can not get a valid + * result to call {@link #onNext(Result[], ScanController)}. 
This is usually because the matched + * results are too sparse, for example, a filter which almost filters out everything is specified. + * <p> + * Notice that, all the methods here will be called directly in the thread which we send request to + * HBase service. So if you want the asynchronous scanner fetch data from HBase in background while + * you process the returned data, you need to move the processing work to another thread to make the + * {@link #onNext(Result[], ScanController)} call return immediately. And please do NOT do any time + * consuming tasks in these methods unless you know what you are doing. + * @since 2.0.0 + */ +@InterfaceAudience.Public +public interface AdvancedScanResultConsumer extends ScanResultConsumerBase { + + /** + * Used to resume a scan. + */ + @InterfaceAudience.Public + interface ScanResumer { + + /** + * Resume the scan. You are free to call it multiple time but only the first call will take + * effect. + */ + void resume(); + } + + /** + * Used to suspend or stop a scan, or get a scan cursor if available. + * <p> + * Notice that, you should only call the {@link #suspend()} or {@link #terminate()} inside onNext + * or onHeartbeat method. A IllegalStateException will be thrown if you call them at other places. + * <p> + * You can only call one of the {@link #suspend()} and {@link #terminate()} methods(of course you + * are free to not call them both), and the methods are not reentrant. An IllegalStateException + * will be thrown if you have already called one of the methods. + */ + @InterfaceAudience.Public + interface ScanController { + + /** + * Suspend the scan. + * <p> + * This means we will stop fetching data in background, i.e., will not call onNext any more + * before you resume the scan. + * @return A resumer used to resume the scan later. + */ + ScanResumer suspend(); + + /** + * Terminate the scan. + * <p> + * This is useful when you have got enough results and want to stop the scan in onNext method, + * or you want to stop the scan in onHeartbeat method because it has spent too many time. + */ + void terminate(); + + /** + * Get the scan cursor if available. + * @return The scan cursor. + */ + Optional<Cursor> cursor(); + } + + /** + * Indicate that we have receive some data. + * @param results the data fetched from HBase service. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within scope of onNext method. You can only call its method in + * onNext, do NOT store it and call it later outside onNext. + */ + void onNext(Result[] results, ScanController controller); + + /** + * Indicate that there is a heartbeat message but we have not cumulated enough cells to call + * {@link #onNext(Result[], ScanController)}. + * <p> + * Note that this method will always be called when RS returns something to us but we do not have + * enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be a + * 'heartbeat' message for RS, for example, we have a large row with many cells and size limit is + * exceeded before sending all the cells for this row. For RS it does send some data to us and the + * time limit has not been reached, but we can not return the data to client so here we call this + * method to tell client we have already received something. + * <p> + * This method give you a chance to terminate a slow scan operation. + * @param controller used to suspend or terminate the scan. 
Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. + */ + default void onHeartbeat(ScanController controller) { + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 722e8b5..c716441 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import com.google.protobuf.RpcChannel; + import java.util.Collection; import java.util.EnumSet; import java.util.List; @@ -33,7 +35,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.quotas.QuotaFilter; @@ -42,8 +43,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.yetus.audience.InterfaceAudience; -import com.google.protobuf.RpcChannel; - /** * The asynchronous administrative API for HBase. * @since 2.0.0 @@ -1072,14 +1071,14 @@ public interface AsyncAdmin { * </pre> * @param stubMaker a delegation to the actual {@code newStub} call. * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link CoprocessorCallable} for more details. + * {@link ServiceCaller} for more details. * @param <S> the type of the asynchronous stub * @param <R> the type of the return value * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. - * @see CoprocessorCallable + * @see ServiceCaller */ <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable); + ServiceCaller<S, R> callable); /** * Execute the given coprocessor call on the given region server. @@ -1094,15 +1093,15 @@ public interface AsyncAdmin { * </pre> * @param stubMaker a delegation to the actual {@code newStub} call. * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link CoprocessorCallable} for more details. + * {@link ServiceCaller} for more details. * @param serverName the given region server * @param <S> the type of the asynchronous stub * @param <R> the type of the return value * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. - * @see CoprocessorCallable + * @see ServiceCaller */ <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, ServerName serverName); + ServiceCaller<S, R> callable, ServerName serverName); /** * List all the dead region servers. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java index 1b8765c..a44bafa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java @@ -29,12 +29,12 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder { - private final AsyncTableBuilder<? extends AsyncTableBase> tableBuilder; + private final AsyncTableBuilder<?> tableBuilder; private long writeBufferSize; public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf, - AsyncTableBuilder<? extends AsyncTableBase> tableBuilder) { + AsyncTableBuilder<?> tableBuilder) { this.tableBuilder = tableBuilder; this.writeBufferSize = connConf.getWriteBufferSize(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index f6f1ed6..ac159b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -30,12 +30,12 @@ import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; /** - * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTableBase}. + * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}. 
*/ @InterfaceAudience.Private class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { - private final AsyncTableBase table; + private final AsyncTable<?> table; private final long writeBufferSize; @@ -47,7 +47,7 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { private boolean closed; - AsyncBufferedMutatorImpl(AsyncTableBase table, long writeBufferSize) { + AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) { this.table = table; this.writeBufferSize = writeBufferSize; } http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 5268ec8..ac2d3d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -53,7 +53,7 @@ class AsyncClientScanner { private final ScanMetrics scanMetrics; - private final RawScanResultConsumer consumer; + private final AdvancedScanResultConsumer consumer; private final TableName tableName; @@ -71,7 +71,7 @@ class AsyncClientScanner { private final ScanResultCache resultCache; - public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName, + public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { if (scan.getStartRow() == null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java index 877c074..eda2394 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java @@ -49,32 +49,37 @@ public interface AsyncConnection extends Closeable { AsyncTableRegionLocator getRegionLocator(TableName tableName); /** - * Retrieve an {@link RawAsyncTable} implementation for accessing a table. + * Retrieve an {@link AsyncTable} implementation for accessing a table. * <p> - * The returned instance will use default configs. Use {@link #getRawTableBuilder(TableName)} if you - * want to customize some configs. + * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if + * you want to customize some configs. * <p> * This method no longer checks table existence. An exception will be thrown if the table does not * exist only when the first operation is attempted. + * <p> + * The returned {@code CompletableFuture} will be finished directly in the rpc framework's + * callback thread, so typically you should not do any time consuming work inside these methods. + * And also the observer style scan API will use {@link AdvancedScanResultConsumer} which is + * designed for experts only. Only use it when you know what you are doing. 
* @param tableName the name of the table - * @return an RawAsyncTable to use for interactions with this table - * @see #getRawTableBuilder(TableName) + * @return an AsyncTable to use for interactions with this table + * @see #getTableBuilder(TableName) */ - default RawAsyncTable getRawTable(TableName tableName) { - return getRawTableBuilder(tableName).build(); + default AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) { + return getTableBuilder(tableName).build(); } /** - * Returns an {@link AsyncTableBuilder} for creating {@link RawAsyncTable}. + * Returns an {@link AsyncTableBuilder} for creating {@link AsyncTable}. * <p> * This method no longer checks table existence. An exception will be thrown if the table does not * exist only when the first operation is attempted. * @param tableName the name of the table */ - AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName); + AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName); /** - * Retrieve an AsyncTable implementation for accessing a table. + * Retrieve an {@link AsyncTable} implementation for accessing a table. * <p> * This method no longer checks table existence. An exception will be thrown if the table does not * exist only when the first operation is attempted. @@ -82,7 +87,7 @@ public interface AsyncConnection extends Closeable { * @param pool the thread pool to use for executing callback * @return an AsyncTable to use for interactions with this table */ - default AsyncTable getTable(TableName tableName, ExecutorService pool) { + default AsyncTable<ScanResultConsumer> getTable(TableName tableName, ExecutorService pool) { return getTableBuilder(tableName, pool).build(); } @@ -94,7 +99,7 @@ public interface AsyncConnection extends Closeable { * @param tableName the name of the table * @param pool the thread pool to use for executing callback */ - AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool); + AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, ExecutorService pool); /** * Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster. 
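For orientation, a minimal sketch contrasting the two getTable flavors that remain after the merge. The table name, pool size and the builder's setRpcTimeout setter are assumptions for illustration, not part of this diff:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.AsyncTable;
    import org.apache.hadoop.hbase.client.ScanResultConsumer;

    class GetTableExample {
      static void demo(AsyncConnection conn) {
        TableName tn = TableName.valueOf("test");

        // "Raw" flavor: callbacks run directly on the rpc framework threads and
        // the observer-style scan takes an AdvancedScanResultConsumer.
        AsyncTable<AdvancedScanResultConsumer> rawTable = conn.getTable(tn);

        // Pooled flavor: callbacks are dispatched to the supplied executor and
        // the observer-style scan takes the simpler ScanResultConsumer.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AsyncTable<ScanResultConsumer> table = conn.getTable(tn, pool);

        // Builders are the place to customize per-table configs (assuming the
        // usual AsyncTableBuilder timeout setters).
        AsyncTable<ScanResultConsumer> tuned =
            conn.getTableBuilder(tn, pool).setRpcTimeout(10, TimeUnit.SECONDS).build();
      }
    }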
http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index d5df785..f9f9659 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -97,7 +97,7 @@ class AsyncConnectionImpl implements AsyncConnection { private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = - new AtomicReference<>(); + new AtomicReference<>(); public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, User user) { @@ -108,8 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection { this.rpcClient = RpcClientFactory.createClient(conf, clusterId); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); - this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, - TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); + this.rpcTimeout = + (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); this.locator = new AsyncRegionLocator(this, RETRY_TIMER); this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { @@ -161,7 +161,7 @@ class AsyncConnectionImpl implements AsyncConnection { return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); } - private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException{ + private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); } @@ -172,38 +172,37 @@ class AsyncConnectionImpl implements AsyncConnection { } private void makeMasterStub(CompletableFuture<MasterService.Interface> future) { - registry.getMasterAddress().whenComplete( - (sn, error) -> { - if (sn == null) { - String msg = "ZooKeeper available but no active master location found"; - LOG.info(msg); - this.masterStubMakeFuture.getAndSet(null).completeExceptionally( - new MasterNotRunningException(msg)); - return; - } - try { - MasterService.Interface stub = createMasterStub(sn); - HBaseRpcController controller = getRpcController(); - stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(), - new RpcCallback<IsMasterRunningResponse>() { - @Override - public void run(IsMasterRunningResponse resp) { - if (controller.failed() || resp == null - || (resp != null && !resp.getIsMasterRunning())) { - masterStubMakeFuture.getAndSet(null).completeExceptionally( - new MasterNotRunningException("Master connection is not running anymore")); - } else { - masterStub.set(stub); - masterStubMakeFuture.set(null); - future.complete(stub); - } + registry.getMasterAddress().whenComplete((sn, error) -> { + if (sn == null) { + String msg = "ZooKeeper available but no active master location found"; + LOG.info(msg); + this.masterStubMakeFuture.getAndSet(null) + .completeExceptionally(new MasterNotRunningException(msg)); + 
return; + } + try { + MasterService.Interface stub = createMasterStub(sn); + HBaseRpcController controller = getRpcController(); + stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(), + new RpcCallback<IsMasterRunningResponse>() { + @Override + public void run(IsMasterRunningResponse resp) { + if (controller.failed() || resp == null || + (resp != null && !resp.getIsMasterRunning())) { + masterStubMakeFuture.getAndSet(null).completeExceptionally( + new MasterNotRunningException("Master connection is not running anymore")); + } else { + masterStub.set(stub); + masterStubMakeFuture.set(null); + future.complete(stub); } - }); - } catch (IOException e) { - this.masterStubMakeFuture.getAndSet(null).completeExceptionally( - new IOException("Failed to create async master stub", e)); - } - }); + } + }); + } catch (IOException e) { + this.masterStubMakeFuture.getAndSet(null) + .completeExceptionally(new IOException("Failed to create async master stub", e)); + } + }); } CompletableFuture<MasterService.Interface> getMasterStub() { @@ -231,8 +230,8 @@ class AsyncConnectionImpl implements AsyncConnection { new RpcCallback<IsMasterRunningResponse>() { @Override public void run(IsMasterRunningResponse resp) { - if (controller.failed() || resp == null - || (resp != null && !resp.getIsMasterRunning())) { + if (controller.failed() || resp == null || + (resp != null && !resp.getIsMasterRunning())) { makeMasterStub(future); } else { future.complete(masterStub); @@ -255,22 +254,23 @@ class AsyncConnectionImpl implements AsyncConnection { } @Override - public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) { - return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) { + public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { + return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { @Override - public RawAsyncTable build() { + public AsyncTable<AdvancedScanResultConsumer> build() { return new RawAsyncTableImpl(AsyncConnectionImpl.this, this); } }; } @Override - public AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool) { - return new AsyncTableBuilderBase<AsyncTable>(tableName, connConf) { + public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, + ExecutorService pool) { + return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { @Override - public AsyncTable build() { + public AsyncTable<ScanResultConsumer> build() { RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this); return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool); } @@ -293,7 +293,7 @@ class AsyncConnectionImpl implements AsyncConnection { @Override public AsyncAdmin build() { RawAsyncHBaseAdmin rawAdmin = - new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); + new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); return new AsyncHBaseAdmin(rawAdmin, pool); } }; @@ -301,7 +301,7 @@ class AsyncConnectionImpl implements AsyncConnection { @Override public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { - return new AsyncBufferedMutatorBuilderImpl(connConf, getRawTableBuilder(tableName)); + return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName)); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java 
---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 5a20291..ab529a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.quotas.QuotaFilter; @@ -56,7 +55,7 @@ import org.apache.yetus.audience.InterfaceAudience; * @see AsyncConnection#getAdminBuilder(ExecutorService) */ @InterfaceAudience.Private -public class AsyncHBaseAdmin implements AsyncAdmin { +class AsyncHBaseAdmin implements AsyncAdmin { private final RawAsyncHBaseAdmin rawAdmin; @@ -705,13 +704,13 @@ public class AsyncHBaseAdmin implements AsyncAdmin { @Override public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable) { + ServiceCaller<S, R> callable) { return wrap(rawAdmin.coprocessorService(stubMaker, callable)); } @Override public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, ServerName serverName) { + ServiceCaller<S, R> callable, ServerName serverName) { return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 5bead20..2adafb6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY; import static org.apache.hadoop.hbase.HConstants.NINES; import static org.apache.hadoop.hbase.HConstants.ZEROES; -import static org.apache.hadoop.hbase.HRegionInfo.createRegionName; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; @@ -45,14 +45,13 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import 
org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * The asynchronous locator for regions other than meta. @@ -63,7 +62,7 @@ class AsyncNonMetaRegionLocator { private static final Log LOG = LogFactory.getLog(AsyncNonMetaRegionLocator.class); static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = - "hbase.client.meta.max.concurrent.locate.per.table"; + "hbase.client.meta.max.concurrent.locate.per.table"; private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8; @@ -102,12 +101,12 @@ class AsyncNonMetaRegionLocator { private static final class TableCache { public final ConcurrentNavigableMap<byte[], HRegionLocation> cache = - new ConcurrentSkipListMap<>(BYTES_COMPARATOR); + new ConcurrentSkipListMap<>(BYTES_COMPARATOR); public final Set<LocateRequest> pendingRequests = new HashSet<>(); public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests = - new LinkedHashMap<>(); + new LinkedHashMap<>(); public boolean hasQuota(int max) { return pendingRequests.size() < max; @@ -126,8 +125,8 @@ class AsyncNonMetaRegionLocator { } public void clearCompletedRequests(Optional<HRegionLocation> location) { - for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = allRequests - .entrySet().iterator(); iter.hasNext();) { + for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = + allRequests.entrySet().iterator(); iter.hasNext();) { Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next(); if (tryComplete(entry.getKey(), entry.getValue(), location)) { iter.remove(); @@ -146,15 +145,16 @@ class AsyncNonMetaRegionLocator { HRegionLocation loc = location.get(); boolean completed; if (req.locateType.equals(RegionLocateType.BEFORE)) { - // for locating the row before current row, the common case is to find the previous region in - // reverse scan, so we check the endKey first. In general, the condition should be startKey < - // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row - // && startKey < req.row). The two conditions are equal since startKey < endKey. - int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row); + // for locating the row before current row, the common case is to find the previous region + // in reverse scan, so we check the endKey first. In general, the condition should be + // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row || + // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey < + // endKey. 
+ int c = Bytes.compareTo(loc.getRegion().getEndKey(), req.row); completed = - c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0); + c == 0 || (c > 0 && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0); } else { - completed = loc.getRegionInfo().containsRow(req.row); + completed = loc.getRegion().containsRow(req.row); } if (completed) { future.complete(loc); @@ -176,13 +176,13 @@ class AsyncNonMetaRegionLocator { } private void removeFromCache(HRegionLocation loc) { - TableCache tableCache = cache.get(loc.getRegionInfo().getTable()); + TableCache tableCache = cache.get(loc.getRegion().getTable()); if (tableCache == null) { return; } - tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> { + tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> { if (oldLoc.getSeqNum() > loc.getSeqNum() || - !oldLoc.getServerName().equals(loc.getServerName())) { + !oldLoc.getServerName().equals(loc.getServerName())) { return oldLoc; } return null; @@ -194,16 +194,16 @@ class AsyncNonMetaRegionLocator { if (LOG.isTraceEnabled()) { LOG.trace("Try adding " + loc + " to cache"); } - byte[] startKey = loc.getRegionInfo().getStartKey(); + byte[] startKey = loc.getRegion().getStartKey(); HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc); if (oldLoc == null) { return true; } if (oldLoc.getSeqNum() > loc.getSeqNum() || - oldLoc.getServerName().equals(loc.getServerName())) { + oldLoc.getServerName().equals(loc.getServerName())) { if (LOG.isTraceEnabled()) { LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + - " is newer than us or has the same server name"); + " is newer than us or has the same server name"); } return false; } @@ -213,8 +213,8 @@ class AsyncNonMetaRegionLocator { } if (LOG.isTraceEnabled()) { LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + - " is newer than us or has the same server name." + - " Maybe it is updated before we replace it"); + " is newer than us or has the same server name." 
+ + " Maybe it is updated before we replace it"); } return oldValue; }); @@ -223,7 +223,7 @@ class AsyncNonMetaRegionLocator { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", justification = "Called by lambda expression") private void addToCache(HRegionLocation loc) { - addToCache(getTableCache(loc.getRegionInfo().getTable()), loc); + addToCache(getTableCache(loc.getRegion().getTable()), loc); if (LOG.isTraceEnabled()) { LOG.trace("Try adding " + loc + " to cache"); } @@ -232,9 +232,8 @@ class AsyncNonMetaRegionLocator { private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, Throwable error) { if (error != null) { - LOG.warn( - "Failed to locate region in '" + tableName + "', row='" + Bytes.toStringBinary(req.row) - + "', locateType=" + req.locateType, error); + LOG.warn("Failed to locate region in '" + tableName + "', row='" + + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error); } Optional<LocateRequest> toSend = Optional.empty(); TableCache tableCache = getTableCache(tableName); @@ -283,7 +282,7 @@ class AsyncNonMetaRegionLocator { RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); if (LOG.isDebugEnabled()) { LOG.debug("The fetched location of '" + tableName + "', row='" + - Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs); + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs); } if (locs == null || locs.getDefaultRegionLocation() == null) { complete(tableName, req, null, @@ -292,7 +291,7 @@ class AsyncNonMetaRegionLocator { return; } HRegionLocation loc = locs.getDefaultRegionLocation(); - HRegionInfo info = loc.getRegionInfo(); + RegionInfo info = loc.getRegion(); if (info == null) { complete(tableName, req, null, new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", @@ -308,12 +307,12 @@ class AsyncNonMetaRegionLocator { complete(tableName, req, null, new RegionOfflineException( "the only available region for the required row is a split parent," + - " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); return; } if (info.isOffline()) { complete(tableName, req, null, new RegionOfflineException("the region is offline, could" + - " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); return; } if (loc.getServerName() == null) { @@ -332,11 +331,11 @@ class AsyncNonMetaRegionLocator { return null; } HRegionLocation loc = entry.getValue(); - byte[] endKey = loc.getRegionInfo().getEndKey(); + byte[] endKey = loc.getRegion().getEndKey(); if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { if (LOG.isTraceEnabled()) { LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + - Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT); + Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT); } return loc; } else { @@ -347,16 +346,16 @@ class AsyncNonMetaRegionLocator { private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName, byte[] row) { Map.Entry<byte[], HRegionLocation> entry = - isEmptyStopRow(row) ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); + isEmptyStopRow(row) ? 
tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); if (entry == null) { return null; } HRegionLocation loc = entry.getValue(); - if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) || - Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) { + if (isEmptyStopRow(loc.getRegion().getEndKey()) || + Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0) { if (LOG.isTraceEnabled()) { LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + - Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); + Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); } return loc; } else { @@ -367,7 +366,7 @@ class AsyncNonMetaRegionLocator { private void locateInMeta(TableName tableName, LocateRequest req) { if (LOG.isTraceEnabled()) { LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + - "', locateType=" + req.locateType + " in meta"); + "', locateType=" + req.locateType + " in meta"); } byte[] metaKey; if (req.locateType.equals(RegionLocateType.BEFORE)) { @@ -380,7 +379,7 @@ class AsyncNonMetaRegionLocator { } else { metaKey = createRegionName(tableName, req.row, NINES, false); } - conn.getRawTable(META_TABLE_NAME) + conn.getTable(META_TABLE_NAME) .scanAll(new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY) .setOneRowLimit()) .whenComplete((results, error) -> onScanComplete(tableName, req, results, error)); @@ -389,8 +388,8 @@ class AsyncNonMetaRegionLocator { private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row, RegionLocateType locateType) { return locateType.equals(RegionLocateType.BEFORE) - ? locateRowBeforeInCache(tableCache, tableName, row) - : locateRowInCache(tableCache, tableName, row); + ? locateRowBeforeInCache(tableCache, tableName, row) + : locateRowInCache(tableCache, tableName, row); } // locateToPrevious is true means we will use the start key of a region to locate the region @@ -451,11 +450,11 @@ class AsyncNonMetaRegionLocator { void updateCachedLocation(HRegionLocation loc, Throwable exception) { AsyncRegionLocator.updateCachedLocation(loc, exception, l -> { - TableCache tableCache = cache.get(l.getRegionInfo().getTable()); + TableCache tableCache = cache.get(l.getRegion().getTable()); if (tableCache == null) { return null; } - return tableCache.cache.get(l.getRegionInfo().getStartKey()); + return tableCache.cache.get(l.getRegion().getStartKey()); }, this::addToCache, this::removeFromCache); } http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 9c45883..5eceb2d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -153,7 +153,7 @@ class AsyncRpcRetryingCallerFactory { private ScanResultCache resultCache; - private RawScanResultConsumer consumer; + private AdvancedScanResultConsumer consumer; private ClientService.Interface stub; @@ -192,7 +192,7 @@ class AsyncRpcRetryingCallerFactory { return this; } - public ScanSingleRegionCallerBuilder consumer(RawScanResultConsumer consumer) { + public 
ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) { this.consumer = consumer; return this; } http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index ec21275..51c243a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -28,14 +28,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - -import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer; -import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout; - import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -47,13 +41,18 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer; +import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -61,7 +60,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientServ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Retry caller for scanning a region. 
@@ -84,7 +82,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final ScanResultCache resultCache; - private final RawScanResultConsumer consumer; + private final AdvancedScanResultConsumer consumer; private final ClientService.Interface stub; @@ -143,7 +141,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { // Notice that, the public methods of this class is supposed to be called by upper layer only, and // package private methods can only be called within the implementation of // AsyncScanSingleRegionRpcRetryingCaller. - private final class ScanControllerImpl implements RawScanResultConsumer.ScanController { + private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController { // Make sure the methods are only called in this thread. private final Thread callerThread; @@ -217,7 +215,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { // Notice that, the public methods of this class is supposed to be called by upper layer only, and // package private methods can only be called within the implementation of // AsyncScanSingleRegionRpcRetryingCaller. - private final class ScanResumerImpl implements RawScanResultConsumer.ScanResumer { + private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { // INITIALIZED -> SUSPENDED -> RESUMED // INITIALIZED -> RESUMED @@ -301,7 +299,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId, - ScanResultCache resultCache, RawScanResultConsumer consumer, Interface stub, + ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; @@ -344,8 +342,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { stub.scan(controller, req, resp -> { if (controller.failed()) { LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + - " for " + loc.getRegionInfo().getEncodedName() + " of " + - loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", + " for " + loc.getRegion().getEncodedName() + " of " + + loc.getRegion().getTable() + " failed, ignore, probably already closed", controller.getFailed()); } }); @@ -384,7 +382,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { error = translateException(error); if (tries > startLogErrorsCnt) { LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " + - loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() + + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() + " ms", @@ -433,18 +431,18 @@ class AsyncScanSingleRegionRpcRetryingCaller { } private void completeWhenNoMoreResultsInRegion() { - if (noMoreResultsForScan(scan, loc.getRegionInfo())) { + if (noMoreResultsForScan(scan, loc.getRegion())) { completeNoMoreResults(); } else { - completeWithNextStartRow(loc.getRegionInfo().getEndKey(), true); + completeWithNextStartRow(loc.getRegion().getEndKey(), true); } } private void completeReversedWhenNoMoreResultsInRegion() { - if (noMoreResultsForReverseScan(scan, loc.getRegionInfo())) { + if 
(noMoreResultsForReverseScan(scan, loc.getRegion())) { completeNoMoreResults(); } else { - completeWithNextStartRow(loc.getRegionInfo().getStartKey(), false); + completeWithNextStartRow(loc.getRegion().getStartKey(), false); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index d448e5a..ddedc3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -68,7 +68,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> { } catch (IOException e) { onError(e, () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) - + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed", + + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed", err -> conn.getLocator().updateCachedLocation(loc, err)); return; } @@ -78,7 +78,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> { if (error != null) { onError(error, () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " - + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed", + + loc.getRegion().getEncodedName() + " of " + tableName + " failed", err -> conn.getLocator().updateCachedLocation(loc, err)); return; } http://git-wip-us.apache.org/repos/asf/hbase/blob/bc3542c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 0c72c14..b3ccb15 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -17,20 +17,277 @@ */ package org.apache.hadoop.hbase.client; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf; +import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; + +import com.google.protobuf.RpcChannel; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; + /** - * The asynchronous table for normal users. + * The interface for asynchronous version of Table. Obtain an instance from a + * {@link AsyncConnection}. * <p> * The implementation is required to be thread safe. * <p> - * The implementation should make sure that user can do everything they want to the returned - * {@code CompletableFuture} without breaking anything. 
Usually the implementation will require user - to provide a {@code ExecutorService}. + Usually the implementation will not throw any exception directly. You need to get the exception + from the returned {@link CompletableFuture}. * @since 2.0.0 */ @InterfaceAudience.Public -public interface AsyncTable extends AsyncTableBase { +public interface AsyncTable<C extends ScanResultConsumerBase> { + + /** + * Gets the fully qualified table name instance of this table. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + * <p> + * The reference returned is not a copy, so any change made to it will affect this instance. + */ + Configuration getConfiguration(); + + /** + * Get timeout of each rpc request in this Table instance. It will be overridden by a more + * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. + * @see #getReadRpcTimeout(TimeUnit) + * @see #getWriteRpcTimeout(TimeUnit) + * @param unit the unit of time the timeout to be represented in + * @return rpc timeout in the specified time unit + */ + long getRpcTimeout(TimeUnit unit); + + /** + * Get timeout of each rpc read request in this Table instance. + * @param unit the unit of time the timeout to be represented in + * @return read rpc timeout in the specified time unit + */ + long getReadRpcTimeout(TimeUnit unit); + + /** + * Get timeout of each rpc write request in this Table instance. + * @param unit the unit of time the timeout to be represented in + * @return write rpc timeout in the specified time unit + */ + long getWriteRpcTimeout(TimeUnit unit); + + /** + * Get timeout of each operation in this Table instance. + * @param unit the unit of time the timeout to be represented in + * @return operation rpc timeout in the specified time unit + */ + long getOperationTimeout(TimeUnit unit); + + /** + * Get the timeout of a single operation in a scan. It works like operation timeout for other + * operations. + * @param unit the unit of time the timeout to be represented in + * @return scan rpc timeout in the specified time unit + */ + long getScanTimeout(TimeUnit unit); + + /** + * Test for the existence of columns in the table, as specified by the Get. + * <p> + * This will return true if the Get matches one or more keys, false if not. + * <p> + * This is a server-side call so it prevents any data from being transferred to the client. + * @return true if the specified Get matches one or more keys, false if not. The return value will + * be wrapped by a {@link CompletableFuture}. + */ + default CompletableFuture<Boolean> exists(Get get) { + return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists()); + } + + /** + * Extracts certain cells from a given row. + * @param get The object that specifies what data to fetch and from which row. + * @return The data coming from the specified row, if it exists. If the row specified doesn't + * exist, the {@link Result} instance returned won't contain any + * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The + * return value will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture<Result> get(Get get); + + /** + * Puts some data in the table. + * @param put The data to put. + * @return A {@link CompletableFuture} that always returns null when it completes normally. + */ + CompletableFuture<Void> put(Put put); + + /** + * Deletes the specified cells/row. + * @param delete The object that specifies what to delete.
+ * @return A {@link CompletableFuture} that always returns null when it completes normally. + */ + CompletableFuture<Void> delete(Delete delete); + + /** + * Appends values to one or more columns within a single row. + * <p> + * This operation does not appear atomic to readers. Appends are done under a single row lock, so + * write operations to a row are synchronized, but readers do not take row locks so get and scan + * operations can see this operation partially completed. + * @param append object that specifies the columns and values to be appended + * @return values of columns after the append operation (may be null). The return value will be + * wrapped by a {@link CompletableFuture}. + */ + CompletableFuture<Result> append(Append append); + + /** + * Increments one or more columns within a single row. + * <p> + * This operation does not appear atomic to readers. Increments are done under a single row lock, + * so write operations to a row are synchronized, but readers do not take row locks so get and + * scan operations can see this operation partially completed. + * @param increment object that specifies the columns and amounts to be used for the increment + * operations + * @return values of columns after the increment. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture<Result> increment(Increment increment); + + /** + * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} + * <p> + * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. + * @param row The row that contains the cell to increment. + * @param family The column family of the cell to increment. + * @param qualifier The column qualifier of the cell to increment. + * @param amount The amount to increment the cell with (or decrement, if the amount is negative). + * @return The new value, post increment. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount) { + return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); + } + + /** + * Atomically increments a column value. If the column value already exists and is not a + * big-endian long, this could throw an exception. If the column value does not yet exist it is + * initialized to <code>amount</code> and written to the specified column. + * <p> + * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose + * any increments that have not been flushed. + * @param row The row that contains the cell to increment. + * @param family The column family of the cell to increment. + * @param qualifier The column qualifier of the cell to increment. + * @param amount The amount to increment the cell with (or decrement, if the amount is negative). + * @param durability The persistence guarantee for this increment. + * @return The new value, post increment. The return value will be wrapped by a + * {@link CompletableFuture}.
+ */ + default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount, Durability durability) { + Preconditions.checkNotNull(row, "row is null"); + Preconditions.checkNotNull(family, "family is null"); + return increment( + new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) + .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * adds the Put/Delete/RowMutations. + * <p> + * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it. + * This is a fluent-style API; the code looks like: + * + * <pre> + * <code> + * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put) + * .thenAccept(succ -> { + * if (succ) { + * System.out.println("Check and put succeeded"); + * } else { + * System.out.println("Check and put failed"); + * } + * }); + * </code> + * </pre> + */ + CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); + + /** + * A helper class for sending a checkAndMutate request. + */ + interface CheckAndMutateBuilder { + + /** + * @param qualifier column qualifier to check. + */ + CheckAndMutateBuilder qualifier(byte[] qualifier); + + /** + * Check for lack of column. + */ + CheckAndMutateBuilder ifNotExists(); + + default CheckAndMutateBuilder ifEquals(byte[] value) { + return ifMatches(CompareOperator.EQUAL, value); + } + + /** + * @param compareOp comparison operator to use + * @param value the expected value + */ + CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value); + + /** + * @param put data to put if check succeeds + * @return {@code true} if the new put was executed, {@code false} otherwise. The return value + * will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture<Boolean> thenPut(Put put); + + /** + * @param delete data to delete if check succeeds + * @return {@code true} if the new delete was executed, {@code false} otherwise. The return + * value will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture<Boolean> thenDelete(Delete delete); + + /** + * @param mutation mutations to perform if check succeeds + * @return true if the new mutation was executed, false otherwise. The return value will be + * wrapped by a {@link CompletableFuture}. + */ + CompletableFuture<Boolean> thenMutate(RowMutations mutation); + } + + /** + * Performs multiple mutations atomically on a single row. Currently {@link Put} and + * {@link Delete} are supported. + * @param mutation object that specifies the set of mutations to perform atomically + * @return A {@link CompletableFuture} that always returns null when it completes normally. + */ + CompletableFuture<Void> mutateRow(RowMutations mutation); + + /** + * The scan API uses the observer pattern. + * @param scan A configured {@link Scan} object. + * @param consumer the consumer used to receive results. + * @see ScanResultConsumer + * @see AdvancedScanResultConsumer + */ + void scan(Scan scan, C consumer); /** * Gets a scanner on the current table for the given family. @@ -59,13 +316,300 @@ public interface AsyncTable extends AsyncTableBase { ResultScanner getScanner(Scan scan); /** - * The scan API uses the observer pattern. All results that match the given scan object will be - * passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result)}.
- * {@link ScanResultConsumer#onComplete()} means the scan is finished, and - * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan - * is terminated. - * @param scan A configured {@link Scan} object. - * @param consumer the consumer used to receive results. + * Return all the results that match the given scan object. + * <p> + * Notice that usually you should use this method with a {@link Scan} object that has a limit set. + * For example, if you want to get the closest row after a given row, you could do this: + * <p> + * + * <pre> + * <code> + * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> { + * if (results.isEmpty()) { + * System.out.println("No row after " + Bytes.toStringBinary(row)); + * } else { + * System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is " + * + Bytes.toStringBinary(results.stream().findFirst().get().getRow())); + * } + * }); + * </code> + * </pre> + * <p> + * If your result set is very large, you should use the other scan methods to get a scanner or use + * a callback to process the results. They will do chunking to prevent OOM. The scanAll method will + * fetch all the results and store them in a List and then return the list to you. + * <p> + * The scan metrics will be collected in the background if you enable them, but there is no way to + * get them from this method. Usually you can get scan metrics from {@code ResultScanner}, or through + * {@code ScanResultConsumer.onScanMetricsCreated}, but this method only returns a list of results. + * So if you really care about scan metrics then you'd better use other scan methods which return + * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no + * performance difference between these scan methods so do not worry. + * @param scan A configured {@link Scan} object. If you use this method to fetch a really large + * result set, it is likely to cause OOM. + * @return The results of this scan operation. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture<List<Result>> scanAll(Scan scan); + + /** + * Test for the existence of columns in the table, as specified by the Gets. + * <p> + * This will return a list of booleans. Each value will be true if the related Get matches one or + * more keys, false if not. + * <p> + * This is a server-side call so it prevents any data from being transferred to the client. + * @param gets the Gets + * @return A list of {@link CompletableFuture}s that represent the existence for each get. + */ + default List<CompletableFuture<Boolean>> exists(List<Get> gets) { + return get(toCheckExistenceOnly(gets)).stream() + .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); + } + + /** + * A simple version for batch exists. It will fail if there are any failures and you will get the + * whole result boolean list at once if the operation succeeds. + * @param gets the Gets + * @return A {@link CompletableFuture} that wraps the result boolean list. + */ + default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) { + return allOf(exists(gets)); + } + + /** + * Extracts certain cells from the given rows, in batch. + * <p> + * Notice that you may not get all the results with this function, which means some of the + * returned {@link CompletableFuture}s may succeed while some of the other returned + * {@link CompletableFuture}s may fail.
+ * @param gets The objects that specify what data to fetch and from which rows. + * @return A list of {@link CompletableFuture}s that represent the result for each get. + */ + List<CompletableFuture<Result>> get(List<Get> gets); + + /** + * A simple version for batch get. It will fail if there are any failures and you will get the + * whole result list at once if the operation succeeds. + * @param gets The objects that specify what data to fetch and from which rows. + * @return A {@link CompletableFuture} that wraps the result list. + */ + default CompletableFuture<List<Result>> getAll(List<Get> gets) { + return allOf(get(gets)); + } + + /** + * Puts some data in the table, in batch. + * @param puts The list of mutations to apply. + * @return A list of {@link CompletableFuture}s that represent the result for each put. + */ + List<CompletableFuture<Void>> put(List<Put> puts); + + /** + * A simple version of batch put. It will fail if there are any failures. + * @param puts The list of mutations to apply. + * @return A {@link CompletableFuture} that always returns null when it completes normally. + */ + default CompletableFuture<Void> putAll(List<Put> puts) { + return allOf(put(puts)).thenApply(r -> null); + } + + /** + * Deletes the specified cells/rows in bulk. + * @param deletes list of things to delete. + * @return A list of {@link CompletableFuture}s that represent the result for each delete. + */ + List<CompletableFuture<Void>> delete(List<Delete> deletes); + + /** + * A simple version of batch delete. It will fail if there are any failures. + * @param deletes list of things to delete. + * @return A {@link CompletableFuture} that always returns null when it completes normally. + */ + default CompletableFuture<Void> deleteAll(List<Delete> deletes) { + return allOf(delete(deletes)).thenApply(r -> null); + } + + /** + * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of + * execution of the actions is not defined. This means that if you do a Put and a Get in the same + * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put + * had put. + * @param actions list of Get, Put, Delete, Increment, Append objects + * @return A list of {@link CompletableFuture}s that represent the result for each action. + */ + <T> List<CompletableFuture<T>> batch(List<? extends Row> actions); + + /** + * A simple version of batch. It will fail if there are any failures and you will get the whole + * result list at once if the operation succeeds. + * @param actions list of Get, Put, Delete, Increment, Append objects + * @return A list of the results for the actions. Wrapped by a {@link CompletableFuture}. + */ + default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) { + return allOf(batch(actions)); + } + + /** + * Execute the given coprocessor call on the region which contains the given {@code row}. + * <p> + * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + * <pre> + * <code> + * channel -> xxxService.newStub(channel) + * </code> + * </pre> + * + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details.
+ * @param row The row key used to identify the remote region location + * @param <S> the type of the asynchronous stub + * @param <R> the type of the return value + * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. + * @see ServiceCaller + */ + <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, + ServiceCaller<S, R> callable, byte[] row); + + /** + * The callback when we want to execute a coprocessor call on a range of regions. + * <p> + * As the locating itself also takes some time, the implementation may want to send rpc calls on + * the fly, which means we do not know how many regions we have when we get the return value of + * the rpc calls. So we need an {@link #onComplete()} which is used to tell you that we have + * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)} + * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no more + * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)} + * calls in the future. + * <p> + * Here is some pseudo code describing a typical implementation of a range coprocessor service + * method to help you better understand how the {@link CoprocessorCallback} will be called. The + * {@code callback} in the pseudo code is our {@link CoprocessorCallback}, and notice that + * {@code whenComplete} is {@code CompletableFuture.whenComplete}. + * + * <pre> + * locateThenCall(byte[] row) { + * locate(row).whenComplete((location, locateError) -> { + * if (locateError != null) { + * callback.onError(locateError); + * return; + * } + * incPendingCall(); + * region = location.getRegion(); + * if (region.getEndKey() > endKey) { + * locateEnd = true; + * } else { + * locateThenCall(region.getEndKey()); + * } + * sendCall().whenComplete((resp, error) -> { + * if (error != null) { + * callback.onRegionError(region, error); + * } else { + * callback.onRegionComplete(region, resp); + * } + * if (locateEnd && decPendingCallAndGet() == 0) { + * callback.onComplete(); + * } + * }); + * }); + * } + * </pre> + */ + @InterfaceAudience.Public + interface CoprocessorCallback<R> { + + /** + * @param region the region that the response belongs to + * @param resp the response of the coprocessor call + */ + void onRegionComplete(RegionInfo region, R resp); + + /** + * @param region the region that the error belongs to + * @param error the response error of the coprocessor call + */ + void onRegionError(RegionInfo region, Throwable error); + + /** + * Indicate that all responses of the regions have been notified by calling + * {@link #onRegionComplete(RegionInfo, Object)} or + * {@link #onRegionError(RegionInfo, Throwable)}. + */ + void onComplete(); + + /** + * Indicate that we got an error which does not belong to any regions. Usually a locating error. + */ + void onError(Throwable error); + } + + /** + * Helper class for sending a coprocessorService request that executes a coprocessor call on regions + * which are covered by a range. + * <p> + * If {@code fromRow} is not specified the selection will start with the first table region. If + * {@code toRow} is not specified the selection will continue through the last table region. + * @param <S> the type of the protobuf Service you want to call. + * @param <R> the type of the return value. + */ + interface CoprocessorServiceBuilder<S, R> { + + /** + * @param startKey start region selection with region containing this row, inclusive.
+ */ + default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) { + return fromRow(startKey, true); + } + + /** + * @param startKey start region selection with region containing this row + * @param inclusive whether to include the startKey + */ + CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive); + + /** + * @param endKey select regions up to and including the region containing this row (the row + * itself is exclusive). + */ + default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) { + return toRow(endKey, false); + } + + /** + * @param endKey select regions up to and including the region containing this row + * @param inclusive whether to include the endKey + */ + CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive); + + /** + * Execute the coprocessorService request. You can get the response through the + * {@link CoprocessorCallback}. + */ + void execute(); + } + + /** + * Execute a coprocessor call on the regions which are covered by a range. + * <p> + * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute it. + * <p> + * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it + * is only a one line lambda expression, like: + * + * <pre> + * <code> + * channel -> xxxService.newStub(channel) + * </code> + * </pre> + * + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. + * @param callback callback to get the response. See the comment of {@link CoprocessorCallback} + * for more details. */ - void scan(Scan scan, ScanResultConsumer consumer); + <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, + ServiceCaller<S, R> callable, CoprocessorCallback<R> callback); }
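----------------------------------------------------------------------
The sketches below illustrate how the merged AsyncTable API in this patch can be used. They are minimal examples under stated assumptions, not part of the change itself; the table name "test", the family/qualifier names and the row keys are placeholders. The first one obtains a table from an AsyncConnection (using the no-executor getTable overload, whose consumer type extends ScanResultConsumerBase) and does a simple exists/get round trip; all failures surface through the returned CompletableFutures rather than being thrown.

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumerBase;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncGetExample {
  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException {
    // Placeholder row/column names for illustration only.
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    try (AsyncConnection conn =
        ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      // Without an ExecutorService the table accepts the advanced consumer flavor for scans.
      AsyncTable<? extends ScanResultConsumerBase> table = conn.getTable(TableName.valueOf("test"));
      // Errors are never thrown directly; they come back through the CompletableFutures.
      boolean exists = table.exists(new Get(row)).get();
      Result result = table.get(new Get(row).addColumn(cf, cq)).get();
      System.out.println("exists=" + exists + ", value="
          + (result.isEmpty() ? "<none>" : Bytes.toStringBinary(result.getValue(cf, cq))));
    }
  }
}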
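The single-row mutation methods (put, append, incrementColumnValue, delete) each return a CompletableFuture, so they can be chained without blocking a thread. A sketch with invented column names:

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ScanResultConsumerBase;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncMutationExample {
  static CompletableFuture<Long> writeAndCount(AsyncTable<? extends ScanResultConsumerBase> table,
      byte[] row) {
    byte[] cf = Bytes.toBytes("cf");
    // put -> append -> incrementColumnValue -> delete, all chained on the returned futures.
    return table.put(new Put(row).addColumn(cf, Bytes.toBytes("name"), Bytes.toBytes("value")))
        .thenCompose(v -> table
            .append(new Append(row).addColumn(cf, Bytes.toBytes("log"), Bytes.toBytes("-updated"))))
        .thenCompose(r -> table.incrementColumnValue(row, cf, Bytes.toBytes("counter"), 1L,
            Durability.SYNC_WAL))
        .thenCompose(counter -> table.delete(new Delete(row).addColumn(cf, Bytes.toBytes("tmp")))
            .thenApply(v -> counter));
  }
}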
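A sketch of the fluent CheckAndMutateBuilder combined with RowMutations; the qualifiers and the expected value are made up for the example:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.ScanResultConsumerBase;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndMutateExample {
  static CompletableFuture<Boolean> completeTask(AsyncTable<? extends ScanResultConsumerBase> table,
      byte[] row) throws IOException {
    byte[] cf = Bytes.toBytes("cf");
    RowMutations mutations = new RowMutations(row);
    mutations.add(new Put(row).addColumn(cf, Bytes.toBytes("state"), Bytes.toBytes("done")));
    mutations.add(new Delete(row).addColumn(cf, Bytes.toBytes("pending")));
    // Apply both mutations atomically, but only if cf:state currently equals "in-progress".
    return table.checkAndMutate(row, cf).qualifier(Bytes.toBytes("state"))
        .ifMatches(CompareOperator.EQUAL, Bytes.toBytes("in-progress")).thenMutate(mutations);
  }
}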
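The type parameter C decides which consumer scan(Scan, C) accepts: getTable(tableName, pool) yields an AsyncTable<ScanResultConsumer> whose callbacks run on the supplied pool, while the no-executor overload yields the AdvancedScanResultConsumer variant. The sketch below assumes, as in the ScanResultConsumer touched by this patch, that onNext returns a boolean to keep or stop the scan, that onError/onComplete come from ScanResultConsumerBase, and that onScanMetricsCreated has a default no-op implementation; it also shows a small scanAll for a bounded result set.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.util.Bytes;

public class ObserverScanExample {
  static void scanWithConsumer(AsyncConnection conn) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    // With an ExecutorService the table is typed AsyncTable<ScanResultConsumer>.
    AsyncTable<ScanResultConsumer> table = conn.getTable(TableName.valueOf("test"), pool);
    table.scan(new Scan().addFamily(Bytes.toBytes("cf")), new ScanResultConsumer() {
      @Override
      public boolean onNext(Result result) {
        System.out.println(Bytes.toStringBinary(result.getRow()));
        return true; // keep scanning
      }

      @Override
      public void onError(Throwable error) {
        error.printStackTrace();
        pool.shutdown();
      }

      @Override
      public void onComplete() {
        System.out.println("scan finished");
        pool.shutdown();
      }
    });
    // For a bounded result set, scanAll buffers everything into a single List.
    table.scanAll(new Scan().addFamily(Bytes.toBytes("cf")).setLimit(10))
        .thenAccept(results -> System.out.println("scanAll got " + results.size() + " rows"));
  }
}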
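The list variants return one CompletableFuture per operation, so partial failure is visible per element, while the *All convenience methods collapse them into a single future that fails if any element fails. A sketch with placeholder rows and columns:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumerBase;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncBatchExample {
  static void batchCalls(AsyncTable<? extends ScanResultConsumerBase> table) {
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    List<Get> gets = Arrays.asList(new Get(Bytes.toBytes("row1")), new Get(Bytes.toBytes("row2")));

    // One future per Get: some may succeed while others fail.
    for (CompletableFuture<Result> future : table.get(gets)) {
      future.whenComplete((result, error) -> {
        if (error != null) {
          error.printStackTrace();
        } else {
          System.out
              .println(result.isEmpty() ? "<missing>" : Bytes.toStringBinary(result.getRow()));
        }
      });
    }

    // getAll/putAll fail as a whole if any single operation fails.
    table.getAll(gets)
        .thenAccept(results -> System.out.println("fetched " + results.size() + " rows"));
    List<Put> puts = new ArrayList<>();
    puts.add(new Put(Bytes.toBytes("row3")).addColumn(cf, cq, Bytes.toBytes("v3")));
    puts.add(new Put(Bytes.toBytes("row4")).addColumn(cf, cq, Bytes.toBytes("v4")));
    table.putAll(puts).join();
  }
}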
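coprocessorService with a ServiceCaller targets the single region containing the given row. The sketch below assumes a coprocessor endpoint generated from protobuf and already deployed on the table; RowCountService, CountRequest and CountResponse (in the org.example.coprocessor package) are hypothetical stand-ins modelled on the row-count example endpoint, not classes defined by this patch.

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ScanResultConsumerBase;
// Hypothetical generated protobuf classes for a deployed endpoint, not part of this patch.
import org.example.coprocessor.ExampleProtos.CountRequest;
import org.example.coprocessor.ExampleProtos.CountResponse;
import org.example.coprocessor.ExampleProtos.RowCountService;

public class CoprocessorSingleRegionExample {
  static CompletableFuture<Long> countRowsInRegion(
      AsyncTable<? extends ScanResultConsumerBase> table, byte[] row) {
    CountRequest request = CountRequest.getDefaultInstance();
    // stubMaker builds the async stub; the ServiceCaller performs the actual protobuf rpc.
    return table
        .<RowCountService.Stub, CountResponse> coprocessorService(RowCountService::newStub,
          (stub, controller, done) -> stub.getRowCount(controller, request, done), row)
        .thenApply(CountResponse::getCount);
  }
}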
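For a range of regions, the builder form takes a CoprocessorCallback that is notified once per region and then once through onComplete, with fromRow/toRow bounding the selection. Same hypothetical RowCountService as above; the row keys are placeholders.

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.ScanResultConsumerBase;
import org.apache.hadoop.hbase.util.Bytes;
// Hypothetical generated protobuf classes for a deployed endpoint, not part of this patch.
import org.example.coprocessor.ExampleProtos.CountRequest;
import org.example.coprocessor.ExampleProtos.CountResponse;
import org.example.coprocessor.ExampleProtos.RowCountService;

public class CoprocessorRangeExample {
  static void countRowsInRange(AsyncTable<? extends ScanResultConsumerBase> table) {
    CountRequest request = CountRequest.getDefaultInstance();
    AtomicLong total = new AtomicLong();
    table.<RowCountService.Stub, CountResponse> coprocessorService(RowCountService::newStub,
      (stub, controller, done) -> stub.getRowCount(controller, request, done),
      new CoprocessorCallback<CountResponse>() {
        @Override
        public void onRegionComplete(RegionInfo region, CountResponse resp) {
          total.addAndGet(resp.getCount());
        }

        @Override
        public void onRegionError(RegionInfo region, Throwable error) {
          System.err.println("failed on " + region.getEncodedName() + ": " + error);
        }

        @Override
        public void onComplete() {
          // All per-region callbacks have fired at this point.
          System.out.println("total rows: " + total.get());
        }

        @Override
        public void onError(Throwable error) {
          // Usually a locating error that does not belong to any single region.
          error.printStackTrace();
        }
      }).fromRow(Bytes.toBytes("row0")).toRow(Bytes.toBytes("row9")).execute();
  }
}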