Apache9 commented on a change in pull request #2630:
URL: https://github.com/apache/hbase/pull/2630#discussion_r520340151



##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
##########
@@ -3657,9 +3657,10 @@ public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
           return builder.build((Put) m);
         } else if (m instanceof Delete) {
           return builder.build((Delete) m);
+        } else if (m instanceof Increment) {
+          return builder.build((Increment) m);
         } else {
-          throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
-            .getClass().getSimpleName().toUpperCase());
+          return builder.build((Append) m);

Review comment:
       Let's also test for Append and keep the last else branch throwing DoNotRetryIOException?
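       For reference, a minimal sketch of the branch layout being suggested, built only from the lines in this hunk (`m`, `builder`, and `mutations` are the names already used in the diff; nothing else is assumed):

```java
if (m instanceof Put) {
  return builder.build((Put) m);
} else if (m instanceof Delete) {
  return builder.build((Delete) m);
} else if (m instanceof Increment) {
  return builder.build((Increment) m);
} else if (m instanceof Append) {
  // Check Append explicitly instead of assuming it in the final else.
  return builder.build((Append) m);
} else {
  // Keep the unsupported-type guard from the removed lines.
  throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
    .getClass().getSimpleName().toUpperCase());
}
```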

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
##########
@@ -60,7 +53,7 @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public final class CheckAndMutate extends Mutation {
+public final class CheckAndMutate implements Row {

Review comment:
       Why this change?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3900,33 +3903,51 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
             Bytes.toBytes(timestamp));
           miniBatchOp.incrementNumOfDeletes();
         } else if (mutation instanceof Increment || mutation instanceof Append) {
+          boolean returnResults;
+          if (mutation instanceof Increment) {
+            returnResults = ((Increment) mutation).isReturnResults();
+          } else {
+            returnResults = ((Append) mutation).isReturnResults();
+          }
+
           // For nonce operations
           canProceed[index] = startNonceOperation(nonceGroup, nonce);
           if (!canProceed[index]) {
-            // convert duplicate increment/append to get
-            List<Cell> results = region.get(toGet(mutation), false, nonceGroup, nonce);
-            retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS,
-              Result.create(results));
+            Result result;
+            if (returnResults) {

Review comment:
       This is an optimization?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -972,11 +1003,41 @@ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion r
 
       OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
         HConstants.NO_NONCE);
+      if (atomic) {
+        LinkedList<ResultOrException> resultOrExceptions = new LinkedList<>();
+        List<Result> results = new ArrayList<>();
+        for (i = 0; i < codes.length; i++) {
+          if (codes[i].getResult() != null) {
+            results.add(codes[i].getResult());
+          }
+          if (i != 0) {
+            resultOrExceptions.add(getResultOrException(
+              ClientProtos.Result.getDefaultInstance(), i));
+          }
+        }
+
+        if (results.isEmpty()) {
+          resultOrExceptions.addFirst(getResultOrException(
+            ClientProtos.Result.getDefaultInstance(), 0));
+        } else {
+          // Set the result of the Increment/Append operations to the first element of the
+          // ResultOrException list
+          Result result = Result.create(results.stream()
+            .filter(r -> r.rawCells() != null)
+            .flatMap(r -> Arrays.stream(r.rawCells()))
+            .collect(Collectors.toList()));
+          resultOrExceptions.addFirst(getResultOrException(ProtobufUtil.toResult(result), 0));
+        }
+
+        builder.addAllResultOrException(resultOrExceptions);
+        return;
+      }
+
       for (i = 0; i < codes.length; i++) {
         Mutation currentMutation = mArray[i];
         ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
-        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
-        Exception e = null;
+        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;

Review comment:
       Why remove atomic?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -8303,11 +8289,23 @@ void metricsUpdateForGet(List<Cell> results, long before) {
   }
 
   @Override
-  public void mutateRow(RowMutations rm) throws IOException {
-    // Don't need nonces here - RowMutations only supports puts and deletes
+  public Result mutateRow(RowMutations rm) throws IOException {
     final List<Mutation> m = rm.getMutations();
-    batchMutate(m.toArray(new Mutation[m.size()]), true, HConstants.NO_NONCE,
-        HConstants.NO_NONCE);
+    OperationStatus[] operationStatuses = batchMutate(m.toArray(new Mutation[0]), true,
+      HConstants.NO_NONCE, HConstants.NO_NONCE);
+
+    List<Result> results = Arrays.stream(operationStatuses)

Review comment:
       At least for JDK8, stream is slow so let's avoid stream on critical 
read/write path as much as possible for now.
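       For illustration, a rough stream-free shape for this collection step; the rest of the original pipeline is cut off by the hunk, so the non-null filter below is only an assumption mirroring the null check used in doBatchOp:

```java
// Plain loop instead of Arrays.stream(operationStatuses)...
// Assumption: keep only statuses that carry a Result, as the doBatchOp hunk does.
List<Result> results = new ArrayList<>(operationStatuses.length);
for (OperationStatus operationStatus : operationStatuses) {
  if (operationStatus.getResult() != null) {
    results.add(operationStatus.getResult());
  }
}
```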

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -620,8 +622,22 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Ac
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
             mutations.add(del);
             break;
+          case INCREMENT:
+            Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
+            ++countOfCompleteMutation;
+            checkCellSizeLimit(region, increment);
+            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
+            mutations.add(increment);
+            break;
+          case APPEND:
+            Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner);
+            ++countOfCompleteMutation;
+            checkCellSizeLimit(region, append);
+            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
+            mutations.add(append);
+            break;
           default:
-            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+            throw new AssertionError("invalid mutation type : " + type);

Review comment:
       Why change to AssertionError?



