Repository: hbase
Updated Branches:
  refs/heads/master 2511cc827 -> b7fc7bf24


HBASE-17346 Add coprocessor service support


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b7fc7bf2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b7fc7bf2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b7fc7bf2

Branch: refs/heads/master
Commit: b7fc7bf246934cea09e22f55bea62415f7319647
Parents: 2511cc8
Author: zhangduo <zhang...@apache.org>
Authored: Mon Jan 30 16:23:04 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Jan 31 15:19:33 2017 +0800

----------------------------------------------------------------------
 .../client/ClientCoprocessorRpcController.java  |  74 +++
 .../hadoop/hbase/client/RawAsyncTable.java      | 189 ++++++++
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  95 ++++
 .../client/RegionCoprocessorRpcChannelImpl.java | 117 +++++
 .../client/coprocessor/AggregationClient.java   |  88 +---
 .../client/coprocessor/AggregationHelper.java   | 109 +++++
 .../coprocessor/AsyncAggregationClient.java     | 464 +++++++++++++++++++
 .../coprocessor/AggregateImplementation.java    |  17 +-
 .../client/TestAsyncAggregationClient.java      | 167 +++++++
 9 files changed, 1233 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
new file mode 100644
index 0000000..149e1d3
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Client-side rpc controller for coprocessor implementations. It is only used to pass errors.
+ */
+@InterfaceAudience.Private
+public class ClientCoprocessorRpcController implements RpcController {
+
+  private Throwable error;
+
+  @Override
+  public void reset() {
+  }
+
+  @Override
+  public boolean failed() {
+    return error != null;
+  }
+
+  @Override
+  public String errorText() {
+    return error != null ? error.getMessage() : null;
+  }
+
+  @Override
+  public void startCancel() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return false;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> callback) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void setFailed(Throwable error) {
+    this.error = error;
+  }
+
+  public Throwable getFailed() {
+    return error;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 67099e8..59924cf 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
@@ -60,4 +68,185 @@ public interface RawAsyncTable extends AsyncTableBase {
    * @param consumer the consumer used to receive results.
    */
   void scan(Scan scan, RawScanResultConsumer consumer);
+
+  /**
+   * Delegate to a protobuf rpc call.
+   * <p>
+   * Usually, it is just a simple lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * (stub, controller, rpcCallback) -> {
+   *   XXXRequest request = ...; // prepare the request
+   *   stub.xxx(controller, request, rpcCallback);
+   * }
+   * </code>
+   * </pre>
+   *
+   * And if you can prepare the {@code request} before calling the 
coprocessorService method, the
+   * lambda expression will be:
+   *
+   * <pre>
+   * <code>
+   * (stub, controller, rpcCallback) -> stub.xxx(controller, request, 
rpcCallback)
+   * </code>
+   * </pre>
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  @FunctionalInterface
+  interface CoprocessorCallable<S, R> {
+
+    /**
+     * Represent the actual protobuf rpc call.
+     * @param stub the asynchronous stub
+     * @param controller the rpc controller, has already been prepared for you
+     * @param rpcCallback the rpc callback, has already been prepared for you
+     */
+    void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
+  }
+
+  /**
+   * Execute the given coprocessor call on the region which contains the given 
{@code row}.
+   * <p>
+   * The {@code stubMaker} is just a delegation to the {@code newStub} call. 
Usually it is only a
+   * one line lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
+   *
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the 
comment of
+   *          {@link CoprocessorCallable} for more details.
+   * @param row The row key used to identify the remote region location
+   * @param <S> the type of the asynchronous stub
+   * @param <R> the type of the return value
+   * @return the return value of the protobuf rpc call, wrapped by a {@link 
CompletableFuture}.
+   * @see CoprocessorCallable
+   */
+  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> 
stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] row);
+
+  /**
+   * The callback when we want to execute a coprocessor call on a range of 
regions.
+   * <p>
+   * As the locating itself also takes some time, the implementation may want 
to send rpc calls on
+   * the fly, which means we do not know how many regions we have when we get 
the return value of
+   * the rpc calls, so we need an {@link #onComplete()} which is used to tell 
you that we have
+   * passed all the return values to you(through the {@link 
#onRegionComplete(HRegionInfo, Object)}
+   * or {@link #onRegionError(HRegionInfo, Throwable)} calls), i.e., there will 
be no
+   * {@link #onRegionComplete(HRegionInfo, Object)} or
+   * {@link #onRegionError(HRegionInfo, Throwable)} calls in the future.
+   * <p>
+   * Here is a pseudo code to describe a typical implementation of a range 
coprocessor service
+   * method to help you better understand how the {@link CoprocessorCallback} 
will be called. The
+   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. 
And notice that the
+   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
+   *
+   * <pre>
+   * locateThenCall(byte[] row) {
+   *   locate(row).whenComplete((location, locateError) -> {
+   *     if (locateError != null) {
+   *       callback.onError(locateError);
+   *       return;
+   *     }
+   *     incPendingCall();
+   *     region = location.getRegion();
+   *     if (region.getEndKey() > endKey) {
+   *       locateEnd = true;
+   *     } else {
+   *       locateThenCall(region.getEndKey());
+   *     }
+   *     sendCall().whenComplete((resp, error) -> {
+   *       if (error != null) {
+   *         callback.onRegionError(region, error);
+   *       } else {
+   *         callback.onRegionComplete(region, resp);
+   *       }
+   *       if (locateEnd && decPendingCallAndGet() == 0) {
+   *         callback.onComplete();
+   *       }
+   *     });
+   *   });
+   * }
+   * </pre>
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  interface CoprocessorCallback<R> {
+
+    /**
+     * @param region the region that the response belongs to
+     * @param resp the response of the coprocessor call
+     */
+    void onRegionComplete(HRegionInfo region, R resp);
+
+    /**
+     * @param region the region that the error belongs to
+     * @param error the response error of the coprocessor call
+     */
+    void onRegionError(HRegionInfo region, Throwable error);
+
+    /**
+     * Indicate that the responses of all regions have been delivered by calling
+     * {@link #onRegionComplete(HRegionInfo, Object)} or
+     * {@link #onRegionError(HRegionInfo, Throwable)}.
+     */
+    void onComplete();
+
+    /**
+     * Indicate that we got an error which does not belong to any region, usually a locating error.
+     */
+    void onError(Throwable error);
+  }
+
+  /**
+   * Execute the given coprocessor call on the regions which are covered by 
the range from
+   * {@code startKey} inclusive and {@code endKey} exclusive. See the comment 
of
+   * {@link #coprocessorService(Function, CoprocessorCallable, byte[], 
boolean, byte[], boolean, CoprocessorCallback)}
+   * for more details.
+   * @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, 
byte[], boolean,
+   *      CoprocessorCallback)
+   */
+  default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey,
+      CoprocessorCallback<R> callback) {
+    coprocessorService(stubMaker, callable, startKey, true, endKey, false, 
callback);
+  }
+
+  /**
+   * Execute the given coprocessor call on the regions which are covered by the range from
+   * {@code startKey} to {@code endKey}. The inclusiveness of the boundaries is specified by
+   * {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code 
stubMaker} is just a
+   * delegation to the {@code xxxService.newStub} call. Usually it is only a 
one line lambda
+   * expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
+   *
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the 
comment of
+   *          {@link CoprocessorCallable} for more details.
+   * @param startKey start region selection with region containing this row. 
If {@code null}, the
+   *          selection will start with the first table region.
+   * @param startKeyInclusive whether to include the startKey
+   * @param endKey select regions up to and including the region containing 
this row. If
+   *          {@code null}, selection will continue through the last table 
region.
+   * @param endKeyInclusive whether to include the endKey
+   * @param callback callback to get the response. See the comment of {@link 
CoprocessorCallback}
+   *          for more details.
+   * @param <S> the type of the asynchronous stub
+   * @param <R> the type of the return value
+   * @see CoprocessorCallable
+   * @see CoprocessorCallback
+   */
+  <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, boolean 
startKeyInclusive, byte[] endKey,
+      boolean endKeyInclusive, CoprocessorCallback<R> callback);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 87323ac..00f255e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -18,17 +18,26 @@
 package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import com.google.protobuf.RpcChannel;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -436,4 +445,90 @@ class RawAsyncTableImpl implements RawAsyncTable {
     return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
+  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, 
S> stubMaker,
+      CoprocessorCallable<S, R> callable, HRegionInfo region, byte[] row) {
+    RegionCoprocessorRpcChannelImpl channel = new 
RegionCoprocessorRpcChannelImpl(conn, tableName,
+        region, row, rpcTimeoutNs, operationTimeoutNs);
+    S stub = stubMaker.apply(channel);
+    CompletableFuture<R> future = new CompletableFuture<>();
+    ClientCoprocessorRpcController controller = new 
ClientCoprocessorRpcController();
+    callable.call(stub, controller, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, 
S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] row) {
+    return coprocessorService(stubMaker, callable, null, row);
+  }
+
+  private boolean locateFinished(HRegionInfo region, byte[] endKey, boolean 
endKeyInclusive) {
+    if (isEmptyStopRow(endKey)) {
+      if (isEmptyStopRow(region.getEndKey())) {
+        return true;
+      }
+      return false;
+    } else {
+      if (isEmptyStopRow(region.getEndKey())) {
+        return true;
+      }
+      int c = Bytes.compareTo(endKey, region.getEndKey());
+      // 1. if the region contains endKey
+      // 2. endKey is equal to the region's endKey and we do not want to 
include endKey.
+      return c < 0 || c == 0 && !endKeyInclusive;
+    }
+  }
+
+  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback,
+      List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
+      AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, 
HRegionLocation loc,
+      Throwable error) {
+    if (error != null) {
+      callback.onError(error);
+      return;
+    }
+    unfinishedRequest.incrementAndGet();
+    HRegionInfo region = loc.getRegionInfo();
+    if (locateFinished(region, endKey, endKeyInclusive)) {
+      locateFinished.set(true);
+    } else {
+      conn.getLocator()
+          .getRegionLocation(tableName, region.getEndKey(), 
RegionLocateType.CURRENT,
+            operationTimeoutNs)
+          .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, 
callback, locs, endKey,
+            endKeyInclusive, locateFinished, unfinishedRequest, l, e));
+    }
+    coprocessorService(stubMaker, callable, region, 
region.getStartKey()).whenComplete((r, e) -> {
+      if (e != null) {
+        callback.onRegionError(region, e);
+      } else {
+        callback.onRegionComplete(region, r);
+      }
+      if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+        callback.onComplete();
+      }
+    });
+  }
+
+  @Override
+  public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, boolean 
startKeyInclusive, byte[] endKey,
+      boolean endKeyInclusive, CoprocessorCallback<R> callback) {
+    byte[] nonNullStartKey = 
Optional.ofNullable(startKey).orElse(EMPTY_START_ROW);
+    byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW);
+    List<HRegionLocation> locs = new ArrayList<>();
+    conn.getLocator()
+        .getRegionLocation(tableName, nonNullStartKey,
+          startKeyInclusive ? RegionLocateType.CURRENT : 
RegionLocateType.AFTER, operationTimeoutNs)
+        .whenComplete(
+          (loc, error) -> onLocateComplete(stubMaker, callable, callback, 
locs, nonNullEndKey,
+            endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), 
loc, error));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
new file mode 100644
index 0000000..28a5564
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The implementation of a region-based coprocessor rpc channel.
+ */
+@InterfaceAudience.Private
+class RegionCoprocessorRpcChannelImpl implements RpcChannel {
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final HRegionInfo region;
+
+  private final byte[] row;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName 
tableName, HRegionInfo region,
+      byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.region = region;
+    this.row = row;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.operationTimeoutNs = operationTimeoutNs;
+  }
+
+  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message 
request,
+      Message responsePrototype, HBaseRpcController controller, 
HRegionLocation loc,
+      ClientService.Interface stub) {
+    CompletableFuture<Message> future = new CompletableFuture<>();
+    if (region != null
+        && !Bytes.equals(loc.getRegionInfo().getRegionName(), 
region.getRegionName())) {
+      future.completeExceptionally(new DoNotRetryIOException(
+          "Region name is changed, expected " + region.getRegionNameAsString() 
+ ", actual "
+              + loc.getRegionInfo().getRegionNameAsString()));
+      return future;
+    }
+    CoprocessorServiceRequest csr = 
CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
+      request, row, loc.getRegionInfo().getRegionName());
+    stub.execService(controller, csr,
+      new 
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>()
 {
+
+        @Override
+        public void run(CoprocessorServiceResponse resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            try {
+              future.complete(CoprocessorRpcUtils.getResponse(resp, 
responsePrototype));
+            } catch (IOException e) {
+              future.completeExceptionally(e);
+            }
+          }
+        }
+      });
+    return future;
+  }
+
+  @Override
+  public void callMethod(MethodDescriptor method, RpcController controller, 
Message request,
+      Message responsePrototype, RpcCallback<Message> done) {
+    conn.callerFactory.<Message> single().table(tableName).row(row)
+        .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, 
TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, 
s)).call()
+        .whenComplete((r, e) -> {
+          if (e != null) {
+            ((ClientCoprocessorRpcController) controller).setFailed(e);
+          }
+          done.run(r);
+        });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
index 1eda730..304722e 100644
--- 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -19,12 +19,16 @@
 
 package org.apache.hadoop.hbase.client.coprocessor;
 
