Revert "HBASE-17346 AggregationClient cleanup" Reverted because I had the wrong JIRA number (spotted by Duo Zhang).
This reverts commit 0a93241b61e6183b5671a4e7940e6212a17acd66. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0583d793 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0583d793 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0583d793 Branch: refs/heads/hbase-12439 Commit: 0583d79346e16962bb35fed7587d56d2ec71c0fa Parents: 69ce596 Author: Michael Stack <[email protected]> Authored: Tue Jan 3 19:15:53 2017 -0800 Committer: Michael Stack <[email protected]> Committed: Tue Jan 3 19:15:53 2017 -0800 ---------------------------------------------------------------------- .../client/coprocessor/AggregationClient.java | 94 +++++--------------- 1 file changed, 23 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0583d793/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index d236342..cde7d41 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; @@ -58,8 +59,6 @@ import org.apache.hadoop.hbase.util.Pair; import com.google.protobuf.ByteString; import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; /** * This client class is for invoking the aggregate functions deployed on the @@ -82,60 +81,13 @@ import com.google.protobuf.RpcController; * </ul> * <p>Call {@link #close()} when done. */ -@InterfaceAudience.Public +@InterfaceAudience.Private public class AggregationClient implements Closeable { // TODO: This class is not used. Move to examples? private static final Log log = LogFactory.getLog(AggregationClient.class); private final Connection connection; /** - * An RpcController implementation for use here in this endpoint. - */ - static class AggregationClientRpcController implements RpcController { - private String errorText; - private boolean cancelled = false; - private boolean failed = false; - - @Override - public String errorText() { - return this.errorText; - } - - @Override - public boolean failed() { - return this.failed; - } - - @Override - public boolean isCanceled() { - return this.cancelled; - } - - @Override - public void notifyOnCancel(RpcCallback<Object> arg0) { - throw new UnsupportedOperationException(); - } - - @Override - public void reset() { - this.errorText = null; - this.cancelled = false; - this.failed = false; - } - - @Override - public void setFailed(String errorText) { - this.failed = true; - this.errorText = errorText; - } - - @Override - public void startCancel() { - this.cancelled = true; - } - } - - /** * Constructor with Conf object * @param cfg */ @@ -208,13 +160,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, R>() { @Override public R call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new 
ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getMax(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -296,13 +248,13 @@ public class AggregationClient implements Closeable { @Override public R call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getMin(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -371,13 +323,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, Long>() { @Override public Long call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getRowNum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } byte[] bytes = 
getBytesFromResponse(response.getFirstPart(0)); ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); @@ -436,14 +388,14 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, S>() { @Override public S call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); // Not sure what is going on here why I have to do these casts. TODO. CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getSum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } if (response.getFirstPartCount() == 0) { return null; @@ -504,13 +456,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, Pair<S, Long>>() { @Override public Pair<S, Long> call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getAvg(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } Pair<S, Long> pair = new Pair<S, Long>(null, 0L); if (response.getFirstPartCount() == 0) { @@ -608,13 +560,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, Pair<List<S>, Long>>() { @Override public Pair<List<S>, Long> call(AggregateService instance) throws IOException { - 
RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getStd(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L); if (response.getFirstPartCount() == 0) { @@ -724,13 +676,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, List<S>>() { @Override public List<S> call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getMedian(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } List<S> list = new ArrayList<S>();
