HBASE-14703 HTable.mutateRow does not collect stats (Heng Chen)
Conflicts:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ee33bf0c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ee33bf0c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ee33bf0c
Branch: refs/heads/branch-1
Commit: ee33bf0c7d539a63e2bf84c888ff8cf8bb57b7f6
Parents: 9fa44a8
Author: Jesse Yates <[email protected]>
Authored: Fri Mar 4 19:07:59 2016 -0800
Committer: chenheng <[email protected]>
Committed: Thu Mar 24 11:27:54 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 110 +-
.../org/apache/hadoop/hbase/client/HTable.java | 154 +-
.../hadoop/hbase/client/MetricsConnection.java | 10 +-
.../hadoop/hbase/client/MultiResponse.java | 57 +-
.../hbase/client/MultiServerCallable.java | 17 +-
.../client/PayloadCarryingServerCallable.java | 48 +
.../hadoop/hbase/client/ResultStatsUtil.java | 12 +-
.../hbase/client/RetryingTimeTracker.java | 57 +
.../hbase/client/RpcRetryingCallerFactory.java | 7 -
.../hbase/client/ServerStatisticTracker.java | 3 +-
.../hadoop/hbase/client/StatisticTrackable.java | 33 +
.../client/StatsTrackingRpcRetryingCaller.java | 78 -
.../hadoop/hbase/protobuf/ProtobufUtil.java | 4 +-
.../hbase/protobuf/ResponseConverter.java | 28 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 18 +-
.../hbase/protobuf/generated/ClientProtos.java | 1451 +++++++++++++++++-
hbase-protocol/src/main/protobuf/Client.proto | 8 +-
.../hadoop/hbase/regionserver/HRegion.java | 4 +-
.../hbase/regionserver/RSRpcServices.java | 68 +-
.../hadoop/hbase/client/TestCheckAndMutate.java | 9 +-
.../hadoop/hbase/client/TestClientPushback.java | 30 +
.../hadoop/hbase/client/TestFromClientSide.java | 8 +-
.../hadoop/hbase/client/TestReplicasClient.java | 5 +-
23 files changed, 1882 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 9a67990..86d2eae 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -55,6 +55,7 @@ import
org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
@@ -427,7 +428,7 @@ class AsyncProcess {
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
return submitMultiActions(tableName, retainedActions, nonceGroup,
callback, null, needResults,
- locationErrors, locationErrorRows, actionsByServer, pool);
+ locationErrors, locationErrorRows, actionsByServer, pool);
}
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
@@ -443,7 +444,7 @@ class AsyncProcess {
int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction();
ars.manageError(originalIndex, row,
- Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
+ Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
}
}
ars.sendMultiAction(actionsByServer, 1, null, false);
@@ -545,9 +546,13 @@ class AsyncProcess {
*/
public <CResult> AsyncRequestFuture submitAll(TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[]
results) {
- return submitAll(null, tableName, rows, callback, results);
+ return submitAll(null, tableName, rows, callback, results, null, timeout);
}
+ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool,
TableName tableName,
+ List<? extends Row> rows, Batch.Callback<CResult> callback, Object[]
results) {
+ return submitAll(pool, tableName, rows, callback, results, null, timeout);
+ }
/**
* Submit immediately the list of rows, whatever the server status. Kept for
backward
* compatibility: it allows to be used with the batch interface that return
an array of objects.
@@ -559,7 +564,8 @@ class AsyncProcess {
* @param results Optional array to return the results thru; backward compat.
*/
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool,
TableName tableName,
- List<? extends Row> rows, Batch.Callback<CResult> callback, Object[]
results) {
+ List<? extends Row> rows, Batch.Callback<CResult> callback, Object[]
results,
+ PayloadCarryingServerCallable callable, int curTimeout) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array
returned.
@@ -578,7 +584,8 @@ class AsyncProcess {
actions.add(action);
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
- tableName, actions, ng.getNonceGroup(), getPool(pool), callback,
results, results != null);
+ tableName, actions, ng.getNonceGroup(), getPool(pool), callback,
results, results != null,
+ callable, curTimeout);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
@@ -710,11 +717,11 @@ class AsyncProcess {
private final MultiAction<Row> multiAction;
private final int numAttempt;
private final ServerName server;
- private final Set<MultiServerCallable<Row>> callsInProgress;
+ private final Set<PayloadCarryingServerCallable> callsInProgress;
private SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server,
- Set<MultiServerCallable<Row>> callsInProgress) {
+ Set<PayloadCarryingServerCallable> callsInProgress) {
this.multiAction = multiAction;
this.numAttempt = numAttempt;
this.server = server;
@@ -724,19 +731,22 @@ class AsyncProcess {
@Override
public void run() {
MultiResponse res;
- MultiServerCallable<Row> callable = null;
+ PayloadCarryingServerCallable callable = currentCallable;
try {
- callable = createCallable(server, tableName, multiAction);
+ // setup the callable based on the actions, if we don't have one
already from the request
+ if (callable == null) {
+ callable = createCallable(server, tableName, multiAction);
+ }
+ RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
try {
- RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
- if (callsInProgress != null) callsInProgress.add(callable);
- res = caller.callWithoutRetries(callable, timeout);
-
+ if (callsInProgress != null) {
+ callsInProgress.add(callable);
+ }
+ res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
if (res == null) {
// Cancelled
return;
}
-
} catch (IOException e) {
// The service itself failed . It may be an error coming from the
communication
// layer, but, as well, a functional error raised by the server.
@@ -770,7 +780,7 @@ class AsyncProcess {
private final BatchErrors errors;
private final ConnectionManager.ServerErrorTracker errorsByServer;
private final ExecutorService pool;
- private final Set<MultiServerCallable<Row>> callsInProgress;
+ private final Set<PayloadCarryingServerCallable> callsInProgress;
private final TableName tableName;
@@ -797,10 +807,12 @@ class AsyncProcess {
private final int[] replicaGetIndices;
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
+ private PayloadCarryingServerCallable currentCallable;
+ private int currentCallTotalTimeout;
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>>
actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
- Batch.Callback<CResult> callback) {
+ Batch.Callback<CResult> callback, PayloadCarryingServerCallable
callable, int timeout) {
this.pool = pool;
this.callback = callback;
this.nonceGroup = nonceGroup;
@@ -864,13 +876,16 @@ class AsyncProcess {
this.replicaGetIndices = null;
}
this.callsInProgress = !hasAnyReplicaGets ? null :
- Collections.newSetFromMap(new
ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
+ Collections.newSetFromMap(
+ new ConcurrentHashMap<PayloadCarryingServerCallable, Boolean>());
this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
+ this.currentCallable = callable;
+ this.currentCallTotalTimeout = timeout;
}
- public Set<MultiServerCallable<Row>> getCallsInProgress() {
+ public Set<PayloadCarryingServerCallable> getCallsInProgress() {
return callsInProgress;
}
@@ -1275,11 +1290,15 @@ class AsyncProcess {
int failureCount = 0;
boolean canRetry = true;
- // Go by original action.
+ Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
+ updateStats(server, results);
+
int failed = 0, stopped = 0;
+ // Go by original action.
for (Map.Entry<byte[], List<Action<Row>>> regionEntry :
multiAction.actions.entrySet()) {
byte[] regionName = regionEntry.getKey();
- Map<Integer, Object> regionResults =
responses.getResults().get(regionName);
+ Map<Integer, Object> regionResults = results.get(regionName) == null
+ ? null : results.get(regionName).result;
if (regionResults == null) {
if (!responses.getExceptions().containsKey(regionName)) {
LOG.error("Server sent us neither results nor exceptions for "
@@ -1308,7 +1327,7 @@ class AsyncProcess {
}
++failureCount;
Retry retry = manageError(sentAction.getOriginalIndex(), row,
- canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED,
(Throwable)result, server);
+ canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)
result, server);
if (retry == Retry.YES) {
toReplay.add(sentAction);
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
@@ -1317,24 +1336,11 @@ class AsyncProcess {
++failed;
}
} else {
-
- if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
- AsyncProcess.this.connection.getConnectionMetrics().
- updateServerStats(server, regionName, result);
- }
-
- // update the stats about the region, if its a user table. We
don't want to slow down
- // updates to meta tables, especially from internal updates
(master, etc).
- if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
- result = ResultStatsUtil.updateStats(result,
- AsyncProcess.this.connection.getStatisticsTracker(), server,
regionName);
- }
-
if (callback != null) {
try {
//noinspection unchecked
// TODO: would callback expect a replica region name if it
gets one?
- this.callback.update(regionName,
sentAction.getAction().getRow(), (CResult)result);
+ this.callback.update(regionName,
sentAction.getAction().getRow(), (CResult) result);
} catch (Throwable t) {
LOG.error("User callback threw an exception for "
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
@@ -1384,7 +1390,6 @@ class AsyncProcess {
}
}
}
-
if (toReplay.isEmpty()) {
logNoResubmit(server, numAttempt, failureCount, throwable, failed,
stopped);
} else {
@@ -1620,7 +1625,7 @@ class AsyncProcess {
throw new InterruptedIOException(iex.getMessage());
} finally {
if (callsInProgress != null) {
- for (MultiServerCallable<Row> clb : callsInProgress) {
+ for (PayloadCarryingServerCallable clb : callsInProgress) {
clb.cancel();
}
}
@@ -1677,13 +1682,38 @@ class AsyncProcess {
}
}
+ private void updateStats(ServerName server, Map<byte[],
MultiResponse.RegionResult> results) {
+ boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() !=
null;
+ boolean stats = AsyncProcess.this.connection.getStatisticsTracker() !=
null;
+ if (!stats && !metrics) {
+ return;
+ }
+ for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats :
results.entrySet()) {
+ byte[] regionName = regionStats.getKey();
+ ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
+
ResultStatsUtil.updateStats(AsyncProcess.this.connection.getStatisticsTracker(),
server,
+ regionName, stat);
+
ResultStatsUtil.updateStats(AsyncProcess.this.connection.getConnectionMetrics(),
+ server, regionName, stat);
+ }
+ }
+
+ protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
+ TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool,
+ Batch.Callback<CResult> callback, Object[] results, boolean needResults,
+ PayloadCarryingServerCallable callable, int curTimeout) {
+ return new AsyncRequestFutureImpl<CResult>(
+ tableName, actions, nonceGroup, getPool(pool), needResults,
+ results, callback, callable, curTimeout);
+ }
+
@VisibleForTesting
/** Create AsyncRequestFuture. Isolated to be easily overridden in the
tests. */
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults)
{
- return new AsyncRequestFutureImpl<CResult>(
- tableName, actions, nonceGroup, getPool(pool), needResults, results,
callback);
+ return createAsyncRequestFuture(
+ tableName, actions, nonceGroup, pool, callback, results, needResults,
null, timeout);
}
/**
@@ -1699,7 +1729,7 @@ class AsyncProcess {
* Create a caller. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
- protected RpcRetryingCaller<MultiResponse>
createCaller(MultiServerCallable<Row> callable) {
+ protected RpcRetryingCaller<MultiResponse>
createCaller(PayloadCarryingServerCallable callable) {
return rpcCallerFactory.<MultiResponse> newCaller();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 24dc06a..ec28c5a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -41,6 +41,7 @@ import
org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
@@ -60,14 +60,13 @@ import
org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
-import
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
-import
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -571,7 +570,8 @@ public class HTable implements HTableInterface,
RegionLocator {
*/
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
- HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName,
connection, rpcCallerFactory, operationTimeout);
+ HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection,
+ rpcCallerFactory, operationTimeout);
if (htd != null) {
return new UnmodifyableHTableDescriptor(htd);
}
@@ -872,10 +872,10 @@ public class HTable implements HTableInterface,
RegionLocator {
// Call that takes into account the replica
RpcRetryingCallerWithReadReplicas callable = new
RpcRetryingCallerWithReadReplicas(
- rpcControllerFactory, tableName, this.connection, get, pool,
- tableConfiguration.getRetriesNumber(),
- operationTimeout,
- tableConfiguration.getPrimaryCallTimeoutMicroSecond());
+ rpcControllerFactory, tableName, this.connection, get, pool,
+ tableConfiguration.getRetriesNumber(),
+ operationTimeout,
+ tableConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call();
}
@@ -1039,35 +1039,47 @@ public class HTable implements HTableInterface,
RegionLocator {
*/
@Override
public void mutateRow(final RowMutations rm) throws IOException {
- RegionServerCallable<Void> callable =
- new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
- @Override
- public Void call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller =
rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- RegionAction.Builder regionMutationBuilder =
RequestConverter.buildRegionAction(
- getLocation().getRegionInfo().getRegionName(), rm);
- regionMutationBuilder.setAtomic(true);
- MultiRequest request =
-
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- ClientProtos.MultiResponse response = getStub().multi(controller,
request);
- ClientProtos.RegionActionResult res =
response.getRegionActionResultList().get(0);
- if (res.hasException()) {
- Throwable ex = ProtobufUtil.toException(res.getException());
- if(ex instanceof IOException) {
- throw (IOException)ex;
+ final RetryingTimeTracker tracker = new RetryingTimeTracker();
+ PayloadCarryingServerCallable<MultiResponse> callable =
+ new PayloadCarryingServerCallable<MultiResponse>(connection, getName(),
rm.getRow(),
+ rpcControllerFactory) {
+ @Override
+ public MultiResponse call(int callTimeout) throws IOException {
+ tracker.start();
+ controller.setPriority(tableName);
+ int remainingTime = tracker.getRemainingTime(callTimeout);
+ if (remainingTime == 0) {
+ throw new DoNotRetryIOException("Timeout for mutate row");
+ }
+ controller.setCallTimeout(remainingTime);
+ try {
+ RegionAction.Builder regionMutationBuilder =
RequestConverter.buildRegionAction(
+ getLocation().getRegionInfo().getRegionName(), rm);
+ regionMutationBuilder.setAtomic(true);
+ MultiRequest request =
+
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+ ClientProtos.MultiResponse response = getStub().multi(controller,
request);
+ ClientProtos.RegionActionResult res =
response.getRegionActionResultList().get(0);
+ if (res.hasException()) {
+ Throwable ex = ProtobufUtil.toException(res.getException());
+ if (ex instanceof IOException) {
+ throw (IOException) ex;
+ }
+ throw new IOException("Failed to mutate row: " +
+ Bytes.toStringBinary(rm.getRow()), ex);
}
- throw new IOException("Failed to mutate row:
"+Bytes.toStringBinary(rm.getRow()), ex);
+ return ResponseConverter.getResults(request, response,
controller.cellScanner());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
}
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
}
- return null;
- }
- };
- rpcCallerFactory.<Void> newCaller().callWithRetries(callable,
this.operationTimeout);
+ };
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName,
rm.getMutations(),
+ null, null, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
}
/**
@@ -1327,37 +1339,55 @@ public class HTable implements HTableInterface,
RegionLocator {
*/
@Override
public boolean checkAndMutate(final byte [] row, final byte [] family, final
byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final RowMutations rm)
- throws IOException {
- RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller =
rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- CompareType compareType = CompareType.valueOf(compareOp.name());
- MultiRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family,
qualifier,
- new BinaryComparator(value), compareType, rm);
- ClientProtos.MultiResponse response =
getStub().multi(controller, request);
- ClientProtos.RegionActionResult res =
response.getRegionActionResultList().get(0);
- if (res.hasException()) {
- Throwable ex = ProtobufUtil.toException(res.getException());
- if(ex instanceof IOException) {
- throw (IOException)ex;
- }
- throw new IOException("Failed to checkAndMutate row: "+
- Bytes.toStringBinary(rm.getRow()), ex);
+ final CompareOp compareOp, final byte [] value, final RowMutations rm)
+ throws IOException {
+ final RetryingTimeTracker tracker = new RetryingTimeTracker();
+ PayloadCarryingServerCallable<MultiResponse> callable =
+ new PayloadCarryingServerCallable<MultiResponse>(connection, getName(),
rm.getRow(),
+ rpcControllerFactory) {
+ @Override
+ public MultiResponse call(int callTimeout) throws IOException {
+ tracker.start();
+ controller.setPriority(tableName);
+ int remainingTime = tracker.getRemainingTime(callTimeout);
+ if (remainingTime == 0) {
+ throw new DoNotRetryIOException("Timeout for mutate row");
+ }
+ controller.setCallTimeout(remainingTime);
+ try {
+ CompareType compareType = CompareType.valueOf(compareOp.name());
+ MultiRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family,
qualifier,
+ new BinaryComparator(value), compareType, rm);
+ ClientProtos.MultiResponse response = getStub().multi(controller,
request);
+ ClientProtos.RegionActionResult res =
response.getRegionActionResultList().get(0);
+ if (res.hasException()) {
+ Throwable ex = ProtobufUtil.toException(res.getException());
+ if(ex instanceof IOException) {
+ throw (IOException)ex;
}
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ throw new IOException("Failed to checkAndMutate row: "+
+ Bytes.toStringBinary(rm.getRow()), ex);
}
+ return ResponseConverter.getResults(request, response,
controller.cellScanner());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
}
- };
- return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable,
this.operationTimeout);
+ }
+ };
+ /**
+ * Currently, we use one array to store 'processed' flag which is
returned by server.
+ * It is excessive to send such a large array, but that is required by
the framework right now
+ * */
+ Object[] results = new Object[rm.getMutations().size()];
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName,
rm.getMutations(),
+ null, results, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
+
+ return ((Result)results[0]).getExists();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index b6efdb9..c2ce6ff 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
* {@link #shutdown()} to terminate the thread pools they allocate.
*/
@InterfaceAudience.Private
-public class MetricsConnection {
+public class MetricsConnection implements StatisticTrackable {
/** Set this key to {@code true} to enable metrics collection of client
requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY =
"hbase.client.metrics.enable";
@@ -192,9 +192,15 @@ public class MetricsConnection {
}
Result result = (Result) r;
ClientProtos.RegionLoadStats stats = result.getStats();
- if(stats == null){
+ if (stats == null) {
return;
}
+ updateRegionStats(serverName, regionName, stats);
+ }
+
+ @Override
+ public void updateRegionStats(ServerName serverName, byte[] regionName,
+ ClientProtos.RegionLoadStats stats) {
String name = serverName.getServerName() + "," +
Bytes.toStringBinary(regionName);
ConcurrentMap<byte[], RegionStats> rsStats = null;
if (serverStats.containsKey(serverName)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
index 089ccff..79a9ed3 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -33,8 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
public class MultiResponse {
// map of regionName to map of Results by the original index for that Result
- private Map<byte[], Map<Integer, Object>> results =
- new TreeMap<byte[], Map<Integer, Object>>(Bytes.BYTES_COMPARATOR);
+ private Map<byte[], RegionResult> results = new
TreeMap<>(Bytes.BYTES_COMPARATOR);
/**
* The server can send us a failure for the region itself, instead of
individual failure.
@@ -52,8 +52,8 @@ public class MultiResponse {
*/
public int size() {
int size = 0;
- for (Map<?,?> c : results.values()) {
- size += c.size();
+ for (RegionResult result: results.values()) {
+ size += result.size();
}
return size;
}
@@ -66,16 +66,7 @@ public class MultiResponse {
* @param resOrEx the result or error; will be empty for successful Put and
Delete actions.
*/
public void add(byte[] regionName, int originalIndex, Object resOrEx) {
- Map<Integer, Object> rs = results.get(regionName);
- if (rs == null) {
- rs = new HashMap<Integer, Object>();
- results.put(regionName, rs);
- }
- rs.put(originalIndex, resOrEx);
- }
-
- public Map<byte[], Map<Integer, Object>> getResults() {
- return results;
+ getResult(regionName).addResult(originalIndex, resOrEx);
}
public void addException(byte []regionName, Throwable ie){
@@ -92,4 +83,42 @@ public class MultiResponse {
public Map<byte[], Throwable> getExceptions() {
return exceptions;
}
+
+ public void addStatistic(byte[] regionName, ClientProtos.RegionLoadStats
stat) {
+ getResult(regionName).setStat(stat);
+ }
+
+ private RegionResult getResult(byte[] region){
+ RegionResult rs = results.get(region);
+ if (rs == null) {
+ rs = new RegionResult();
+ results.put(region, rs);
+ }
+ return rs;
+ }
+
+ public Map<byte[], RegionResult> getResults(){
+ return this.results;
+ }
+
+ static class RegionResult{
+ Map<Integer, Object> result = new HashMap<>();
+ ClientProtos.RegionLoadStats stat;
+
+ public void addResult(int index, Object result){
+ this.result.put(index, result);
+ }
+
+ public void setStat(ClientProtos.RegionLoadStats stat){
+ this.stat = stat;
+ }
+
+ public int size() {
+ return this.result.size();
+ }
+
+ public ClientProtos.RegionLoadStats getStat() {
+ return this.stat;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index f02d14d..d0b4c81 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -50,21 +49,19 @@ import com.google.protobuf.ServiceException;
* {@link RegionServerCallable} that goes against multiple regions.
* @param <R>
*/
-class MultiServerCallable<R> extends RegionServerCallable<MultiResponse>
implements Cancellable {
+class MultiServerCallable<R> extends
PayloadCarryingServerCallable<MultiResponse> {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
- private final PayloadCarryingRpcController controller;
MultiServerCallable(final ClusterConnection connection, final TableName
tableName,
final ServerName location, RpcControllerFactory rpcFactory, final
MultiAction<R> multi) {
- super(connection, tableName, null);
+ super(connection, tableName, null, rpcFactory);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a
multi-region request.
// Using region info from parent HRegionLocation would be a mistake for
this class; so
// we will store the server here, and throw if someone tries to obtain
location/regioninfo.
this.location = new HRegionLocation(null, location);
this.cellBlock = isCellBlock();
- controller = rpcFactory.newController();
}
@Override
@@ -133,16 +130,6 @@ class MultiServerCallable<R> extends
RegionServerCallable<MultiResponse> impleme
return ResponseConverter.getResults(requestProto, responseProto,
controller.cellScanner());
}
- @Override
- public void cancel() {
- controller.startCancel();
- }
-
- @Override
- public boolean isCancelled() {
- return controller.isCanceled();
- }
-
/**
* @return True if we should send data in cellblocks. This is an expensive
call. Cache the
* result if you can rather than call each time.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
new file mode 100644
index 0000000..d94f069
--- /dev/null
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * This class is used to unify HTable calls with the AsyncProcess framework.
+ * HTable can use AsyncProcess directly through this class.
+ */
+@InterfaceAudience.Private
+public abstract class PayloadCarryingServerCallable<T>
+ extends RegionServerCallable<T> implements Cancellable {
+ protected PayloadCarryingRpcController controller;
+
+ public PayloadCarryingServerCallable(Connection connection, TableName
tableName, byte[] row,
+ RpcControllerFactory rpcControllerFactory) {
+ super(connection, tableName, row);
+ this.controller = rpcControllerFactory.newController();
+ }
+
+ @Override
+ public void cancel() {
+ controller.startCancel();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return controller.isCanceled();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
index 3caa63e..6537d79 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
@@ -55,13 +55,17 @@ public final class ResultStatsUtil {
return r;
}
- if (regionName != null) {
- serverStats.updateRegionStats(server, regionName, stats);
- }
-
+ updateStats(serverStats, server, regionName, stats);
return r;
}
+ public static void updateStats(StatisticTrackable tracker, ServerName
server, byte[] regionName,
+ ClientProtos.RegionLoadStats stats) {
+ if (regionName != null && stats != null && tracker != null) {
+ tracker.updateRegionStats(server, regionName, stats);
+ }
+ }
+
public static <T> T updateStats(T r, ServerStatisticTracker stats,
HRegionLocation regionLocation) {
byte[] regionName = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
new file mode 100644
index 0000000..24288e6
--- /dev/null
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Tracks the amount of time remaining for an operation.
+ */
+class RetryingTimeTracker {
+
+ private long globalStartTime = -1;
+
+ public void start() {
+ if (this.globalStartTime < 0) {
+ this.globalStartTime = EnvironmentEdgeManager.currentTime();
+ }
+ }
+
+ public int getRemainingTime(int callTimeout) {
+ if (callTimeout <= 0) {
+ return 0;
+ } else {
+ if (callTimeout == Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ int remainingTime = (int) (
+ callTimeout -
+ (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+ if (remainingTime < 1) {
+ // If there is no time left, we're trying anyway. It's too late.
+ // 0 means no timeout, and it's not the intent here. So we secure both
cases by
+ // resetting to the minimum.
+ remainingTime = 1;
+ }
+ return remainingTime;
+ }
+ }
+
+ public long getStartTime() {
+ return this.globalStartTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 1bf7bb0..dac6bed 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -67,13 +67,6 @@ public class RpcRetryingCallerFactory {
// is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries,
interceptor,
startLogErrorsCnt);
-
- // wrap it with stats, if we are tracking them
- if (enableBackPressure && this.stats != null) {
- caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries,
interceptor,
- startLogErrorsCnt, stats);
- }
-
return caller;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
index 42da0b3..de9da1b 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -32,11 +32,12 @@ import java.util.concurrent.ConcurrentHashMap;
* Tracks the statistics for multiple regions
*/
@InterfaceAudience.Private
-public class ServerStatisticTracker {
+public class ServerStatisticTracker implements StatisticTrackable {
private final ConcurrentHashMap<ServerName, ServerStatistics> stats =
new ConcurrentHashMap<ServerName, ServerStatistics>();
+ @Override
public void updateRegionStats(ServerName server, byte[] region,
ClientProtos.RegionLoadStats
currentStats) {
ServerStatistics stat = stats.get(server);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
new file mode 100644
index 0000000..7bb49e7
--- /dev/null
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * Parent interface for an object to get updates about per-region statistics.
+ */
+@InterfaceAudience.Private
+public interface StatisticTrackable {
+ /**
+ * Update the load statistics tracked for the given region on the given server.
+ */
+ void updateRegionStats(ServerName server, byte[] region,
ClientProtos.RegionLoadStats
+ stats);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
deleted file mode 100644
index fc175bb..0000000
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-import java.io.IOException;
-
-/**
- * An {@link RpcRetryingCaller} that will update the per-region stats for the
call on return,
- * if stats are available
- */
-@InterfaceAudience.Private
-public class StatsTrackingRpcRetryingCaller<T> extends RpcRetryingCaller<T> {
- private final ServerStatisticTracker stats;
-
- public StatsTrackingRpcRetryingCaller(long pause, int retries, int
startLogErrorsCnt,
- ServerStatisticTracker stats) {
- super(pause, retries, startLogErrorsCnt);
- this.stats = stats;
- }
-
- public StatsTrackingRpcRetryingCaller(long pause, int retries,
- RetryingCallerInterceptor interceptor, int startLogErrorsCnt,
- ServerStatisticTracker stats) {
- super(pause, retries, interceptor, startLogErrorsCnt);
- this.stats = stats;
- }
-
- @Override
- public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
- throws IOException, RuntimeException {
- T result = super.callWithRetries(callable, callTimeout);
- return updateStatsAndUnwrap(result, callable);
- }
-
- @Override
- public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
- throws IOException, RuntimeException {
- T result = super.callWithRetries(callable, callTimeout);
- return updateStatsAndUnwrap(result, callable);
- }
-
- private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
- // don't track stats about requests that aren't to regionservers
- if (!(callable instanceof RegionServerCallable)) {
- return result;
- }
-
- // mutli-server callables span multiple regions, so they don't have a
location,
- // but they are region server callables, so we have to handle them when we
process the
- // result in AsyncProcess#receiveMultiAction, not in here
- if (callable instanceof MultiServerCallable) {
- return result;
- }
-
- // update the stats for the single server callable
- RegionServerCallable<T> regionCallable = (RegionServerCallable) callable;
- HRegionLocation location = regionCallable.getLocation();
- return ResultStatsUtil.updateStats(result, stats, location);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 4bc128b..2fbe27b 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -194,8 +194,8 @@ public final class ProtobufUtil {
*/
private final static Cell[] EMPTY_CELL_ARRAY = new Cell[]{};
private final static Result EMPTY_RESULT = Result.create(EMPTY_CELL_ARRAY);
- private final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null,
true);
- private final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null,
false);
+ final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
+ final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false);
private final static Result EMPTY_RESULT_STALE =
Result.create(EMPTY_CELL_ARRAY, null, true);
private final static Result EMPTY_RESULT_EXISTS_TRUE_STALE
= Result.create((Cell[])null, true, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 61bcd68..0c7cbd7 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -89,7 +89,7 @@ public final class ResponseConverter {
int requestRegionActionCount = request.getRegionActionCount();
int responseRegionActionResultCount =
response.getRegionActionResultCount();
if (requestRegionActionCount != responseRegionActionResultCount) {
- throw new IllegalStateException("Request mutation count=" +
responseRegionActionResultCount +
+ throw new IllegalStateException("Request mutation count=" +
requestRegionActionCount +
" does not match response mutation result count=" +
responseRegionActionResultCount);
}
@@ -125,21 +125,27 @@ public final class ResponseConverter {
responseValue = ProtobufUtil.toException(roe.getException());
} else if (roe.hasResult()) {
responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
- // add the load stats, if we got any
- if (roe.hasLoadStats()) {
- ((Result) responseValue).addResults(roe.getLoadStats());
- }
} else if (roe.hasServiceResult()) {
responseValue = roe.getServiceResult();
- } else {
- // no result & no exception. Unexpected.
- throw new IllegalStateException("No result & no exception roe=" +
roe +
- " for region " + actions.getRegion());
+ } else{
+ // Sometimes the response is just "it was processed". Generally,
this occurs for things
+ // like mutateRows, where we get back only 'processed' (or not) and,
optionally, some
+ // statistics about the regions we touched.
+ responseValue = response.getProcessed() ?
+ ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
+ ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
}
results.add(regionName, roe.getIndex(), responseValue);
}
}
+ if (response.hasRegionStatistics()) {
+ ClientProtos.MultiRegionLoadStats stats = response.getRegionStatistics();
+ for (int i = 0; i < stats.getRegionCount(); i++) {
+ results.addStatistic(stats.getRegion(i).getValue().toByteArray(),
stats.getStat(i));
+ }
+ }
+
return results;
}
@@ -161,11 +167,9 @@ public final class ResponseConverter {
* @param r
* @return an action result builder
*/
- public static ResultOrException.Builder buildActionResult(final
ClientProtos.Result r,
- ClientProtos.RegionLoadStats stats) {
+ public static ResultOrException.Builder buildActionResult(final
ClientProtos.Result r) {
ResultOrException.Builder builder = ResultOrException.newBuilder();
if (r != null) builder.setResult(r);
- if(stats != null) builder.setLoadStats(stats);
return builder;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee33bf0c/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 51427b8..b588ce8 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -182,10 +182,12 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse>
createCaller(MultiServerCallable<Row> callable) {
+ protected RpcRetryingCaller<MultiResponse> createCaller(
+ PayloadCarryingServerCallable callable) {
callsCt.incrementAndGet();
+ MultiServerCallable callable1 = (MultiServerCallable) callable;
final MultiResponse mr = createMultiResponse(
- callable.getMulti(), nbMultiResponse, nbActions, new
ResponseGenerator() {
+ callable1.getMulti(), nbMultiResponse, nbActions, new
ResponseGenerator() {
@Override
public void addResponse(MultiResponse mr, byte[] regionName,
Action<Row> a) {
if (Arrays.equals(FAILS, a.getAction().getRow())) {
@@ -224,7 +226,8 @@ public class TestAsyncProcess {
}
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse>
callable, int callTimeout)
+ public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse>
callable,
+ int callTimeout)
throws IOException, RuntimeException {
throw e;
}
@@ -242,7 +245,8 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse>
createCaller(MultiServerCallable<Row> callable) {
+ protected RpcRetryingCaller<MultiResponse> createCaller(
+ PayloadCarryingServerCallable callable) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
}
@@ -279,7 +283,8 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
- MultiServerCallable<Row> callable) {
+ PayloadCarryingServerCallable payloadCallable) {
+ MultiServerCallable<Row> callable = (MultiServerCallable)
payloadCallable;
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new
ResponseGenerator() {
@Override
@@ -309,7 +314,8 @@ public class TestAsyncProcess {
return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
@Override
- public MultiResponse
callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+ public MultiResponse
callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ int callTimeout)
throws IOException, RuntimeException {
long sleep = -1;
if (isDefault) {