http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java deleted file mode 100644 index 7d24c4f..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ /dev/null @@ -1,414 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.client; - -import static java.util.stream.Collectors.toList; -import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf; -import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; - -import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * The base interface for asynchronous version of Table. Obtain an instance from a - * {@link AsyncConnection}. - * <p> - * The implementation is required to be thread safe. - * <p> - * Usually the implementation will not throw any exception directly. You need to get the exception - * from the returned {@link CompletableFuture}. - * @since 2.0.0 - */ -@InterfaceAudience.Public -public interface AsyncTableBase { - - /** - * Gets the fully qualified table name instance of this table. - */ - TableName getName(); - - /** - * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. - * <p> - * The reference returned is not a copy, so any change made to it will affect this instance. - */ - Configuration getConfiguration(); - - /** - * Get timeout of each rpc request in this Table instance. It will be overridden by a more - * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. - * @see #getReadRpcTimeout(TimeUnit) - * @see #getWriteRpcTimeout(TimeUnit) - * @param unit the unit of time the timeout to be represented in - * @return rpc timeout in the specified time unit - */ - long getRpcTimeout(TimeUnit unit); - - /** - * Get timeout of each rpc read request in this Table instance. 
- * @param unit the unit of time the timeout to be represented in - * @return read rpc timeout in the specified time unit - */ - long getReadRpcTimeout(TimeUnit unit); - - /** - * Get timeout of each rpc write request in this Table instance. - * @param unit the unit of time the timeout to be represented in - * @return write rpc timeout in the specified time unit - */ - long getWriteRpcTimeout(TimeUnit unit); - - /** - * Get timeout of each operation in Table instance. - * @param unit the unit of time the timeout to be represented in - * @return operation rpc timeout in the specified time unit - */ - long getOperationTimeout(TimeUnit unit); - - /** - * Get the timeout of a single operation in a scan. It works like operation timeout for other - * operations. - * @param unit the unit of time the timeout to be represented in - * @return scan rpc timeout in the specified time unit - */ - long getScanTimeout(TimeUnit unit); - - /** - * Test for the existence of columns in the table, as specified by the Get. - * <p> - * This will return true if the Get matches one or more keys, false if not. - * <p> - * This is a server-side call so it prevents any data from being transfered to the client. - * @return true if the specified Get matches one or more keys, false if not. The return value will - * be wrapped by a {@link CompletableFuture}. - */ - default CompletableFuture<Boolean> exists(Get get) { - return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists()); - } - - /** - * Extracts certain cells from a given row. - * @param get The object that specifies what data to fetch and from which row. - * @return The data coming from the specified row, if it exists. If the row specified doesn't - * exist, the {@link Result} instance returned won't contain any - * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The - * return value will be wrapped by a {@link CompletableFuture}. 
- */ - CompletableFuture<Result> get(Get get); - - /** - * Puts some data to the table. - * @param put The data to put. - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - CompletableFuture<Void> put(Put put); - - /** - * Deletes the specified cells/row. - * @param delete The object that specifies what to delete. - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - CompletableFuture<Void> delete(Delete delete); - - /** - * Appends values to one or more columns within a single row. - * <p> - * This operation does not appear atomic to readers. Appends are done under a single row lock, so - * write operations to a row are synchronized, but readers do not take row locks so get and scan - * operations can see this operation partially completed. - * @param append object that specifies the columns and amounts to be used for the increment - * operations - * @return values of columns after the append operation (maybe null). The return value will be - * wrapped by a {@link CompletableFuture}. - */ - CompletableFuture<Result> append(Append append); - - /** - * Increments one or more columns within a single row. - * <p> - * This operation does not appear atomic to readers. Increments are done under a single row lock, - * so write operations to a row are synchronized, but readers do not take row locks so get and - * scan operations can see this operation partially completed. - * @param increment object that specifies the columns and amounts to be used for the increment - * operations - * @return values of columns after the increment. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - CompletableFuture<Result> increment(Increment increment); - - /** - * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} - * <p> - * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. - * @param row The row that contains the cell to increment. 
- * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the amount is negative). - * @return The new value, post increment. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount) { - return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); - } - - /** - * Atomically increments a column value. If the column value already exists and is not a - * big-endian long, this could throw an exception. If the column value does not yet exist it is - * initialized to <code>amount</code> and written to the specified column. - * <p> - * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose - * any increments that have not been flushed. - * @param row The row that contains the cell to increment. - * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the amount is negative). - * @param durability The persistence guarantee for this increment. - * @return The new value, post increment. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount, Durability durability) { - Preconditions.checkNotNull(row, "row is null"); - Preconditions.checkNotNull(family, "family is null"); - return increment( - new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) - .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); - } - - /** - * Atomically checks if a row/family/qualifier value matches the expected value. 
If it does, it - * adds the Put/Delete/RowMutations. - * <p> - * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it. - * This is a fluent style API, the code is like: - * - * <pre> - * <code> - * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put) - * .thenAccept(succ -> { - * if (succ) { - * System.out.println("Check and put succeeded"); - * } else { - * System.out.println("Check and put failed"); - * } - * }); - * </code> - * </pre> - */ - CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); - - /** - * A helper class for sending checkAndMutate request. - */ - interface CheckAndMutateBuilder { - - /** - * @param qualifier column qualifier to check. - */ - CheckAndMutateBuilder qualifier(byte[] qualifier); - - /** - * Check for lack of column. - */ - CheckAndMutateBuilder ifNotExists(); - - default CheckAndMutateBuilder ifEquals(byte[] value) { - return ifMatches(CompareOperator.EQUAL, value); - } - - /** - * @param compareOp comparison operator to use - * @param value the expected value - */ - CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value); - - /** - * @param put data to put if check succeeds - * @return {@code true} if the new put was executed, {@code false} otherwise. The return value - * will be wrapped by a {@link CompletableFuture}. - */ - CompletableFuture<Boolean> thenPut(Put put); - - /** - * @param delete data to delete if check succeeds - * @return {@code true} if the new delete was executed, {@code false} otherwise. The return - * value will be wrapped by a {@link CompletableFuture}. - */ - CompletableFuture<Boolean> thenDelete(Delete delete); - - /** - * @param mutation mutations to perform if check succeeds - * @return true if the new mutation was executed, false otherwise. The return value will be - * wrapped by a {@link CompletableFuture}. 
- */ - CompletableFuture<Boolean> thenMutate(RowMutations mutation); - } - - /** - * Performs multiple mutations atomically on a single row. Currently {@link Put} and - * {@link Delete} are supported. - * @param mutation object that specifies the set of mutations to perform atomically - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - CompletableFuture<Void> mutateRow(RowMutations mutation); - - /** - * Return all the results that match the given scan object. - * <p> - * Notice that usually you should use this method with a {@link Scan} object that has limit set. - * For example, if you want to get the closest row after a given row, you could do this: - * <p> - * - * <pre> - * <code> - * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> { - * if (results.isEmpty()) { - * System.out.println("No row after " + Bytes.toStringBinary(row)); - * } else { - * System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is " - * + Bytes.toStringBinary(results.stream().findFirst().get().getRow())); - * } - * }); - * </code> - * </pre> - * <p> - * If your result set is very large, you should use other scan method to get a scanner or use - * callback to process the results. They will do chunking to prevent OOM. The scanAll method will - * fetch all the results and store them in a List and then return the list to you. - * <p> - * The scan metrics will be collected background if you enable it but you have no way to get it. - * Usually you can get scan metrics from {@code ResultScanner}, or through - * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results. - * So if you really care about scan metrics then you'd better use other scan methods which return - * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no - * performance difference between these scan methods so do not worry. 
- * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large - * result set, it is likely to cause OOM. - * @return The results of this small scan operation. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - CompletableFuture<List<Result>> scanAll(Scan scan); - - /** - * Test for the existence of columns in the table, as specified by the Gets. - * <p> - * This will return a list of booleans. Each value will be true if the related Get matches one or - * more keys, false if not. - * <p> - * This is a server-side call so it prevents any data from being transferred to the client. - * @param gets the Gets - * @return A list of {@link CompletableFuture}s that represent the existence for each get. - */ - default List<CompletableFuture<Boolean>> exists(List<Get> gets) { - return get(toCheckExistenceOnly(gets)).stream() - .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); - } - - /** - * A simple version for batch exists. It will fail if there are any failures and you will get the - * whole result boolean list at once if the operation is succeeded. - * @param gets the Gets - * @return A {@link CompletableFuture} that wrapper the result boolean list. - */ - default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) { - return allOf(exists(gets)); - } - - /** - * Extracts certain cells from the given rows, in batch. - * <p> - * Notice that you may not get all the results with this function, which means some of the - * returned {@link CompletableFuture}s may succeed while some of the other returned - * {@link CompletableFuture}s may fail. - * @param gets The objects that specify what data to fetch and from which rows. - * @return A list of {@link CompletableFuture}s that represent the result for each get. - */ - List<CompletableFuture<Result>> get(List<Get> gets); - - /** - * A simple version for batch get. 
It will fail if there are any failures and you will get the - * whole result list at once if the operation is succeeded. - * @param gets The objects that specify what data to fetch and from which rows. - * @return A {@link CompletableFuture} that wrapper the result list. - */ - default CompletableFuture<List<Result>> getAll(List<Get> gets) { - return allOf(get(gets)); - } - - /** - * Puts some data in the table, in batch. - * @param puts The list of mutations to apply. - * @return A list of {@link CompletableFuture}s that represent the result for each put. - */ - List<CompletableFuture<Void>> put(List<Put> puts); - - /** - * A simple version of batch put. It will fail if there are any failures. - * @param puts The list of mutations to apply. - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - default CompletableFuture<Void> putAll(List<Put> puts) { - return allOf(put(puts)).thenApply(r -> null); - } - - /** - * Deletes the specified cells/rows in bulk. - * @param deletes list of things to delete. - * @return A list of {@link CompletableFuture}s that represent the result for each delete. - */ - List<CompletableFuture<Void>> delete(List<Delete> deletes); - - /** - * A simple version of batch delete. It will fail if there are any failures. - * @param deletes list of things to delete. - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - default CompletableFuture<Void> deleteAll(List<Delete> deletes) { - return allOf(delete(deletes)).thenApply(r -> null); - } - - /** - * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of - * execution of the actions is not defined. Meaning if you do a Put and a Get in the same - * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put - * had put. 
- * @param actions list of Get, Put, Delete, Increment, Append objects - * @return A list of {@link CompletableFuture}s that represent the result for each action. - */ - <T> List<CompletableFuture<T>> batch(List<? extends Row> actions); - - /** - * A simple version of batch. It will fail if there are any failures and you will get the whole - * result list at once if the operation is succeeded. - * @param actions list of Get, Put, Delete, Increment, Append objects - * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. - */ - default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) { - return allOf(batch(actions)); - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 9c5b092..6632ad5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.yetus.audience.InterfaceAudience; /** - * For creating {@link AsyncTable} or {@link RawAsyncTable}. + * For creating {@link AsyncTable}. * <p> * The implementation should have default configurations set before returning the builder to user. * So users are free to only set the configs they care about to create a new @@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience; * @since 2.0.0 */ @InterfaceAudience.Public -public interface AsyncTableBuilder<T extends AsyncTableBase> { +public interface AsyncTableBuilder<C extends ScanResultConsumerBase> { /** * Set timeout for a whole operation such as get, put or delete. Notice that scan will not be @@ -44,7 +44,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> { * @see #setMaxRetries(int) * @see #setScanTimeout(long, TimeUnit) */ - AsyncTableBuilder<T> setOperationTimeout(long timeout, TimeUnit unit); + AsyncTableBuilder<C> setOperationTimeout(long timeout, TimeUnit unit); /** * As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is @@ -53,7 +53,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> { * operation in a scan, such as openScanner or next. 
* @see #setScanTimeout(long, TimeUnit) */ - AsyncTableBuilder<T> setScanTimeout(long timeout, TimeUnit unit); + AsyncTableBuilder<C> setScanTimeout(long timeout, TimeUnit unit); /** * Set timeout for each rpc request. @@ -61,23 +61,23 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> { * Notice that this will <strong>NOT</strong> change the rpc timeout for read(get, scan) request * and write request(put, delete). */ - AsyncTableBuilder<T> setRpcTimeout(long timeout, TimeUnit unit); + AsyncTableBuilder<C> setRpcTimeout(long timeout, TimeUnit unit); /** * Set timeout for each read(get, scan) rpc request. */ - AsyncTableBuilder<T> setReadRpcTimeout(long timeout, TimeUnit unit); + AsyncTableBuilder<C> setReadRpcTimeout(long timeout, TimeUnit unit); /** * Set timeout for each write(put, delete) rpc request. */ - AsyncTableBuilder<T> setWriteRpcTimeout(long timeout, TimeUnit unit); + AsyncTableBuilder<C> setWriteRpcTimeout(long timeout, TimeUnit unit); /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. */ - AsyncTableBuilder<T> setRetryPause(long pause, TimeUnit unit); + AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit); /** * Set the max retry times for an operation. Usually it is the max attempt times minus 1. @@ -87,7 +87,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> { * @see #setMaxAttempts(int) * @see #setOperationTimeout(long, TimeUnit) */ - default AsyncTableBuilder<T> setMaxRetries(int maxRetries) { + default AsyncTableBuilder<C> setMaxRetries(int maxRetries) { return setMaxAttempts(retries2Attempts(maxRetries)); } @@ -98,15 +98,15 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> { * @see #setMaxRetries(int) * @see #setOperationTimeout(long, TimeUnit) */ - AsyncTableBuilder<T> setMaxAttempts(int maxAttempts); + AsyncTableBuilder<C> setMaxAttempts(int maxAttempts); /** * Set the number of retries that are allowed before we start to log. 
*/ - AsyncTableBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt); + AsyncTableBuilder<C> setStartLogErrorsCnt(int startLogErrorsCnt); /** - * Create the {@link AsyncTable} or {@link RawAsyncTable} instance. + * Create the {@link AsyncTable} instance. */ - T build(); + AsyncTable<C> build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 3fd6bde..ee571f1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience; * Base class for all asynchronous table builders. */ @InterfaceAudience.Private -abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncTableBuilder<T> { +abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase> + implements AsyncTableBuilder<C> { protected TableName tableName; @@ -51,7 +52,7 @@ abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncT AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) { this.tableName = tableName; this.operationTimeoutNs = tableName.isSystemTable() ? 
connConf.getMetaOperationTimeoutNs() - : connConf.getOperationTimeoutNs(); + : connConf.getOperationTimeoutNs(); this.scanTimeoutNs = connConf.getScanTimeoutNs(); this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); @@ -62,49 +63,49 @@ abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncT } @Override - public AsyncTableBuilderBase<T> setOperationTimeout(long timeout, TimeUnit unit) { + public AsyncTableBuilderBase<C> setOperationTimeout(long timeout, TimeUnit unit) { this.operationTimeoutNs = unit.toNanos(timeout); return this; } @Override - public AsyncTableBuilderBase<T> setScanTimeout(long timeout, TimeUnit unit) { + public AsyncTableBuilderBase<C> setScanTimeout(long timeout, TimeUnit unit) { this.scanTimeoutNs = unit.toNanos(timeout); return this; } @Override - public AsyncTableBuilderBase<T> setRpcTimeout(long timeout, TimeUnit unit) { + public AsyncTableBuilderBase<C> setRpcTimeout(long timeout, TimeUnit unit) { this.rpcTimeoutNs = unit.toNanos(timeout); return this; } @Override - public AsyncTableBuilderBase<T> setReadRpcTimeout(long timeout, TimeUnit unit) { + public AsyncTableBuilderBase<C> setReadRpcTimeout(long timeout, TimeUnit unit) { this.readRpcTimeoutNs = unit.toNanos(timeout); return this; } @Override - public AsyncTableBuilderBase<T> setWriteRpcTimeout(long timeout, TimeUnit unit) { + public AsyncTableBuilderBase<C> setWriteRpcTimeout(long timeout, TimeUnit unit) { this.writeRpcTimeoutNs = unit.toNanos(timeout); return this; } @Override - public AsyncTableBuilderBase<T> setRetryPause(long pause, TimeUnit unit) { + public AsyncTableBuilderBase<C> setRetryPause(long pause, TimeUnit unit) { this.pauseNs = unit.toNanos(pause); return this; } @Override - public AsyncTableBuilderBase<T> setMaxAttempts(int maxAttempts) { + public AsyncTableBuilderBase<C> setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; return this; } @Override - public AsyncTableBuilderBase<T> 
setStartLogErrorsCnt(int startLogErrorsCnt) { + public AsyncTableBuilderBase<C> setStartLogErrorsCnt(int startLogErrorsCnt) { this.startLogErrorsCnt = startLogErrorsCnt; return this; } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index ae43f5b..c8553c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -19,34 +19,37 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; +import com.google.protobuf.RpcChannel; + import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; /** - * The implementation of AsyncTable. Based on {@link RawAsyncTable}. + * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a + * thread pool when constructing this class, and the callback methods registered to the returned + * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users + * to do anything they want in the callbacks without breaking the rpc framework. 
*/ @InterfaceAudience.Private -class AsyncTableImpl implements AsyncTable { +class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { - private final RawAsyncTable rawTable; + private final AsyncTable<AdvancedScanResultConsumer> rawTable; private final ExecutorService pool; - private final long defaultScannerMaxResultSize; - - AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) { + AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable, + ExecutorService pool) { this.rawTable = rawTable; this.pool = pool; - this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); } @Override @@ -172,16 +175,9 @@ class AsyncTableImpl implements AsyncTable { return wrap(rawTable.scanAll(scan)); } - private long resultSize2CacheSize(long maxResultSize) { - // * 2 if possible - return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; - } - @Override public ResultScanner getScanner(Scan scan) { - return new AsyncTableResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan), - resultSize2CacheSize( - scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); + return rawTable.getScanner(scan); } private void scan0(Scan scan, ScanResultConsumer consumer) { @@ -222,4 +218,59 @@ class AsyncTableImpl implements AsyncTable { public <T> List<CompletableFuture<T>> batch(List<? 
extends Row> actions) { return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); } + + @Override + public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, + ServiceCaller<S, R> callable, byte[] row) { + return wrap(rawTable.coprocessorService(stubMaker, callable, row)); + } + + @Override + public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( + Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, + CoprocessorCallback<R> callback) { + CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { + + @Override + public void onRegionComplete(RegionInfo region, R resp) { + pool.execute(() -> callback.onRegionComplete(region, resp)); + } + + @Override + public void onRegionError(RegionInfo region, Throwable error) { + pool.execute(() -> callback.onRegionError(region, error)); + } + + @Override + public void onComplete() { + pool.execute(() -> callback.onComplete()); + } + + @Override + public void onError(Throwable error) { + pool.execute(() -> callback.onError(error)); + } + }; + CoprocessorServiceBuilder<S, R> builder = + rawTable.coprocessorService(stubMaker, callable, wrappedCallback); + return new CoprocessorServiceBuilder<S, R>() { + + @Override + public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) { + builder.fromRow(startKey, inclusive); + return this; + } + + @Override + public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) { + builder.toRow(endKey, inclusive); + return this; + } + + @Override + public void execute() { + builder.execute(); + } + }; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index 957f06f..fe9645a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; * {@code 2 * scan.getMaxResultSize()}. */ @InterfaceAudience.Private -class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { +class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer { private static final Log LOG = LogFactory.getLog(AsyncTableResultScanner.class); - private final RawAsyncTable rawTable; + private final AsyncTable<AdvancedScanResultConsumer> rawTable; private final long maxCacheSize; @@ -59,7 +59,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private ScanResumer resumer; - public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { + public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan, + long maxCacheSize) { this.rawTable = table; this.maxCacheSize = maxCacheSize; this.scan = scan; @@ -74,8 +75,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private void stopPrefetch(ScanController controller) { if (LOG.isDebugEnabled()) { LOG.debug(String.format("0x%x", System.identityHashCode(this)) + - " stop prefetching when scanning " + rawTable.getName() + " as the cache size " + - cacheSize + " is greater than the maxCacheSize " + maxCacheSize); + " stop prefetching when scanning " + rawTable.getName() + " as the cache size " + + cacheSize + " is greater than the maxCacheSize " + maxCacheSize); } resumer = controller.suspend(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java 
---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index bc0ade2..780dcf9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -21,9 +21,6 @@ import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; @@ -41,26 +38,28 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.DNS; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import 
org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.DNS; /** * Utility used by client connections. @@ -378,7 +377,7 @@ public final class ConnectionUtils { } } - static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) { + static boolean noMoreResultsForScan(Scan scan, RegionInfo info) { if (isEmptyStopRow(info.getEndKey())) { return true; } @@ -392,7 +391,7 @@ public final class ConnectionUtils { return c > 0 || (c == 0 && !scan.includeStopRow()); } - static boolean noMoreResultsForReverseScan(Scan scan, HRegionInfo info) { + static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) { if (isEmptyStartRow(info.getStartKey())) { return true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index bcf581b..6366cf0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.client; 
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.Message; +import com.google.protobuf.RpcChannel; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -63,7 +68,6 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; -import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.TableCFs; @@ -83,6 +87,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer; import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout; @@ -245,11 +250,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.protobuf.Message; -import com.google.protobuf.RpcChannel; - /** * The implementation of AsyncAdmin. 
* <p> @@ -263,7 +263,7 @@ import com.google.protobuf.RpcChannel; * @see AsyncConnection#getAdminBuilder() */ @InterfaceAudience.Private -public class RawAsyncHBaseAdmin implements AsyncAdmin { +class RawAsyncHBaseAdmin implements AsyncAdmin { public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class); @@ -272,7 +272,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { private final HashedWheelTimer retryTimer; - private final RawAsyncTable metaTable; + private final AsyncTable<AdvancedScanResultConsumer> metaTable; private final long rpcTimeoutNs; @@ -290,7 +290,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { AsyncAdminBuilderBase builder) { this.connection = connection; this.retryTimer = retryTimer; - this.metaTable = connection.getRawTable(META_TABLE_NAME); + this.metaTable = connection.getTable(META_TABLE_NAME); this.rpcTimeoutNs = builder.rpcTimeoutNs; this.operationTimeoutNs = builder.operationTimeoutNs; this.pauseNs = builder.pauseNs; @@ -1442,8 +1442,8 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); Scan scan = QuotaTableUtil.makeScan(filter); - this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() - .scan(scan, new RawScanResultConsumer() { + this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() + .scan(scan, new AdvancedScanResultConsumer() { List<QuotaSettings> settings = new ArrayList<>(); @Override @@ -3001,7 +3001,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable) { + ServiceCaller<S, R> callable) { MasterCoprocessorRpcChannelImpl channel = new MasterCoprocessorRpcChannelImpl(this.<Message> 
newMasterCaller()); S stub = stubMaker.apply(channel); @@ -3019,7 +3019,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, ServerName serverName) { + ServiceCaller<S, R> callable, ServerName serverName) { RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName( serverName)); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java deleted file mode 100644 index 102f279..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.client; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import org.apache.yetus.audience.InterfaceAudience; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; - -/** - * A low level asynchronous table. - * <p> - * The implementation is required to be thread safe. - * <p> - * The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback - * thread, so typically you should not do any time consuming work inside these methods, otherwise - * you will be likely to block at least one connection to RS(even more if the rpc framework uses - * NIO). - * <p> - * So, only experts that want to build high performance service should use this interface directly, - * especially for the {@link #scan(Scan, RawScanResultConsumer)} below. - * @since 2.0.0 - */ -@InterfaceAudience.Public -public interface RawAsyncTable extends AsyncTableBase { - - /** - * The basic scan API uses the observer pattern. All results that match the given scan object will - * be passed to the given {@code consumer} by calling {@code RawScanResultConsumer.onNext}. - * {@code RawScanResultConsumer.onComplete} means the scan is finished, and - * {@code RawScanResultConsumer.onError} means we hit an unrecoverable error and the scan is - * terminated. {@code RawScanResultConsumer.onHeartbeat} means the RS is still working but we can - * not get a valid result to call {@code RawScanResultConsumer.onNext}. This is usually because - * the matched results are too sparse, for example, a filter which almost filters out everything - * is specified. 
- * <p> - * Notice that, the methods of the given {@code consumer} will be called directly in the rpc - * framework's callback thread, so typically you should not do any time consuming work inside - * these methods, otherwise you will be likely to block at least one connection to RS(even more if - * the rpc framework uses NIO). - * @param scan A configured {@link Scan} object. - * @param consumer the consumer used to receive results. - */ - void scan(Scan scan, RawScanResultConsumer consumer); - - /** - * Delegate to a protobuf rpc call. - * <p> - * Usually, it is just a simple lambda expression, like: - * - * <pre> - * <code> - * (stub, controller, rpcCallback) -> { - * XXXRequest request = ...; // prepare the request - * stub.xxx(controller, request, rpcCallback); - * } - * </code> - * </pre> - * - * And if you can prepare the {@code request} before calling the coprocessorService method, the - * lambda expression will be: - * - * <pre> - * <code> - * (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback) - * </code> - * </pre> - */ - @InterfaceAudience.Public - @FunctionalInterface - interface CoprocessorCallable<S, R> { - - /** - * Represent the actual protobuf rpc call. - * @param stub the asynchronous stub - * @param controller the rpc controller, has already been prepared for you - * @param rpcCallback the rpc callback, has already been prepared for you - */ - void call(S stub, RpcController controller, RpcCallback<R> rpcCallback); - } - - /** - * Execute the given coprocessor call on the region which contains the given {@code row}. - * <p> - * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a - * one line lambda expression, like: - * - * <pre> - * <code> - * channel -> xxxService.newStub(channel) - * </code> - * </pre> - * - * @param stubMaker a delegation to the actual {@code newStub} call. - * @param callable a delegation to the actual protobuf rpc call. 
See the comment of - * {@link CoprocessorCallable} for more details. - * @param row The row key used to identify the remote region location - * @param <S> the type of the asynchronous stub - * @param <R> the type of the return value - * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. - * @see CoprocessorCallable - */ - <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, byte[] row); - - /** - * The callback when we want to execute a coprocessor call on a range of regions. - * <p> - * As the locating itself also takes some time, the implementation may want to send rpc calls on - * the fly, which means we do not know how many regions we have when we get the return value of - * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have - * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)} - * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no - * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)} - * calls in the future. - * <p> - * Here is a pseudo code to describe a typical implementation of a range coprocessor service - * method to help you better understand how the {@link CoprocessorCallback} will be called. The - * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the - * {@code whenComplete} is {@code CompletableFuture.whenComplete}. 
- * - * <pre> - * locateThenCall(byte[] row) { - * locate(row).whenComplete((location, locateError) -> { - * if (locateError != null) { - * callback.onError(locateError); - * return; - * } - * incPendingCall(); - * region = location.getRegion(); - * if (region.getEndKey() > endKey) { - * locateEnd = true; - * } else { - * locateThenCall(region.getEndKey()); - * } - * sendCall().whenComplete((resp, error) -> { - * if (error != null) { - * callback.onRegionError(region, error); - * } else { - * callback.onRegionComplete(region, resp); - * } - * if (locateEnd && decPendingCallAndGet() == 0) { - * callback.onComplete(); - * } - * }); - * }); - * } - * </pre> - */ - @InterfaceAudience.Public - interface CoprocessorCallback<R> { - - /** - * @param region the region that the response belongs to - * @param resp the response of the coprocessor call - */ - void onRegionComplete(RegionInfo region, R resp); - - /** - * @param region the region that the error belongs to - * @param error the response error of the coprocessor call - */ - void onRegionError(RegionInfo region, Throwable error); - - /** - * Indicate that all responses of the regions have been notified by calling - * {@link #onRegionComplete(RegionInfo, Object)} or - * {@link #onRegionError(RegionInfo, Throwable)}. - */ - void onComplete(); - - /** - * Indicate that we got an error which does not belong to any regions. Usually a locating error. - */ - void onError(Throwable error); - } - - /** - * Helper class for sending coprocessorService request that executes a coprocessor call on regions - * which are covered by a range. - * <p> - * If {@code fromRow} is not specified the selection will start with the first table region. If - * {@code toRow} is not specified the selection will continue through the last table region. - * @param <S> the type of the protobuf Service you want to call. - * @param <R> the type of the return value. 
- */ - interface CoprocessorServiceBuilder<S, R> { - - /** - * @param startKey start region selection with region containing this row, inclusive. - */ - default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) { - return fromRow(startKey, true); - } - - /** - * @param startKey start region selection with region containing this row - * @param inclusive whether to include the startKey - */ - CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive); - - /** - * @param endKey select regions up to and including the region containing this row, exclusive. - */ - default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) { - return toRow(endKey, false); - } - - /** - * @param endKey select regions up to and including the region containing this row - * @param inclusive whether to include the endKey - */ - CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive); - - /** - * Execute the coprocessorService request. You can get the response through the - * {@link CoprocessorCallback}. - */ - void execute(); - } - - /** - * Execute a coprocessor call on the regions which are covered by a range. - * <p> - * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it. - * <p> - * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it - * is only a one line lambda expression, like: - * - * <pre> - * <code> - * channel -> xxxService.newStub(channel) - * </code> - * </pre> - * - * @param stubMaker a delegation to the actual {@code newStub} call. - * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link CoprocessorCallable} for more details. - * @param callback callback to get the response. See the comment of {@link CoprocessorCallback} - * for more details. 
- */ - <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index d4de573..07a2b92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -62,9 +62,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType /** * The implementation of RawAsyncTable. + * <p> + * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will + * be finished inside the rpc framework thread, which means that the callbacks registered to the + * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use + * this class should not try to do time consuming tasks in the callbacks. + * @since 2.0.0 + * @see AsyncTableImpl */ @InterfaceAudience.Private -class RawAsyncTableImpl implements RawAsyncTable { +class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { private final AsyncConnectionImpl conn; @@ -102,7 +109,7 @@ class RawAsyncTableImpl implements RawAsyncTable { this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; this.defaultScannerCaching = tableName.isSystemTable() ? 
conn.connConf.getMetaScannerCaching() - : conn.connConf.getScannerCaching(); + : conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); } @@ -270,7 +277,7 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public CheckAndMutateBuilder qualifier(byte[] qualifier) { this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + - " an empty byte array, or just do not call this method if you want a null qualifier"); + " an empty byte array, or just do not call this method if you want a null qualifier"); return this; } @@ -290,7 +297,7 @@ class RawAsyncTableImpl implements RawAsyncTable { private void preCheck() { Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + - " calling ifNotExists/ifEquals/ifMatches before executing the request"); + " calling ifNotExists/ifEquals/ifMatches before executing the request"); } @Override @@ -354,14 +361,12 @@ class RawAsyncTableImpl implements RawAsyncTable { } else { try { org.apache.hadoop.hbase.client.MultiResponse multiResp = - ResponseConverter.getResults(req, resp, controller.cellScanner()); + ResponseConverter.getResults(req, resp, controller.cellScanner()); Throwable ex = multiResp.getException(regionName); if (ex != null) { - future - .completeExceptionally(ex instanceof IOException ? ex - : new IOException( - "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), - ex)); + future.completeExceptionally(ex instanceof IOException ? 
ex + : new IOException( + "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); } else { future.complete(respConverter .apply((Result) multiResp.getResults().get(regionName).result.get(0))); @@ -400,11 +405,28 @@ class RawAsyncTableImpl implements RawAsyncTable { return newScan; } + public void scan(Scan scan, AdvancedScanResultConsumer consumer) { + new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs, + maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); + } + + private long resultSize2CacheSize(long maxResultSize) { + // * 2 if possible + return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; + } + + @Override + public ResultScanner getScanner(Scan scan) { + return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan), + resultSize2CacheSize( + scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); + } + @Override public CompletableFuture<List<Result>> scanAll(Scan scan) { CompletableFuture<List<Result>> future = new CompletableFuture<>(); List<Result> scanResults = new ArrayList<>(); - scan(scan, new RawScanResultConsumer() { + scan(scan, new AdvancedScanResultConsumer() { @Override public void onNext(Result[] results, ScanController controller) { @@ -424,11 +446,6 @@ class RawAsyncTableImpl implements RawAsyncTable { return future; } - public void scan(Scan scan, RawScanResultConsumer consumer) { - new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs, - maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); - } - @Override public List<CompletableFuture<Result>> get(List<Get> gets) { return batch(gets, readRpcTimeoutNs); @@ -487,7 +504,7 @@ class RawAsyncTableImpl implements RawAsyncTable { } private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, RegionInfo region, byte[] row) { 
+ ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs); S stub = stubMaker.apply(channel); @@ -505,7 +522,7 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, byte[] row) { + ServiceCaller<S, R> callable, byte[] row) { return coprocessorService(stubMaker, callable, null, row); } @@ -527,7 +544,7 @@ class RawAsyncTableImpl implements RawAsyncTable { } private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback, + ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { @@ -563,7 +580,7 @@ class RawAsyncTableImpl implements RawAsyncTable { private final Function<RpcChannel, S> stubMaker; - private final CoprocessorCallable<S, R> callable; + private final ServiceCaller<S, R> callable; private final CoprocessorCallback<R> callback; @@ -576,7 +593,7 @@ class RawAsyncTableImpl implements RawAsyncTable { private boolean endKeyInclusive; public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback) { + ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); this.callable = Preconditions.checkNotNull(callable, "callable is null"); this.callback = Preconditions.checkNotNull(callback, "callback is null"); @@ -586,8 +603,8 @@ class RawAsyncTableImpl implements RawAsyncTable { public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 
this.startKey = Preconditions.checkNotNull(startKey, "startKey is null. Consider using" + - " an empty byte array, or just do not call this method if you want to start selection" + - " from the first region"); + " an empty byte array, or just do not call this method if you want to start selection" + + " from the first region"); this.startKeyInclusive = inclusive; return this; } @@ -596,8 +613,8 @@ class RawAsyncTableImpl implements RawAsyncTable { public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { this.endKey = Preconditions.checkNotNull(endKey, "endKey is null. Consider using" + - " an empty byte array, or just do not call this method if you want to continue" + - " selection to the last region"); + " an empty byte array, or just do not call this method if you want to continue" + + " selection to the last region"); this.endKeyInclusive = inclusive; return this; } @@ -614,7 +631,7 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( - Function<RpcChannel, S> stubMaker, CoprocessorCallable<S, R> callable, + Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java deleted file mode 100644 index 7ab02d8..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.util.Optional; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; - -/** - * Receives {@link Result} for an asynchronous scan. - * <p> - * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread - * which we send request to HBase service. So if you want the asynchronous scanner fetch data from - * HBase in background while you process the returned data, you need to move the processing work to - * another thread to make the {@code onNext} call return immediately. And please do NOT do any time - * consuming tasks in all methods below unless you know what you are doing. - * @since 2.0.0 - */ -@InterfaceAudience.Public -public interface RawScanResultConsumer { - - /** - * Used to resume a scan. - */ - @InterfaceAudience.Public - interface ScanResumer { - - /** - * Resume the scan. You are free to call it multiple time but only the first call will take - * effect. - */ - void resume(); - } - - /** - * Used to suspend or stop a scan, or get a scan cursor if available. 
- * <p> - * Notice that, you should only call the {@link #suspend()} or {@link #terminate()} inside onNext - * or onHeartbeat method. An IllegalStateException will be thrown if you call them at other places. - * <p> - * You can only call one of the {@link #suspend()} and {@link #terminate()} methods(of course you - * are free to not call them both), and the methods are not reentrant. An IllegalStateException - * will be thrown if you have already called one of the methods. - */ - @InterfaceAudience.Public - interface ScanController { - - /** - * Suspend the scan. - * <p> - * This means we will stop fetching data in background, i.e., will not call onNext any more - * before you resume the scan. - * @return A resumer used to resume the scan later. - */ - ScanResumer suspend(); - - /** - * Terminate the scan. - * <p> - * This is useful when you have got enough results and want to stop the scan in onNext method, - * or you want to stop the scan in onHeartbeat method because it has spent too much time. - */ - void terminate(); - - /** - * Get the scan cursor if available. - * @return The scan cursor. - */ - Optional<Cursor> cursor(); - } - - /** - * Indicate that we have received some data. - * @param results the data fetched from HBase service. - * @param controller used to suspend or terminate the scan. Notice that the {@code controller} - * instance is only valid within scope of onNext method. You can only call its method in - * onNext, do NOT store it and call it later outside onNext. - */ - void onNext(Result[] results, ScanController controller); - - /** - * Indicate that there is a heartbeat message but we have not cumulated enough cells to call - * {@link #onNext(Result[], ScanController)}. - * <p> - * Note that this method will always be called when RS returns something to us but we do not have - * enough cells to call {@link #onNext(Result[], ScanController)}.
Sometimes it may not be a - * 'heartbeat' message for RS, for example, we have a large row with many cells and size limit is - * exceeded before sending all the cells for this row. For RS it does send some data to us and the - * time limit has not been reached, but we can not return the data to client so here we call this - * method to tell client we have already received something. - * <p> - * This method gives you a chance to terminate a slow scan operation. - * @param controller used to suspend or terminate the scan. Notice that the {@code controller} - * instance is only valid within the scope of onHeartbeat method. You can only call its - * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. - */ - default void onHeartbeat(ScanController controller) { - } - - /** - * Indicate that we hit an unrecoverable error and the scan operation is terminated. - * <p> - * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. - */ - void onError(Throwable error); - - /** - * Indicate that the scan operation is completed normally. - */ - void onComplete(); - - /** - * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to - * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan - * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can - * store it somewhere to get the metrics at any time if you want.
- */ - default void onScanMetricsCreated(ScanMetrics scanMetrics) { - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java index 826a8ef..be3108b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java @@ -18,38 +18,20 @@ package org.apache.hadoop.hbase.client; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; /** * Receives {@link Result} for an asynchronous scan. + * <p> + * All results that match the given scan object will be passed to this class by calling + * {@link #onNext(Result)}. {@link #onComplete()} means the scan is finished, and + * {@link #onError(Throwable)} means we hit an unrecoverable error and the scan is terminated. */ @InterfaceAudience.Public -public interface ScanResultConsumer { +public interface ScanResultConsumer extends ScanResultConsumerBase { /** * @param result the data fetched from HBase service. * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} */ boolean onNext(Result result); - - /** - * Indicate that we hit an unrecoverable error and the scan operation is terminated. - * <p> - * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. - */ - void onError(Throwable error); - - /** - * Indicate that the scan operation is completed normally. 
- */ - void onComplete(); - - /** - * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to - * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan - * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can - * store it somewhere to get the metrics at any time if you want. - */ - default void onScanMetricsCreated(ScanMetrics scanMetrics) { - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java new file mode 100644 index 0000000..538cf9d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The base interface for scan result consumer. + */ +@InterfaceAudience.Public +public interface ScanResultConsumerBase { + /** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + * <p> + * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + void onError(Throwable error); + + /** + * Indicate that the scan operation is completed normally. + */ + void onComplete(); + + /** + * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to + * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan + * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can + * store it somewhere to get the metrics at any time if you want. + */ + default void onScanMetricsCreated(ScanMetrics scanMetrics) { + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java new file mode 100644 index 0000000..467f1a2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Delegate to a protobuf rpc call. + * <p> + * Usually, it is just a simple lambda expression, like: + * + * <pre> + * <code> + * (stub, controller, rpcCallback) -> { + * XXXRequest request = ...; // prepare the request + * stub.xxx(controller, request, rpcCallback); + * } + * </code> + * </pre> + * + * And if you already have the {@code request}, the lambda expression will be: + * + * <pre> + * <code> + * (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback) + * </code> + * </pre> + * + * @param <S> the type of the protobuf Service you want to call. + * @param <R> the type of the return value. + */ +@InterfaceAudience.Public +@FunctionalInterface +public interface ServiceCaller<S, R> { + + /** + * Represent the actual protobuf rpc call. 
+ * @param stub the asynchronous stub + * @param controller the rpc controller, has already been prepared for you + * @param rpcCallback the rpc callback, has already been prepared for you + */ + void call(S stub, RpcController controller, RpcCallback<R> rpcCallback); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java index ff9b873..371e865 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java @@ -32,9 +32,9 @@ import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.RawAsyncTable; -import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback; -import org.apache.hadoop.hbase.client.RawScanResultConsumer; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -126,7 +126,7 @@ public class AsyncAggregationClient { } public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> - max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + max(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { CompletableFuture<R> future = new 
CompletableFuture<>(); AggregateRequest req; try { @@ -163,7 +163,7 @@ public class AsyncAggregationClient { } public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> - min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + min(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { CompletableFuture<R> future = new CompletableFuture<>(); AggregateRequest req; try { @@ -201,7 +201,7 @@ public class AsyncAggregationClient { public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<Long> - rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { CompletableFuture<Long> future = new CompletableFuture<>(); AggregateRequest req; try { @@ -233,7 +233,7 @@ public class AsyncAggregationClient { } public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S> - sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + sum(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { CompletableFuture<S> future = new CompletableFuture<>(); AggregateRequest req; try { @@ -269,7 +269,7 @@ public class AsyncAggregationClient { public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<Double> - avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { CompletableFuture<Double> future = new CompletableFuture<>(); AggregateRequest req; try { @@ -307,7 +307,7 @@ public class AsyncAggregationClient { public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<Double> - std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { 
CompletableFuture<Double> future = new CompletableFuture<>(); AggregateRequest req; try { @@ -351,7 +351,7 @@ public class AsyncAggregationClient { // the map key is the startRow of the region private static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<NavigableMap<byte[], S>> - sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + sumByRegion(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { CompletableFuture<NavigableMap<byte[], S>> future = new CompletableFuture<NavigableMap<byte[], S>>(); AggregateRequest req; @@ -388,8 +388,8 @@ public class AsyncAggregationClient { } private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian( - CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, - Scan scan, NavigableMap<byte[], S> sumByRegion) { + CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table, + ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) { double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L); S movingSum = null; byte[] startRow = null; @@ -410,7 +410,7 @@ public class AsyncAggregationClient { NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); byte[] weightQualifier = qualifiers.last(); byte[] valueQualifier = qualifiers.first(); - table.scan(scan, new RawScanResultConsumer() { + table.scan(scan, new AdvancedScanResultConsumer() { private S sum = baseSum; @@ -456,8 +456,9 @@ public class AsyncAggregationClient { }); } - public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> - median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { + public static <R, S, P extends Message, Q extends Message, T extends Message> + CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table, + ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) 
{ CompletableFuture<R> future = new CompletableFuture<>(); sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { if (error != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java index 389aaaf..12e5b8d 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java @@ -55,7 +55,7 @@ public class TestAsyncAggregationClient { private static AsyncConnection CONN; - private static RawAsyncTable TABLE; + private static AsyncTable<AdvancedScanResultConsumer> TABLE; @BeforeClass public static void setUp() throws Exception { @@ -69,7 +69,7 @@ public class TestAsyncAggregationClient { } UTIL.createTable(TABLE_NAME, CF, splitKeys); CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); - TABLE = CONN.getRawTable(TABLE_NAME); + TABLE = CONN.getTable(TABLE_NAME); TABLE.putAll(LongStream.range(0, COUNT) .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l))) .addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, Bytes.toBytes(l * l))) http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java index 2105547..67aba62 100644 --- 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java @@ -141,7 +141,7 @@ public class AsyncClientExample extends Configured implements Tool { latch.countDown(); return; } - AsyncTable table = conn.getTable(tableName, threadPool); + AsyncTable<?> table = conn.getTable(tableName, threadPool); table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))) .whenComplete((putResp, putErr) -> { if (putErr != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java index bb83bac..e3686f4 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java @@ -17,9 +17,24 @@ */ package org.apache.hadoop.hbase.client.example; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; +import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import 
org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; - import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ServerBootstrap; import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf; import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; @@ -43,26 +58,10 @@ import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpVersion; import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.QueryStringDecoder; import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.GlobalEventExecutor; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.Optional; -import java.util.concurrent.ExecutionException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.AsyncConnection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RawAsyncTable; -import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; -import org.apache.hadoop.hbase.util.Bytes; - /** - * A simple example on how to use {@link RawAsyncTable} to write a fully asynchronous HTTP proxy - * server. The {@link AsyncConnection} will share the same event loop with the HTTP server. + * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully + * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with + * the HTTP server. 
* <p> * The request URL is: * @@ -160,7 +159,7 @@ public class HttpProxyExample { private void get(ChannelHandlerContext ctx, FullHttpRequest req) { Params params = parse(req); - conn.getRawTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) + conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))) .whenComplete((r, e) -> { if (e != null) { @@ -181,7 +180,7 @@ public class HttpProxyExample { Params params = parse(req); byte[] value = new byte[req.content().readableBytes()]; req.content().readBytes(value); - conn.getRawTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) + conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)) .whenComplete((r, e) -> { if (e != null) {