Apache9 commented on a change in pull request #2630:
URL: https://github.com/apache/hbase/pull/2630#discussion_r520340151
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
##########
@@ -3657,9 +3657,10 @@ public static CheckAndMutate
toCheckAndMutate(ClientProtos.Condition condition,
return builder.build((Put) m);
} else if (m instanceof Delete) {
return builder.build((Delete) m);
+ } else if (m instanceof Increment) {
+ return builder.build((Increment) m);
} else {
- throw new DoNotRetryIOException("Unsupported mutate type: " +
mutations.get(0)
- .getClass().getSimpleName().toUpperCase());
+ return builder.build((Append) m);
Review comment:
Let's also check explicitly for Append (m instanceof Append) and keep the
last else branch throwing DoNotRetryIOException.
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
##########
@@ -60,7 +53,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public final class CheckAndMutate extends Mutation {
+public final class CheckAndMutate implements Row {
Review comment:
Why this change?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3900,33 +3903,51 @@ public void
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
Bytes.toBytes(timestamp));
miniBatchOp.incrementNumOfDeletes();
} else if (mutation instanceof Increment || mutation instanceof
Append) {
+ boolean returnResults;
+ if (mutation instanceof Increment) {
+ returnResults = ((Increment) mutation).isReturnResults();
+ } else {
+ returnResults = ((Append) mutation).isReturnResults();
+ }
+
// For nonce operations
canProceed[index] = startNonceOperation(nonceGroup, nonce);
if (!canProceed[index]) {
- // convert duplicate increment/append to get
- List<Cell> results = region.get(toGet(mutation), false,
nonceGroup, nonce);
- retCodeDetails[index] = new
OperationStatus(OperationStatusCode.SUCCESS,
- Result.create(results));
+ Result result;
+ if (returnResults) {
Review comment:
Is this an optimization?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -972,11 +1003,41 @@ private void doBatchOp(final RegionActionResult.Builder
builder, final HRegion r
OperationStatus[] codes = region.batchMutate(mArray, atomic,
HConstants.NO_NONCE,
HConstants.NO_NONCE);
+ if (atomic) {
+ LinkedList<ResultOrException> resultOrExceptions = new LinkedList<>();
+ List<Result> results = new ArrayList<>();
+ for (i = 0; i < codes.length; i++) {
+ if (codes[i].getResult() != null) {
+ results.add(codes[i].getResult());
+ }
+ if (i != 0) {
+ resultOrExceptions.add(getResultOrException(
+ ClientProtos.Result.getDefaultInstance(), i));
+ }
+ }
+
+ if (results.isEmpty()) {
+ resultOrExceptions.addFirst(getResultOrException(
+ ClientProtos.Result.getDefaultInstance(), 0));
+ } else {
+ // Set the result of the Increment/Append operations to the first
element of the
+ // ResultOrException list
+ Result result = Result.create(results.stream()
+ .filter(r -> r.rawCells() != null)
+ .flatMap(r -> Arrays.stream(r.rawCells()))
+ .collect(Collectors.toList()));
+
resultOrExceptions.addFirst(getResultOrException(ProtobufUtil.toResult(result),
0));
+ }
+
+ builder.addAllResultOrException(resultOrExceptions);
+ return;
+ }
+
for (i = 0; i < codes.length; i++) {
Mutation currentMutation = mArray[i];
ClientProtos.Action currentAction =
mutationActionMap.get(currentMutation);
- int index = currentAction.hasIndex() || !atomic ?
currentAction.getIndex() : i;
- Exception e = null;
+ int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
Review comment:
Why remove the `!atomic` condition here?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -8303,11 +8289,23 @@ void metricsUpdateForGet(List<Cell> results, long
before) {
}
@Override
- public void mutateRow(RowMutations rm) throws IOException {
- // Don't need nonces here - RowMutations only supports puts and deletes
+ public Result mutateRow(RowMutations rm) throws IOException {
final List<Mutation> m = rm.getMutations();
- batchMutate(m.toArray(new Mutation[m.size()]), true, HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ OperationStatus[] operationStatuses = batchMutate(m.toArray(new
Mutation[0]), true,
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
+
+ List<Result> results = Arrays.stream(operationStatuses)
Review comment:
At least for JDK8, streams are slow, so let's avoid streams on the critical
read/write path as much as possible for now.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -620,8 +622,22 @@ private CheckAndMutateResult checkAndMutate(HRegion
region, List<ClientProtos.Ac
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
mutations.add(del);
break;
+ case INCREMENT:
+ Increment increment =
ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
+ ++countOfCompleteMutation;
+ checkCellSizeLimit(region, increment);
+
spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
+ mutations.add(increment);
+ break;
+ case APPEND:
+ Append append = ProtobufUtil.toAppend(action.getMutation(),
cellScanner);
+ ++countOfCompleteMutation;
+ checkCellSizeLimit(region, append);
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
+ mutations.add(append);
+ break;
default:
- throw new DoNotRetryIOException("Atomic put and/or delete only,
not " + type.name());
+ throw new AssertionError("invalid mutation type : " + type);
Review comment:
Why change this from DoNotRetryIOException to AssertionError?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]