Repository: hbase Updated Branches: refs/heads/master 49abc2e1c -> fad7d01d8
HBASE-18972 Use Builder pattern to remove nullable parameters for coprocessor methods in RawAsyncTable interface Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fad7d01d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fad7d01d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fad7d01d Branch: refs/heads/master Commit: fad7d01d8f28f2de8e82b27306568c265bd32b41 Parents: 49abc2e Author: zhangduo <[email protected]> Authored: Thu Nov 2 13:55:16 2017 +0800 Committer: zhangduo <[email protected]> Committed: Thu Nov 2 13:58:29 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/RawAsyncTable.java | 85 ++++++++++++-------- .../hadoop/hbase/client/RawAsyncTableImpl.java | 75 +++++++++++++---- .../coprocessor/AsyncAggregationClient.java | 72 ++++++++++------- 3 files changed, 151 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fad7d01d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java index cd0226b..102f279 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java @@ -38,11 +38,6 @@ import com.google.protobuf.RpcController; * <p> * So, only experts that want to build high performance service should use this interface directly, * especially for the {@link #scan(Scan, RawScanResultConsumer)} below. - * <p> - * TODO: For now the only difference between this interface and {@link AsyncTable} is the scan - * method. 
The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) - * so it is not suitable for a normal user. If it is still the only difference after we implement - * most features of AsyncTable, we can think about merge these two interfaces. * @since 2.0.0 */ @InterfaceAudience.Public @@ -135,8 +130,8 @@ public interface RawAsyncTable extends AsyncTableBase { * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)} * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no - * {@link #onRegionComplete(RegionInfo, Object)} or - * {@link #onRegionError(RegionInfo, Throwable)} calls in the future. + * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)} + * calls in the future. * <p> * Here is a pseudo code to describe a typical implementation of a range coprocessor service * method to help you better understand how the {@link CoprocessorCallback} will be called. The @@ -200,25 +195,56 @@ public interface RawAsyncTable extends AsyncTableBase { } /** - * Execute the given coprocessor call on the regions which are covered by the range from - * {@code startKey} inclusive and {@code endKey} exclusive. See the comment of - * {@link #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, CoprocessorCallback)} - * for more details. - * @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, - * CoprocessorCallback) + * Helper class for sending coprocessorService request that executes a coprocessor call on regions + * which are covered by a range. + * <p> + * If {@code fromRow} is not specified the selection will start with the first table region. If + * {@code toRow} is not specified the selection will continue through the last table region. 
+ * @param <S> the type of the protobuf Service you want to call. + * @param <R> the type of the return value. */ - default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey, - CoprocessorCallback<R> callback) { - coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback); + interface CoprocessorServiceBuilder<S, R> { + + /** + * @param startKey start region selection with region containing this row, inclusive. + */ + default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) { + return fromRow(startKey, true); + } + + /** + * @param startKey start region selection with region containing this row + * @param inclusive whether to include the startKey + */ + CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive); + + /** + * @param endKey select regions up to and including the region containing this row, exclusive. + */ + default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) { + return toRow(endKey, false); + } + + /** + * @param endKey select regions up to and including the region containing this row + * @param inclusive whether to include the endKey + */ + CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive); + + /** + * Execute the coprocessorService request. You can get the response through the + * {@link CoprocessorCallback}. + */ + void execute(); } /** - * Execute the given coprocessor call on the regions which are covered by the range from - * {@code startKey} and {@code endKey}. The inclusive of boundaries are specified by - * {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code stubMaker} is just a - * delegation to the {@code xxxService.newStub} call. Usually it is only a one line lambda - * expression, like: + * Execute a coprocessor call on the regions which are covered by a range. 
+ * <p> + * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute it. + * <p> + * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it + * is only a one line lambda expression, like: * * <pre> * <code> @@ -229,20 +255,9 @@ public interface RawAsyncTable extends AsyncTableBase { * @param stubMaker a delegation to the actual {@code newStub} call. * @param callable a delegation to the actual protobuf rpc call. See the comment of * {@link CoprocessorCallable} for more details. - * @param startKey start region selection with region containing this row. If {@code null}, the - * selection will start with the first table region. - * @param startKeyInclusive whether to include the startKey - * @param endKey select regions up to and including the region containing this row. If - * {@code null}, selection will continue through the last table region. - * @param endKeyInclusive whether to include the endKey * @param callback callback to get the response. See the comment of {@link CoprocessorCallback} * for more details. 
- * @param <S> the type of the asynchronous stub - * @param <R> the type of the return value - * @see CoprocessorCallable - * @see CoprocessorCallback */ - <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey, - boolean endKeyInclusive, CoprocessorCallback<R> callback); + <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, + CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback); } http://git-wip-us.apache.org/repos/asf/hbase/blob/fad7d01d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 6107f7f..d4de573 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; -import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; -import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; @@ -29,7 +27,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,6 +35,7 @@ import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; +import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; @@ -560,19 +558,64 @@ class RawAsyncTableImpl implements RawAsyncTable { }); } - @Override - public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker, - CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey, - boolean endKeyInclusive, CoprocessorCallback<R> callback) { - byte[] nonNullStartKey = Optional.ofNullable(startKey).orElse(EMPTY_START_ROW); - byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW); - List<HRegionLocation> locs = new ArrayList<>(); - conn.getLocator() - .getRegionLocation(tableName, nonNullStartKey, - startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs) - .whenComplete( - (loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey, - endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); + private final class CoprocessorServiceBuilderImpl<S, R> + implements CoprocessorServiceBuilder<S, R> { + + private final Function<RpcChannel, S> stubMaker; + + private final CoprocessorCallable<S, R> callable; + + private final CoprocessorCallback<R> callback; + + private byte[] startKey = HConstants.EMPTY_START_ROW; + + private boolean startKeyInclusive; + + private byte[] endKey = HConstants.EMPTY_END_ROW; + + private boolean endKeyInclusive; + + public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, + CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback) { + this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); + this.callable = Preconditions.checkNotNull(callable, "callable is null"); + this.callback = Preconditions.checkNotNull(callback, "callback is null"); + } + + @Override + public CoprocessorServiceBuilderImpl<S, R> 
fromRow(byte[] startKey, boolean inclusive) { + this.startKey = Preconditions.checkNotNull(startKey, + "startKey is null. Consider using" + + " an empty byte array, or just do not call this method if you want to start selection" + + " from the first region"); + this.startKeyInclusive = inclusive; + return this; + } + + @Override + public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { + this.endKey = Preconditions.checkNotNull(endKey, + "endKey is null. Consider using" + + " an empty byte array, or just do not call this method if you want to continue" + + " selection to the last region"); + this.endKeyInclusive = inclusive; + return this; + } + + @Override + public void execute() { + conn.getLocator().getRegionLocation(tableName, startKey, + startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs) + .whenComplete( + (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), + endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); + } } + @Override + public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( + Function<RpcChannel, S> stubMaker, CoprocessorCallable<S, R> callable, + CoprocessorCallback<R> callback) { + return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fad7d01d/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java index 51c8248..ff9b873 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java +++ 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client.coprocessor; import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; +import com.google.protobuf.Message; + import java.io.IOException; import java.util.Map; import java.util.NavigableMap; @@ -29,6 +31,7 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RawAsyncTable; import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback; import org.apache.hadoop.hbase.client.RawScanResultConsumer; @@ -43,8 +46,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; -import com.google.protobuf.Message; - /** * This client class is for invoking the aggregate functions deployed on the Region Server side via * the AggregateService. This class will implement the supporting functionality for @@ -120,6 +121,10 @@ public class AsyncAggregationClient { return ci.getPromotedValueFromProto(t); } + private static byte[] nullToEmpty(byte[] b) { + return b != null ? 
b : HConstants.EMPTY_BYTE_ARRAY; + } + public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { CompletableFuture<R> future = new CompletableFuture<>(); @@ -149,10 +154,11 @@ public class AsyncAggregationClient { return max; } }; - table.coprocessorService(channel -> AggregateService.newStub(channel), - (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), - scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), - callback); + table + .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, + (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), callback) + .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) + .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); return future; } @@ -185,10 +191,11 @@ public class AsyncAggregationClient { return min; } }; - table.coprocessorService(channel -> AggregateService.newStub(channel), - (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), - scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), - callback); + table + .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, + (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), callback) + .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) + .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); return future; } @@ -217,10 +224,11 @@ public class AsyncAggregationClient { return count; } }; - table.coprocessorService(channel -> AggregateService.newStub(channel), - (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), - scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), - callback); + table + .<AggregateService, 
AggregateResponse> coprocessorService(AggregateService::newStub, + (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), callback) + .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) + .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); return future; } @@ -251,10 +259,11 @@ public class AsyncAggregationClient { return sum; } }; - table.coprocessorService(channel -> AggregateService.newStub(channel), - (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), - scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), - callback); + table + .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, + (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), callback) + .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) + .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); return future; } @@ -288,10 +297,11 @@ public class AsyncAggregationClient { return ci.divideForAvg(sum, count); } }; - table.coprocessorService(channel -> AggregateService.newStub(channel), - (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), - scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), - callback); + table + .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, + (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), callback) + .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) + .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); return future; } @@ -330,10 +340,11 @@ public class AsyncAggregationClient { return Math.sqrt(avgSq - avg * avg); } }; - table.coprocessorService(channel -> AggregateService.newStub(channel), - (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), - scan.getStartRow(), scan.includeStartRow(), 
scan.getStopRow(), scan.includeStopRow(), - callback); + table + .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, + (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), callback) + .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) + .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); return future; } @@ -368,10 +379,11 @@ public class AsyncAggregationClient { return map; } }; - table.coprocessorService(channel -> AggregateService.newStub(channel), - (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), - scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), - callback); + table + .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, + (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback) + .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) + .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); return future; }
