HBASE-14703 HTable.mutateRow does not collect stats (Heng Chen)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef712df9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef712df9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef712df9 Branch: refs/heads/master Commit: ef712df944b0745892bc13bcecdfd6e358a71b66 Parents: d083e4f Author: Jesse Yates <[email protected]> Authored: Fri Mar 4 19:07:59 2016 -0800 Committer: Jesse Yates <[email protected]> Committed: Sat Mar 5 11:01:45 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 114 +- .../org/apache/hadoop/hbase/client/HTable.java | 151 +- .../hadoop/hbase/client/MetricsConnection.java | 10 +- .../hadoop/hbase/client/MultiResponse.java | 57 +- .../hbase/client/MultiServerCallable.java | 17 +- .../client/PayloadCarryingServerCallable.java | 48 + .../hadoop/hbase/client/ResultStatsUtil.java | 12 +- .../hbase/client/RetryingTimeTracker.java | 57 + .../hbase/client/RpcRetryingCallerFactory.java | 6 - .../hbase/client/RpcRetryingCallerImpl.java | 34 +- .../hbase/client/ServerStatisticTracker.java | 3 +- .../hadoop/hbase/client/StatisticTrackable.java | 33 + .../client/StatsTrackingRpcRetryingCaller.java | 77 - .../hadoop/hbase/protobuf/ProtobufUtil.java | 4 +- .../hbase/protobuf/ResponseConverter.java | 28 +- .../hadoop/hbase/client/TestAsyncProcess.java | 18 +- .../hbase/protobuf/generated/ClientProtos.java | 1452 +++++++++++++++++- hbase-protocol/src/main/protobuf/Client.proto | 8 +- .../hadoop/hbase/regionserver/HRegion.java | 4 +- .../hbase/regionserver/RSRpcServices.java | 68 +- .../hadoop/hbase/client/TestCheckAndMutate.java | 9 +- .../hadoop/hbase/client/TestClientPushback.java | 29 + .../hadoop/hbase/client/TestFromClientSide.java | 8 +- .../hadoop/hbase/client/TestReplicasClient.java | 5 +- 24 files changed, 1891 insertions(+), 361 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 65c15ce..7b0016c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; @@ -428,7 +429,7 @@ class AsyncProcess { if (retainedActions.isEmpty()) return NO_REQS_RESULT; return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, - locationErrors, locationErrorRows, actionsByServer, pool); + locationErrors, locationErrorRows, actionsByServer, pool); } <CResult> AsyncRequestFuture submitMultiActions(TableName tableName, @@ -444,7 +445,7 @@ class AsyncProcess { int originalIndex = locationErrorRows.get(i); Row row = retainedActions.get(originalIndex).getAction(); ars.manageError(originalIndex, row, - Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); + Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); } } ars.sendMultiAction(actionsByServer, 1, null, false); @@ -546,9 +547,13 @@ class AsyncProcess { */ public <CResult> AsyncRequestFuture submitAll(TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results); + return submitAll(null, tableName, rows, callback, results, null, timeout); } + public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, + List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { + return submitAll(pool, tableName, rows, callback, results, null, timeout); + } /** * Submit immediately the list of rows, whatever the server status. Kept for backward * compatibility: it allows to be used with the batch interface that return an array of objects. @@ -560,7 +565,8 @@ class AsyncProcess { * @param results Optional array to return the results thru; backward compat. */ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { + List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results, + PayloadCarryingServerCallable callable, int curTimeout) { List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -579,7 +585,8 @@ class AsyncProcess { actions.add(action); } AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( - tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null); + tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, + callable, curTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -711,11 +718,11 @@ class AsyncProcess { private final MultiAction<Row> multiAction; private final int numAttempt; private final ServerName server; - private final Set<MultiServerCallable<Row>> callsInProgress; + private final Set<PayloadCarryingServerCallable> callsInProgress; private SingleServerRequestRunnable( MultiAction<Row> multiAction, int numAttempt, ServerName server, - Set<MultiServerCallable<Row>> callsInProgress) { + Set<PayloadCarryingServerCallable> callsInProgress) { this.multiAction = multiAction; this.numAttempt = numAttempt; this.server = server; @@ -725,19 +732,22 @@ class AsyncProcess { @Override public void run() { MultiResponse res; - MultiServerCallable<Row> callable = null; + PayloadCarryingServerCallable callable = currentCallable; try { - callable = createCallable(server, tableName, multiAction); + // setup the callable based on the actions, if we don't have one already from the request + if (callable == null) { + callable = createCallable(server, tableName, multiAction); + } + RpcRetryingCaller<MultiResponse> caller = createCaller(callable); try { - RpcRetryingCaller<MultiResponse> caller = createCaller(callable); - if (callsInProgress != null) callsInProgress.add(callable); - res = caller.callWithoutRetries(callable, timeout); - + if (callsInProgress != null) { + callsInProgress.add(callable); + } + res = caller.callWithoutRetries(callable, currentCallTotalTimeout); if (res == null) { // Cancelled return; } - } catch (IOException e) { // The service itself failed . It may be an error coming from the communication // layer, but, as well, a functional error raised by the server. @@ -771,7 +781,7 @@ class AsyncProcess { private final BatchErrors errors; private final ConnectionImplementation.ServerErrorTracker errorsByServer; private final ExecutorService pool; - private final Set<MultiServerCallable<Row>> callsInProgress; + private final Set<PayloadCarryingServerCallable> callsInProgress; private final TableName tableName; @@ -798,10 +808,12 @@ class AsyncProcess { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; + private PayloadCarryingServerCallable currentCallable; + private int currentCallTotalTimeout; public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback<CResult> callback) { + Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -865,13 +877,16 @@ class AsyncProcess { this.replicaGetIndices = null; } this.callsInProgress = !hasAnyReplicaGets ? null : - Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>()); + Collections.newSetFromMap( + new ConcurrentHashMap<PayloadCarryingServerCallable, Boolean>()); this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); + this.currentCallable = callable; + this.currentCallTotalTimeout = timeout; } - public Set<MultiServerCallable<Row>> getCallsInProgress() { + public Set<PayloadCarryingServerCallable> getCallsInProgress() { return callsInProgress; } @@ -1275,11 +1290,15 @@ class AsyncProcess { int failureCount = 0; boolean canRetry = true; - // Go by original action. + Map<byte[], MultiResponse.RegionResult> results = responses.getResults(); + updateStats(server, results); + int failed = 0, stopped = 0; + // Go by original action. for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) { byte[] regionName = regionEntry.getKey(); - Map<Integer, Object> regionResults = responses.getResults().get(regionName); + Map<Integer, Object> regionResults = results.get(regionName) == null + ? null : results.get(regionName).result; if (regionResults == null) { if (!responses.getExceptions().containsKey(regionName)) { LOG.error("Server sent us neither results nor exceptions for " @@ -1308,7 +1327,7 @@ class AsyncProcess { } ++failureCount; Retry retry = manageError(sentAction.getOriginalIndex(), row, - canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server); + canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server); if (retry == Retry.YES) { toReplay.add(sentAction); } else if (retry == Retry.NO_OTHER_SUCCEEDED) { @@ -1317,24 +1336,11 @@ class AsyncProcess { ++failed; } } else { - - if (AsyncProcess.this.connection.getConnectionMetrics() != null) { - AsyncProcess.this.connection.getConnectionMetrics(). - updateServerStats(server, regionName, result); - } - - // update the stats about the region, if its a user table. We don't want to slow down - // updates to meta tables, especially from internal updates (master, etc). - if (AsyncProcess.this.connection.getStatisticsTracker() != null) { - result = ResultStatsUtil.updateStats(result, - AsyncProcess.this.connection.getStatisticsTracker(), server, regionName); - } - if (callback != null) { try { //noinspection unchecked // TODO: would callback expect a replica region name if it gets one? - this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result); + this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result); } catch (Throwable t) { LOG.error("User callback threw an exception for " + Bytes.toStringBinary(regionName) + ", ignoring", t); @@ -1384,7 +1390,6 @@ class AsyncProcess { } } } - if (toReplay.isEmpty()) { logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); } else { @@ -1438,8 +1443,8 @@ class AsyncProcess { boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); int index = action.getOriginalIndex(); if (results == null) { - decActionCounter(index); - return; // Simple case, no replica requests. + decActionCounter(index); + return; // Simple case, no replica requests. } state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); if (state == null) { @@ -1618,7 +1623,7 @@ class AsyncProcess { throw new InterruptedIOException(iex.getMessage()); } finally { if (callsInProgress != null) { - for (MultiServerCallable<Row> clb : callsInProgress) { + for (PayloadCarryingServerCallable clb : callsInProgress) { clb.cancel(); } } @@ -1675,13 +1680,38 @@ class AsyncProcess { } } + private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { + boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null; + boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null; + if (!stats && !metrics) { + return; + } + for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) { + byte[] regionName = regionStats.getKey(); + ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat(); + ResultStatsUtil.updateStats(AsyncProcess.this.connection.getStatisticsTracker(), server, + regionName, stat); + ResultStatsUtil.updateStats(AsyncProcess.this.connection.getConnectionMetrics(), + server, regionName, stat); + } + } + + protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( + TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, + Batch.Callback<CResult> callback, Object[] results, boolean needResults, + PayloadCarryingServerCallable callable, int curTimeout) { + return new AsyncRequestFutureImpl<CResult>( + tableName, actions, nonceGroup, getPool(pool), needResults, + results, callback, callable, curTimeout); + } + @VisibleForTesting /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<CResult> callback, Object[] results, boolean needResults) { - return new AsyncRequestFutureImpl<CResult>( - tableName, actions, nonceGroup, getPool(pool), needResults, results, callback); + return createAsyncRequestFuture( + tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout); } /** @@ -1697,7 +1727,7 @@ class AsyncProcess { * Create a caller. Isolated to be easily overridden in the tests. */ @VisibleForTesting - protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { + protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) { return rpcCallerFactory.<MultiResponse> newCaller(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index ce5a44c..33fd94e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; @@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; @@ -263,7 +265,8 @@ public class HTable implements HTableInterface { */ @Override public HTableDescriptor getTableDescriptor() throws IOException { - HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout); + HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, + rpcCallerFactory, operationTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -450,10 +453,10 @@ public class HTable implements HTableInterface { // Call that takes into account the replica RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( - rpcControllerFactory, tableName, this.connection, get, pool, - tableConfiguration.getRetriesNumber(), - operationTimeout, - tableConfiguration.getPrimaryCallTimeoutMicroSecond()); + rpcControllerFactory, tableName, this.connection, get, pool, + tableConfiguration.getRetriesNumber(), + operationTimeout, + tableConfiguration.getPrimaryCallTimeoutMicroSecond()); return callable.call(); } @@ -587,35 +590,47 @@ public class HTable implements HTableInterface { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - RegionServerCallable<Void> callable = - new RegionServerCallable<Void>(connection, getName(), rm.getRow()) { - @Override - public Void call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( - getLocation().getRegionInfo().getRegionName(), rm); - regionMutationBuilder.setAtomic(true); - MultiRequest request = - MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if(ex instanceof IOException) { - throw (IOException)ex; + final RetryingTimeTracker tracker = new RetryingTimeTracker(); + PayloadCarryingServerCallable<MultiResponse> callable = + new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), + rpcControllerFactory) { + @Override + public MultiResponse call(int callTimeout) throws IOException { + tracker.start(); + controller.setPriority(tableName); + int remainingTime = tracker.getRemainingTime(callTimeout); + if (remainingTime == 0) { + throw new DoNotRetryIOException("Timeout for mutate row"); + } + controller.setCallTimeout(remainingTime); + try { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( + getLocation().getRegionInfo().getRegionName(), rm); + regionMutationBuilder.setAtomic(true); + MultiRequest request = + MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + ClientProtos.MultiResponse response = getStub().multi(controller, request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if (ex instanceof IOException) { + throw (IOException) ex; + } + throw new IOException("Failed to mutate row: " + + Bytes.toStringBinary(rm.getRow()), ex); } - throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex); + return ResponseConverter.getResults(request, response, controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); } - return null; - } - }; - rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout); + }; + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), + null, null, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw ars.getErrors(); + } } /** @@ -860,37 +875,55 @@ public class HTable implements HTableInterface { */ @Override public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOp compareOp, final byte [] value, final RowMutations rm) - throws IOException { - RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MultiRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if(ex instanceof IOException) { - throw (IOException)ex; - } - throw new IOException("Failed to checkAndMutate row: "+ - Bytes.toStringBinary(rm.getRow()), ex); + final CompareOp compareOp, final byte [] value, final RowMutations rm) + throws IOException { + final RetryingTimeTracker tracker = new RetryingTimeTracker(); + PayloadCarryingServerCallable<MultiResponse> callable = + new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), + rpcControllerFactory) { + @Override + public MultiResponse call(int callTimeout) throws IOException { + tracker.start(); + controller.setPriority(tableName); + int remainingTime = tracker.getRemainingTime(callTimeout); + if (remainingTime == 0) { + throw new DoNotRetryIOException("Timeout for mutate row"); + } + controller.setCallTimeout(remainingTime); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MultiRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, rm); + ClientProtos.MultiResponse response = getStub().multi(controller, request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if(ex instanceof IOException) { + throw (IOException)ex; } - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + throw new IOException("Failed to checkAndMutate row: "+ + Bytes.toStringBinary(rm.getRow()), ex); } + return ResponseConverter.getResults(request, response, controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - }; - return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout); + } + }; + /** + * Currently, we use one array to store 'processed' flag which is returned by server. + * It is excessive to send such a large array, but that is required by the framework right now + * */ + Object[] results = new Object[rm.getMutations().size()]; + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), + null, results, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw ars.getErrors(); + } + + return ((Result)results[0]).getExists(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 6a292bc..400f505 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -52,7 +52,7 @@ import static com.codahale.metrics.MetricRegistry.name; * {@link #shutdown()} to terminate the thread pools they allocate. */ @InterfaceAudience.Private -public class MetricsConnection { +public class MetricsConnection implements StatisticTrackable { /** Set this key to {@code true} to enable metrics collection of client requests. */ public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; @@ -199,9 +199,15 @@ public class MetricsConnection { } Result result = (Result) r; ClientProtos.RegionLoadStats stats = result.getStats(); - if(stats == null){ + if (stats == null) { return; } + updateRegionStats(serverName, regionName, stats); + } + + @Override + public void updateRegionStats(ServerName serverName, byte[] regionName, + ClientProtos.RegionLoadStats stats) { String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); ConcurrentMap<byte[], RegionStats> rsStats = null; if (serverStats.containsKey(serverName)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java index 089ccff..79a9ed3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; /** @@ -33,8 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes; public class MultiResponse { // map of regionName to map of Results by the original index for that Result - private Map<byte[], Map<Integer, Object>> results = - new TreeMap<byte[], Map<Integer, Object>>(Bytes.BYTES_COMPARATOR); + private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR); /** * The server can send us a failure for the region itself, instead of individual failure. @@ -52,8 +52,8 @@ public class MultiResponse { */ public int size() { int size = 0; - for (Map<?,?> c : results.values()) { - size += c.size(); + for (RegionResult result: results.values()) { + size += result.size(); } return size; } @@ -66,16 +66,7 @@ public class MultiResponse { * @param resOrEx the result or error; will be empty for successful Put and Delete actions. */ public void add(byte[] regionName, int originalIndex, Object resOrEx) { - Map<Integer, Object> rs = results.get(regionName); - if (rs == null) { - rs = new HashMap<Integer, Object>(); - results.put(regionName, rs); - } - rs.put(originalIndex, resOrEx); - } - - public Map<byte[], Map<Integer, Object>> getResults() { - return results; + getResult(regionName).addResult(originalIndex, resOrEx); } public void addException(byte []regionName, Throwable ie){ @@ -92,4 +83,42 @@ public class MultiResponse { public Map<byte[], Throwable> getExceptions() { return exceptions; } + + public void addStatistic(byte[] regionName, ClientProtos.RegionLoadStats stat) { + getResult(regionName).setStat(stat); + } + + private RegionResult getResult(byte[] region){ + RegionResult rs = results.get(region); + if (rs == null) { + rs = new RegionResult(); + results.put(region, rs); + } + return rs; + } + + public Map<byte[], RegionResult> getResults(){ + return this.results; + } + + static class RegionResult{ + Map<Integer, Object> result = new HashMap<>(); + ClientProtos.RegionLoadStats stat; + + public void addResult(int index, Object result){ + this.result.put(index, result); + } + + public void setStat(ClientProtos.RegionLoadStats stat){ + this.stat = stat; + } + + public int size() { + return this.result.size(); + } + + public ClientProtos.RegionLoadStats getStat() { + return this.stat; + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 85b401e..f78f348 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -50,21 +49,19 @@ import com.google.protobuf.ServiceException; * {@link RegionServerCallable} that goes against multiple regions. * @param <R> */ -class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable { +class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> { private final MultiAction<R> multiAction; private final boolean cellBlock; - private final PayloadCarryingRpcController controller; MultiServerCallable(final ClusterConnection connection, final TableName tableName, final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) { - super(connection, tableName, null); + super(connection, tableName, null, rpcFactory); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so // we will store the server here, and throw if someone tries to obtain location/regioninfo. this.location = new HRegionLocation(null, location); this.cellBlock = isCellBlock(); - controller = rpcFactory.newController(); } @Override @@ -133,16 +130,6 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> impleme return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); } - @Override - public void cancel() { - controller.startCancel(); - } - - @Override - public boolean isCancelled() { - return controller.isCanceled(); - } - /** * @return True if we should send data in cellblocks. This is an expensive call. Cache the * result if you can rather than call each time. http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java new file mode 100644 index 0000000..d94f069 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +/** + * This class is used to unify HTable calls with AsyncProcess Framework. + * HTable can use AsyncProcess directly though this class. + */ [email protected] +public abstract class PayloadCarryingServerCallable<T> + extends RegionServerCallable<T> implements Cancellable { + protected PayloadCarryingRpcController controller; + + public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, + RpcControllerFactory rpcControllerFactory) { + super(connection, tableName, row); + this.controller = rpcControllerFactory.newController(); + } + + @Override + public void cancel() { + controller.startCancel(); + } + + @Override + public boolean isCancelled() { + return controller.isCanceled(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java index 3caa63e..6537d79 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java @@ -55,13 +55,17 @@ public final class ResultStatsUtil { return r; } - if (regionName != null) { - serverStats.updateRegionStats(server, regionName, stats); - } - + updateStats(serverStats, server, regionName, stats); return r; } + public static void updateStats(StatisticTrackable tracker, ServerName server, byte[] regionName, + ClientProtos.RegionLoadStats stats) { + if (regionName != null && stats != null && tracker != null) { + tracker.updateRegionStats(server, regionName, stats); + } + } + public static <T> T updateStats(T r, ServerStatisticTracker stats, HRegionLocation regionLocation) { byte[] regionName = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java new file mode 100644 index 0000000..24288e6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Tracks the amount of time remaining for an operation. + */ +class RetryingTimeTracker { + + private long globalStartTime = -1; + + public void start() { + if (this.globalStartTime < 0) { + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + } + } + + public int getRemainingTime(int callTimeout) { + if (callTimeout <= 0) { + return 0; + } else { + if (callTimeout == Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + int remainingTime = (int) ( + callTimeout - + (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); + if (remainingTime < 1) { + // If there is no time left, we're trying anyway. It's too late. + // 0 means no timeout, and it's not the intent here. So we secure both cases by + // resetting to the minimum. + remainingTime = 1; + } + return remainingTime; + } + } + + public long getStartTime() { + return this.globalStartTime; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 0af8210..550812f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -67,12 +67,6 @@ public class RpcRetryingCallerFactory { // is cheap as it does not require parsing a complex structure. RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor, startLogErrorsCnt); - - // wrap it with stats, if we are tracking them - if (enableBackPressure && this.stats != null) { - caller = new StatsTrackingRpcRetryingCaller<T>(caller, this.stats); - } - return caller; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 12abc6a..6ce4956 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -51,10 +51,6 @@ import com.google.protobuf.ServiceException; public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> { // LOG is being used in TestMultiRowRangeFilter, hence leaving it public public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class); - /** - * When we started making calls. - */ - private long globalStartTime; /** How many retries are allowed before we start to log */ private final int startLogErrorsCnt; @@ -64,6 +60,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> { private final AtomicBoolean cancelled = new AtomicBoolean(false); private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; + private final RetryingTimeTracker tracker; public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); @@ -76,23 +73,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> { this.interceptor = interceptor; context = interceptor.createEmptyContext(); this.startLogErrorsCnt = startLogErrorsCnt; - } - - private int getRemainingTime(int callTimeout) { - if (callTimeout <= 0) { - return 0; - } else { - if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; - int remainingTime = (int) (callTimeout - - (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); - if (remainingTime < 1) { - // If there is no time left, we're trying anyway. It's too late. - // 0 means no timeout, and it's not the intent here. So we secure both cases by - // resetting to the minimum. - remainingTime = 1; - } - return remainingTime; - } + this.tracker = new RetryingTimeTracker(); } @Override @@ -108,21 +89,21 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> { throws IOException, RuntimeException { List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>(); - this.globalStartTime = EnvironmentEdgeManager.currentTime(); + tracker.start(); context.clear(); for (int tries = 0;; tries++) { long expectedSleep; try { callable.prepare(tries != 0); // if called with false, check table status on ZK interceptor.intercept(context.prepare(callable, tries)); - return callable.call(getRemainingTime(callTimeout)); + return callable.call(tracker.getRemainingTime(callTimeout)); } catch (PreemptiveFastFailException e) { throw e; } catch (Throwable t) { ExceptionUtil.rethrowIfInterrupt(t); if (tries > startLogErrorsCnt) { LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started=" - + (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " + + (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms ago, " + "cancelled=" + cancelled.get() + ", msg=" + callable.getExceptionMessageAdditionalDetail()); } @@ -172,14 +153,13 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> { * @return Calculate how long a single call took */ private long singleCallDuration(final long expectedSleep) { - return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep; + return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep; } @Override public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. - this.globalStartTime = EnvironmentEdgeManager.currentTime(); try { callable.prepare(false); return callable.call(callTimeout); @@ -231,7 +211,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> { @Override public String toString() { - return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + + return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() + ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}'; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java index d03ecf6..b8e7923 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -31,11 +31,12 @@ import java.util.concurrent.ConcurrentHashMap; * Tracks the statistics for multiple regions */ @InterfaceAudience.Private -public class ServerStatisticTracker { +public class ServerStatisticTracker implements StatisticTrackable { private final ConcurrentHashMap<ServerName, ServerStatistics> stats = new ConcurrentHashMap<ServerName, ServerStatistics>(); + @Override public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats currentStats) { ServerStatistics stat = stats.get(server); http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java new file mode 100644 index 0000000..7bb49e7 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * Parent interface for an object to get updates about per-region statistics. + */ [email protected] +public interface StatisticTrackable { + /** + * Update stats per region. + * */ + void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats + stats); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java deleted file mode 100644 index e82f1e8..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import java.io.IOException; - -/** - * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return, - * if stats are available - */ [email protected] -public class StatsTrackingRpcRetryingCaller<T> implements RpcRetryingCaller<T> { - private final ServerStatisticTracker stats; - private final RpcRetryingCaller<T> delegate; - - public StatsTrackingRpcRetryingCaller(RpcRetryingCaller<T> delegate, - ServerStatisticTracker stats) { - this.delegate = delegate; - this.stats = stats; - } - - @Override - public void cancel() { - delegate.cancel(); - } - - @Override - public T callWithRetries(RetryingCallable<T> callable, int callTimeout) - throws IOException, RuntimeException { - T result = delegate.callWithRetries(callable, callTimeout); - return updateStatsAndUnwrap(result, callable); - } - - @Override - public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) - throws IOException, RuntimeException { - T result = delegate.callWithRetries(callable, callTimeout); - return updateStatsAndUnwrap(result, callable); - } - - private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) { - // don't track stats about requests that aren't to regionservers - if (!(callable instanceof RegionServerCallable)) { - return result; - } - - // mutli-server callables span multiple regions, so they don't have a location, - // but they are region server callables, so we have to handle them when we process the - // result in AsyncProcess#receiveMultiAction, not in here - if (callable instanceof MultiServerCallable) { - return result; - } - - // update the stats for the single server callable - RegionServerCallable<T> regionCallable = (RegionServerCallable) callable; - HRegionLocation location = regionCallable.getLocation(); - return ResultStatsUtil.updateStats(result, stats, location); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 261a9aa..b052e63 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -193,8 +193,8 @@ public final class ProtobufUtil { */ private final static Cell[] EMPTY_CELL_ARRAY = new Cell[]{}; private final static Result EMPTY_RESULT = Result.create(EMPTY_CELL_ARRAY); - private final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true); - private final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false); + final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true); + final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false); private final static Result EMPTY_RESULT_STALE = Result.create(EMPTY_CELL_ARRAY, null, true); private final static Result EMPTY_RESULT_EXISTS_TRUE_STALE = Result.create((Cell[])null, true, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 177b1c7..421907d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -89,7 +89,7 @@ public final class ResponseConverter { int requestRegionActionCount = request.getRegionActionCount(); int responseRegionActionResultCount = response.getRegionActionResultCount(); if (requestRegionActionCount != responseRegionActionResultCount) { - throw new IllegalStateException("Request mutation count=" + responseRegionActionResultCount + + throw new IllegalStateException("Request mutation count=" + requestRegionActionCount + " does not match response mutation result count=" + responseRegionActionResultCount); } @@ -125,21 +125,27 @@ public final class ResponseConverter { responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { responseValue = ProtobufUtil.toResult(roe.getResult(), cells); - // add the load stats, if we got any - if (roe.hasLoadStats()) { - ((Result) responseValue).addResults(roe.getLoadStats()); - } } else if (roe.hasServiceResult()) { responseValue = roe.getServiceResult(); - } else { - // no result & no exception. Unexpected. - throw new IllegalStateException("No result & no exception roe=" + roe + - " for region " + actions.getRegion()); + } else{ + // Sometimes, the response is just "it was processed". Generally, this occurs for things + // like mutateRows where either we get back 'processed' (or not) and optionally some + // statistics about the regions we touched. + responseValue = response.getProcessed() ? + ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : + ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; } results.add(regionName, roe.getIndex(), responseValue); } } + if (response.hasRegionStatistics()) { + ClientProtos.MultiRegionLoadStats stats = response.getRegionStatistics(); + for (int i = 0; i < stats.getRegionCount(); i++) { + results.addStatistic(stats.getRegion(i).getValue().toByteArray(), stats.getStat(i)); + } + } + return results; } @@ -161,11 +167,9 @@ public final class ResponseConverter { * @param r * @return an action result builder */ - public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r, - ClientProtos.RegionLoadStats stats) { + public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) { ResultOrException.Builder builder = ResultOrException.newBuilder(); if (r != null) builder.setResult(r); - if(stats != null) builder.setLoadStats(stats); return builder; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 645cc42..1003d24 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -185,10 +185,12 @@ public class TestAsyncProcess { } @Override - protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { + protected RpcRetryingCaller<MultiResponse> createCaller( + PayloadCarryingServerCallable callable) { callsCt.incrementAndGet(); + MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( - callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { + callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @Override public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) { if (Arrays.equals(FAILS, a.getAction().getRow())) { @@ -227,7 +229,8 @@ public class TestAsyncProcess { } @Override - public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout) + public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, + int callTimeout) throws IOException, RuntimeException { throw e; } @@ -245,7 +248,8 @@ public class TestAsyncProcess { } @Override - protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { + protected RpcRetryingCaller<MultiResponse> createCaller( + PayloadCarryingServerCallable callable) { callsCt.incrementAndGet(); return new CallerWithFailure(ioe); } @@ -282,7 +286,8 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller<MultiResponse> createCaller( - MultiServerCallable<Row> callable) { + PayloadCarryingServerCallable payloadCallable) { + MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @Override @@ -312,7 +317,8 @@ public class TestAsyncProcess { return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) { @Override - public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout) + public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, + int callTimeout) throws IOException, RuntimeException { long sleep = -1; if (isDefault) {