+import static 
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static 
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
 import java.io.Closeable;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -49,18 +53,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
 import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
 import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
 /**
  * This client class is for invoking the aggregate functions deployed on the
  * Region Server side via the AggregateService. This class will implement the
@@ -227,23 +225,7 @@ public class AggregationClient implements Closeable {
     return aMaxCallBack.getMax();
   }
 
-  /*
-   * @param scan
-   * @param canFamilyBeAbsent whether column family can be absent in familyMap 
of scan
-   */
-  private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws 
IOException {
-    if (scan == null
-        || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && 
!Bytes.equals(
-          scan.getStartRow(), HConstants.EMPTY_START_ROW))
-        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && 
!Bytes.equals(
-          scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
-      throw new IOException("Agg client Exception: Startrow should be smaller 
than Stoprow");
-    } else if (!canFamilyBeAbsent) {
-      if (scan.getFamilyMap().size() != 1) {
-        throw new IOException("There must be only one family.");
-      }
-    }
-  }
+
 
   /**
    * It gives the minimum value of a column for a given column family for the
@@ -846,22 +828,6 @@ public class AggregationClient implements Closeable {
     return null;
   }
 
-  <R, S, P extends Message, Q extends Message, T extends Message> 
AggregateRequest
-  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean 
canFamilyBeAbsent)
-      throws IOException {
-    validateParameters(scan, canFamilyBeAbsent);
-    final AggregateRequest.Builder requestBuilder =
-        AggregateRequest.newBuilder();
-    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
-    P columnInterpreterSpecificData = null;
-    if ((columnInterpreterSpecificData = ci.getRequestData())
-       != null) {
-      
requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
-    }
-    requestBuilder.setScan(ProtobufUtil.toScan(scan));
-    return requestBuilder.build();
-  }
-
   byte[] getBytesFromResponse(ByteString response) {
     ByteBuffer bb = response.asReadOnlyByteBuffer();
     bb.rewind();
@@ -873,40 +839,4 @@ public class AggregationClient implements Closeable {
     }
     return bytes;
   }
-
-  /**
-   * Get an instance of the argument type declared in a class's signature. The
-   * argument type is assumed to be a PB Message subclass, and the instance is
-   * created using parseFrom method on the passed ByteString.
-   * @param runtimeClass the runtime type of the class
-   * @param position the position of the argument in the class declaration
-   * @param b the ByteString which should be parsed to get the instance created
-   * @return the instance
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  // Used server-side too by Aggregation Coprocesor Endpoint. Undo this 
interdependence. TODO.
-  public static <T extends Message>
-  T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b)
-      throws IOException {
-    Type type = runtimeClass.getGenericSuperclass();
-    Type argType = 
((ParameterizedType)type).getActualTypeArguments()[position];
-    Class<T> classType = (Class<T>)argType;
-    T inst;
-    try {
-      Method m = classType.getMethod("parseFrom", ByteString.class);
-      inst = (T)m.invoke(null, b);
-      return inst;
-    } catch (SecurityException e) {
-      throw new IOException(e);
-    } catch (NoSuchMethodException e) {
-      throw new IOException(e);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    } catch (InvocationTargetException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
new file mode 100644
index 0000000..b91128c
--- /dev/null
+++ 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Helper class for constructing aggregation request and response.
+ */
+@InterfaceAudience.Private
+public class AggregationHelper {
+
+  /**
+   * @param scan
+   * @param canFamilyBeAbsent whether column family can be absent in familyMap 
of scan
+   */
+  private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) 
throws IOException {
+    if (scan == null
+        || (Bytes.equals(scan.getStartRow(), scan.getStopRow())
+            && !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
+        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0)
+            && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
+      throw new IOException("Agg client Exception: Startrow should be smaller 
than Stoprow");
+    } else if (!canFamilyBeAbsent) {
+      if (scan.getFamilyMap().size() != 1) {
+        throw new IOException("There must be only one family.");
+      }
+    }
+  }
+
+  static <R, S, P extends Message, Q extends Message, T extends Message> 
AggregateRequest
+      validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci, 
boolean canFamilyBeAbsent)
+          throws IOException {
+    validateParameters(scan, canFamilyBeAbsent);
+    final AggregateRequest.Builder requestBuilder = 
AggregateRequest.newBuilder();
+    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
+    P columnInterpreterSpecificData = null;
+    if ((columnInterpreterSpecificData = ci.getRequestData()) != null) {
+      
requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
+    }
+    requestBuilder.setScan(ProtobufUtil.toScan(scan));
+    return requestBuilder.build();
+  }
+
+  /**
+   * Get an instance of the argument type declared in a class's signature. The 
argument type is
+   * assumed to be a PB Message subclass, and the instance is created using 
parseFrom method on the
+   * passed ByteString.
+   * @param runtimeClass the runtime type of the class
+   * @param position the position of the argument in the class declaration
+   * @param b the ByteString which should be parsed to get the instance created
+   * @return the instance
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  // Used server-side too by Aggregation Coprocessor Endpoint. Undo this 
interdependence. TODO.
+  public static <T extends Message> T getParsedGenericInstance(Class<?> 
runtimeClass, int position,
+      ByteString b) throws IOException {
+    Type type = runtimeClass.getGenericSuperclass();
+    Type argType = ((ParameterizedType) 
type).getActualTypeArguments()[position];
+    Class<T> classType = (Class<T>) argType;
+    T inst;
+    try {
+      Method m = classType.getMethod("parseFrom", ByteString.class);
+      inst = (T) m.invoke(null, b);
+      return inst;
+    } catch (SecurityException e) {
+      throw new IOException(e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    } catch (InvocationTargetException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
new file mode 100644
index 0000000..f8d0a19
--- /dev/null
+++ 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import static 
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static 
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.RawAsyncTable;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
+import org.apache.hadoop.hbase.client.RawScanResultConsumer;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * This client class is for invoking the aggregate functions deployed on the 
Region Server side via
+ * the AggregateService. This class will implement the supporting 
functionality for
+ * summing/processing the individual results obtained from the 
AggregateService for each region.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class AsyncAggregationClient {
+
+  private static abstract class AbstractAggregationCallback<T>
+      implements CoprocessorCallback<AggregateResponse> {
+
+    private final CompletableFuture<T> future;
+
+    protected boolean finished = false;
+
+    private void completeExceptionally(Throwable error) {
+      if (finished) {
+        return;
+      }
+      finished = true;
+      future.completeExceptionally(error);
+    }
+
+    protected AbstractAggregationCallback(CompletableFuture<T> future) {
+      this.future = future;
+    }
+
+    @Override
+    public synchronized void onRegionError(HRegionInfo region, Throwable 
error) {
+      completeExceptionally(error);
+    }
+
+    @Override
+    public synchronized void onError(Throwable error) {
+      completeExceptionally(error);
+    }
+
+    protected abstract void aggregate(HRegionInfo region, AggregateResponse 
resp)
+        throws IOException;
+
+    @Override
+    public synchronized void onRegionComplete(HRegionInfo region, 
AggregateResponse resp) {
+      try {
+        aggregate(region, resp);
+      } catch (IOException e) {
+        completeExceptionally(e);
+      }
+    }
+
+    protected abstract T getFinalResult();
+
+    @Override
+    public synchronized void onComplete() {
+      if (finished) {
+        return;
+      }
+      finished = true;
+      future.complete(getFinalResult());
+    }
+  }
+
+  private static <R, S, P extends Message, Q extends Message, T extends 
Message> R
+      getCellValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, 
AggregateResponse resp,
+          int firstPartIndex) throws IOException {
+    Q q = getParsedGenericInstance(ci.getClass(), 3, 
resp.getFirstPart(firstPartIndex));
+    return ci.getCellValueFromProto(q);
+  }
+
+  private static <R, S, P extends Message, Q extends Message, T extends 
Message> S
+      getPromotedValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, 
AggregateResponse resp,
+          int firstPartIndex) throws IOException {
+    T t = getParsedGenericInstance(ci.getClass(), 4, 
resp.getFirstPart(firstPartIndex));
+    return ci.getPromotedValueFromProto(t);
+  }
+
+  public static <R, S, P extends Message, Q extends Message, T extends 
Message> CompletableFuture<R>
+      max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) 
{
+    CompletableFuture<R> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<R> callback = new 
AbstractAggregationCallback<R>(future) {
+
+      private R max;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) 
throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          R result = getCellValueFromProto(ci, resp, 0);
+          if (max == null || (result != null && ci.compare(max, result) < 0)) {
+            max = result;
+          }
+        }
+      }
+
+      @Override
+      protected R getFinalResult() {
+        return max;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getMax(controller, req, 
rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 
scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  public static <R, S, P extends Message, Q extends Message, T extends 
Message> CompletableFuture<R>
+      min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) 
{
+    CompletableFuture<R> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<R> callback = new 
AbstractAggregationCallback<R>(future) {
+
+      private R min;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) 
throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          R result = getCellValueFromProto(ci, resp, 0);
+          if (min == null || (result != null && ci.compare(min, result) > 0)) {
+            min = result;
+          }
+        }
+      }
+
+      @Override
+      protected R getFinalResult() {
+        return min;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getMin(controller, req, 
rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 
scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<Long>
+      rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan 
scan) {
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, true);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<Long> callback = new 
AbstractAggregationCallback<Long>(future) {
+
+      private long count;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) 
throws IOException {
+        count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong();
+      }
+
+      @Override
+      protected Long getFinalResult() {
+        return count;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, 
rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 
scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  public static <R, S, P extends Message, Q extends Message, T extends 
Message> CompletableFuture<S>
+      sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) 
{
+    CompletableFuture<S> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<S> callback = new 
AbstractAggregationCallback<S>(future) {
+
+      private S sum;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) 
throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          S s = getPromotedValueFromProto(ci, resp, 0);
+          sum = ci.add(sum, s);
+        }
+      }
+
+      @Override
+      protected S getFinalResult() {
+        return sum;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getSum(controller, req, 
rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 
scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<Double>
+      avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) 
{
+    CompletableFuture<Double> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<Double> callback = new 
AbstractAggregationCallback<Double>(future) {
+
+      private S sum;
+
+      long count = 0L;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) 
throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
+          count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
+        }
+      }
+
+      @Override
+      protected Double getFinalResult() {
+        return ci.divideForAvg(sum, count);
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getAvg(controller, req, 
rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 
scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<Double>
+      std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) 
{
+    CompletableFuture<Double> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<Double> callback = new 
AbstractAggregationCallback<Double>(future) {
+
+      private S sum;
+
+      private S sumSq;
+
+      private long count;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) 
throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
+          sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1));
+          count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
+        }
+      }
+
+      @Override
+      protected Double getFinalResult() {
+        double avg = ci.divideForAvg(sum, count);
+        double avgSq = ci.divideForAvg(sumSq, count);
+        return Math.sqrt(avgSq - avg * avg);
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getStd(controller, req, 
rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 
scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  // the map key is the startRow of the region
+  private static <R, S, P extends Message, Q extends Message, T extends 
Message>
+      CompletableFuture<NavigableMap<byte[], S>>
+      sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, 
Scan scan) {
+    CompletableFuture<NavigableMap<byte[], S>> future =
+        new CompletableFuture<NavigableMap<byte[], S>>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    int firstPartIndex = scan.getFamilyMap().get(scan.getFamilies()[0]).size() 
- 1;
+    AbstractAggregationCallback<NavigableMap<byte[], S>> callback =
+        new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) {
+
+          private final NavigableMap<byte[], S> map = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+          @Override
+          protected void aggregate(HRegionInfo region, AggregateResponse resp) 
throws IOException {
+            if (resp.getFirstPartCount() > 0) {
+              map.put(region.getStartKey(), getPromotedValueFromProto(ci, 
resp, firstPartIndex));
+            }
+          }
+
+          @Override
+          protected NavigableMap<byte[], S> getFinalResult() {
+            return map;
+          }
+        };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getMedian(controller, req, 
rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), 
scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  private static <R, S, P extends Message, Q extends Message, T extends 
Message> void findMedian(
+      CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, 
S, P, Q, T> ci,
+      Scan scan, NavigableMap<byte[], S> sumByRegion) {
+    double halfSum = 
ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
+    S movingSum = null;
+    byte[] startRow = null;
+    for (Map.Entry<byte[], S> entry : sumByRegion.entrySet()) {
+      startRow = entry.getKey();
+      S newMovingSum = ci.add(movingSum, entry.getValue());
+      if (ci.divideForAvg(newMovingSum, 1L) > halfSum) {
+        break;
+      }
+      movingSum = newMovingSum;
+    }
+    if (startRow != null) {
+      scan.withStartRow(startRow);
+    }
+    // we cannot pass movingSum directly to an anonymous class as it is not 
effectively final.
+    S baseSum = movingSum;
+    byte[] family = scan.getFamilies()[0];
+    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+    byte[] weightQualifier = qualifiers.last();
+    byte[] valueQualifier = qualifiers.first();
+    table.scan(scan, new RawScanResultConsumer() {
+
+      private S sum = baseSum;
+
+      private R value = null;
+
+      @Override
+      public boolean onNext(Result[] results) {
+        try {
+          for (Result result : results) {
+            Cell weightCell = result.getColumnLatestCell(family, 
weightQualifier);
+            R weight = ci.getValue(family, weightQualifier, weightCell);
+            sum = ci.add(sum, ci.castToReturnType(weight));
+            if (ci.divideForAvg(sum, 1L) > halfSum) {
+              if (value != null) {
+                future.complete(value);
+              } else {
+                future.completeExceptionally(new NoSuchElementException());
+              }
+              return false;
+            }
+            Cell valueCell = result.getColumnLatestCell(family, 
valueQualifier);
+            value = ci.getValue(family, valueQualifier, valueCell);
+          }
+          return true;
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+          return false;
+        }
+      }
+
+      @Override
+      public void onError(Throwable error) {
+        future.completeExceptionally(error);
+      }
+
+      @Override
+      public void onComplete() {
+        if (!future.isDone()) {
+          // we should not reach here as the future should be completed in 
onNext.
+          future.completeExceptionally(new NoSuchElementException());
+        }
+      }
+    });
+  }
+
+  public static <R, S, P extends Message, Q extends Message, T extends 
Message> CompletableFuture<R>
+      median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan 
scan) {
+    CompletableFuture<R> future = new CompletableFuture<>();
+    sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+      } else if (sumByRegion.isEmpty()) {
+        future.completeExceptionally(new NoSuchElementException());
+      } else {
+        findMedian(future, table, ci, 
ReflectionUtils.newInstance(scan.getClass(), scan),
+          sumByRegion);
+      }
+    });
+    return future;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
----------------------------------------------------------------------
diff --git 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 08b0562..bccb76a 100644
--- 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -18,6 +18,14 @@
  */
 package org.apache.hadoop.hbase.coprocessor;
 
+import static 
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -31,7 +39,6 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -40,12 +47,6 @@ import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo
 import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
 /**
  * A concrete AggregateProtocol implementation. Its system level coprocessor
  * that computes the aggregate function at a region level.
@@ -485,7 +486,7 @@ extends AggregateService implements CoprocessorService, 
Coprocessor {
       ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) 
cls.newInstance();
       if (request.hasInterpreterSpecificBytes()) {
         ByteString b = request.getInterpreterSpecificBytes();
-        P initMsg = AggregationClient.getParsedGenericInstance(ci.getClass(), 
2, b);
+        P initMsg = getParsedGenericInstance(ci.getClass(), 2, b);
         ci.initialize(initMsg);
       }
       return ci;

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
 
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
new file mode 100644
index 0000000..1274dd5
--- /dev/null
+++ 
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, CoprocessorTests.class })
+public class TestAsyncAggregationClient {
+
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = 
TableName.valueOf("TestAsyncAggregationClient");
+
+  private static byte[] CF = Bytes.toBytes("CF");
+
+  private static byte[] CQ = Bytes.toBytes("CQ");
+
+  private static byte[] CQ2 = Bytes.toBytes("CQ2");
+
+  private static int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static RawAsyncTable TABLE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      AggregateImplementation.class.getName());
+    UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    UTIL.createTable(TABLE_NAME, CF, splitKeys);
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration());
+    TABLE = CONN.getRawTable(TABLE_NAME);
+    TABLE.putAll(LongStream.range(0, COUNT)
+        .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))
+            .addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, 
Bytes.toBytes(l * l)))
+        .collect(Collectors.toList())).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMax() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT - 1, AsyncAggregationClient
+        .max(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().longValue());
+  }
+
+  @Test
+  public void testMin() throws InterruptedException, ExecutionException {
+    assertEquals(0, AsyncAggregationClient
+        .min(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().longValue());
+  }
+
+  @Test
+  public void testRowCount() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT,
+      AsyncAggregationClient
+          .rowCount(TABLE, new LongColumnInterpreter(), new 
Scan().addColumn(CF, CQ)).get()
+          .longValue());
+  }
+
+  @Test
+  public void testSum() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT * (COUNT - 1) / 2, AsyncAggregationClient
+        .sum(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().longValue());
+  }
+
+  private static final double DELTA = 1E-3;
+
+  @Test
+  public void testAvg() throws InterruptedException, ExecutionException {
+    assertEquals((COUNT - 1) / 2.0, AsyncAggregationClient
+        .avg(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().doubleValue(),
+      DELTA);
+  }
+
+  @Test
+  public void testStd() throws InterruptedException, ExecutionException {
+    double avgSq =
+        LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + 
l2).getAsLong()
+            / (double) COUNT;
+    double avg = (COUNT - 1) / 2.0;
+    double std = Math.sqrt(avgSq - avg * avg);
+    assertEquals(std, AsyncAggregationClient
+        .std(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().doubleValue(),
+      DELTA);
+  }
+
+  @Test
+  public void testMedian() throws InterruptedException, ExecutionException {
+    long halfSum = COUNT * (COUNT - 1) / 4;
+    long median = 0L;
+    long sum = 0L;
+    for (int i = 0; i < COUNT; i++) {
+      sum += i;
+      if (sum > halfSum) {
+        median = i - 1;
+        break;
+      }
+    }
+    assertEquals(median,
+      AsyncAggregationClient
+          .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get()
+          .longValue());
+  }
+
+  @Test
+  public void testMedianWithWeight() throws InterruptedException, 
ExecutionException {
+    long halfSum =
+        LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + 
l2).getAsLong() / 2;
+    long median = 0L;
+    long sum = 0L;
+    for (int i = 0; i < COUNT; i++) {
+      sum += i * i;
+      if (sum > halfSum) {
+        median = i - 1;
+        break;
+      }
+    }
+    assertEquals(median, AsyncAggregationClient
+        .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ).addColumn(CF, CQ2))
+        .get().longValue());
+  }
+}

Reply via email to