This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 3775464  HBASE-25242 Add Increment/Append support to RowMutations 
(#2711)
3775464 is described below

commit 3775464981b473599c6d1bb24a7eea3badf0ed03
Author: Toshihiro Suzuki <brfrn...@gmail.com>
AuthorDate: Fri Nov 27 03:53:19 2020 +0900

    HBASE-25242 Add Increment/Append support to RowMutations (#2711)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
---
 .../org/apache/hadoop/hbase/client/AsyncTable.java |   4 +-
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |   2 +-
 .../apache/hadoop/hbase/client/CheckAndMutate.java | 127 +++------
 .../org/apache/hadoop/hbase/client/HTable.java     |  21 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  10 +-
 .../apache/hadoop/hbase/client/RowMutations.java   |   5 +-
 .../java/org/apache/hadoop/hbase/client/Table.java |   5 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |   8 +-
 .../hbase/shaded/protobuf/RequestConverter.java    |  37 +--
 .../hbase/shaded/protobuf/ResponseConverter.java   |  85 ++++--
 .../hadoop/hbase/rest/client/RemoteHTable.java     |   2 +-
 .../hadoop/hbase/coprocessor/RegionObserver.java   |   2 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 145 +++++-----
 .../regionserver/MiniBatchOperationInProgress.java |   1 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  93 ++++++-
 .../apache/hadoop/hbase/regionserver/Region.java   |  12 +-
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 146 +++++++++-
 .../hadoop/hbase/client/TestAsyncTableBatch.java   |  44 ++-
 .../hbase/client/TestAsyncTableNoncedRetry.java    |  27 ++
 .../hadoop/hbase/client/TestCheckAndMutate.java    | 118 ++++++++
 .../hadoop/hbase/client/TestFromClientSide3.java   |  44 ++-
 .../hadoop/hbase/client/TestFromClientSide5.java   |  41 ++-
 .../hbase/coprocessor/SimpleRegionObserver.java    |  12 +
 .../coprocessor/TestRegionObserverInterface.java   |  92 ++++++-
 .../hadoop/hbase/regionserver/RegionAsTable.java   |   2 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     | 305 ++++++++++++++++++++-
 .../regionserver/TestWALEntrySinkFilter.java       |   4 +-
 .../hadoop/hbase/thrift2/client/ThriftTable.java   |   3 +-
 28 files changed, 1098 insertions(+), 299 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 25ea143..7473ed0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -396,9 +396,9 @@ public interface AsyncTable<C extends 
ScanResultConsumerBase> {
    * Performs multiple mutations atomically on a single row. Currently {@link 
Put} and
    * {@link Delete} are supported.
    * @param mutation object that specifies the set of mutations to perform 
atomically
-   * @return A {@link CompletableFuture} that always returns null when 
complete normally.
+   * @return A {@link CompletableFuture} that returns results of 
Increment/Append operations
    */
-  CompletableFuture<Void> mutateRow(RowMutations mutation);
+  CompletableFuture<Result> mutateRow(RowMutations mutation);
 
   /**
    * The scan API uses the observer pattern.
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 82e8cd5..ba2c560 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -217,7 +217,7 @@ class AsyncTableImpl implements 
AsyncTable<ScanResultConsumer> {
   }
 
   @Override
-  public CompletableFuture<Void> mutateRow(RowMutations mutation) {
+  public CompletableFuture<Result> mutateRow(RowMutations mutation) {
     return wrap(rawTable.mutateRow(mutation));
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
index a163c8d..47bbc53 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -17,18 +17,15 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.NavigableMap;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
 
 /**
  * Used to perform CheckAndMutate operations.
@@ -58,7 +55,7 @@ import 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public final class CheckAndMutate extends Mutation {
+public final class CheckAndMutate implements Row {
 
   /**
    * A builder class for building a CheckAndMutate object.
@@ -200,15 +197,15 @@ public final class CheckAndMutate extends Mutation {
     }
 
     /**
-     * @param mutation mutations to perform if check succeeds
+     * @param mutations mutations to perform if check succeeds
      * @return a CheckAndMutate object
      */
-    public CheckAndMutate build(RowMutations mutation) {
-      preCheck(mutation);
+    public CheckAndMutate build(RowMutations mutations) {
+      preCheck(mutations);
       if (filter != null) {
-        return new CheckAndMutate(row, filter, timeRange, mutation);
+        return new CheckAndMutate(row, filter, timeRange, mutations);
       } else {
-        return new CheckAndMutate(row, family, qualifier, op, value, 
timeRange, mutation);
+        return new CheckAndMutate(row, family, qualifier, op, value, 
timeRange, mutations);
       }
     }
   }
@@ -223,6 +220,7 @@ public final class CheckAndMutate extends Mutation {
     return new Builder(row);
   }
 
+  private final byte[] row;
   private final byte[] family;
   private final byte[] qualifier;
   private final CompareOperator op;
@@ -233,7 +231,7 @@ public final class CheckAndMutate extends Mutation {
 
   private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final 
CompareOperator op,
     byte[] value, TimeRange timeRange, Row action) {
-    super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
+    this.row = row;
     this.family = family;
     this.qualifier = qualifier;
     this.op = op;
@@ -244,7 +242,7 @@ public final class CheckAndMutate extends Mutation {
   }
 
   private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row 
action) {
-    super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
+    this.row = row;
     this.family = null;
     this.qualifier = null;
     this.op = null;
@@ -255,6 +253,37 @@ public final class CheckAndMutate extends Mutation {
   }
 
   /**
+   * @return the row
+   */
+  @Override
+  public byte[] getRow() {
+    return row;
+  }
+
+  @Override
+  public int compareTo(Row row) {
+    return Bytes.compareTo(this.getRow(), row.getRow());
+  }
+
+  // Added to get rid of the stopbugs error
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Row other = (Row) obj;
+    return compareTo(other) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Bytes.hashCode(this.getRow());
+  }
+
+  /**
    * @return the family to check
    */
   public byte[] getFamily() {
@@ -309,76 +338,4 @@ public final class CheckAndMutate extends Mutation {
   public Row getAction() {
     return action;
   }
-
-  @Override
-  public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).getFamilyCellMap();
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getTimestamp() {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).getTimestamp();
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Mutation setTimestamp(long timestamp) {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).setTimestamp(timestamp);
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Durability getDurability() {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).getDurability();
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Mutation setDurability(Durability d) {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).setDurability(d);
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public byte[] getAttribute(String name) {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).getAttribute(name);
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public OperationWithAttributes setAttribute(String name, byte[] value) {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).setAttribute(name, value);
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getPriority() {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).getPriority();
-    }
-    return ((RowMutations) action).getMaxPriority();
-  }
-
-  @Override
-  public OperationWithAttributes setPriority(int priority) {
-    if (action instanceof Mutation) {
-      return ((Mutation) action).setPriority(priority);
-    }
-    throw new UnsupportedOperationException();
-  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 5f3afa1..a219fed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -554,7 +554,7 @@ public class HTable implements Table {
   }
 
   @Override
-  public void mutateRow(final RowMutations rm) throws IOException {
+  public Result mutateRow(final RowMutations rm) throws IOException {
     CancellableRegionServerCallable<MultiResponse> callable =
       new CancellableRegionServerCallable<MultiResponse>(this.connection, 
getName(), rm.getRow(),
           rpcControllerFactory.newController(), writeRpcTimeoutMs,
@@ -578,20 +578,23 @@ public class HTable implements Table {
         return ResponseConverter.getResults(request, response, 
getRpcControllerCellScanner());
       }
     };
+    Object[] results = new Object[rm.getMutations().size()];
     AsyncProcessTask task = AsyncProcessTask.newBuilder()
-            .setPool(pool)
-            .setTableName(tableName)
-            .setRowAccess(rm.getMutations())
-            .setCallable(callable)
-            .setRpcTimeout(writeRpcTimeoutMs)
-            .setOperationTimeout(operationTimeoutMs)
-            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
-            .build();
+      .setPool(pool)
+      .setTableName(tableName)
+      .setRowAccess(rm.getMutations())
+      .setCallable(callable)
+      .setRpcTimeout(writeRpcTimeoutMs)
+      .setOperationTimeout(operationTimeoutMs)
+      .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+      .setResults(results)
+      .build();
     AsyncRequestFuture ars = multiAp.submit(task);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
     }
+    return (Result) results[0];
   }
 
   @Override
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index c3d3246..63ade0d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -551,17 +551,17 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
   }
 
   @Override
-  public CompletableFuture<Void> mutateRow(RowMutations mutation) {
-    return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), 
writeRpcTimeoutNs)
-      .action((controller, loc, stub) ->
-        this.<Result, Void> mutateRow(controller, loc, stub, mutation,
+  public CompletableFuture<Result> mutateRow(RowMutations mutations) {
+    return this.<Result> newCaller(mutations.getRow(), 
mutations.getMaxPriority(),
+      writeRpcTimeoutNs).action((controller, loc, stub) ->
+        this.<Result, Result> mutateRow(controller, loc, stub, mutations,
           (rn, rm) -> {
             RegionAction.Builder regionMutationBuilder = RequestConverter
               .buildRegionAction(rn, rm);
             regionMutationBuilder.setAtomic(true);
             return 
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
               .build();
-          }, resp -> null))
+          }, resp -> resp))
       .call();
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
index 345e26a..084e259 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
@@ -30,7 +30,6 @@ import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
 
 /**
  * Performs multiple mutations atomically on a single row.
- * Currently {@link Put} and {@link Delete} are supported.
  *
  * The mutations are performed in the order in which they
  * were added.
@@ -110,15 +109,13 @@ public class RowMutations implements Row {
   }
 
   /**
-   * Currently only supports {@link Put} and {@link Delete} mutations.
-   *
    * @param mutations The data to send.
    * @throws IOException if the row of added mutation doesn't match the 
original row
    */
   public RowMutations add(List<? extends Mutation> mutations) throws 
IOException {
     for (Mutation mutation : mutations) {
       if (!Bytes.equals(row, mutation.getRow())) {
-        throw new WrongRowIOException("The row in the recently added 
Put/Delete <" +
+        throw new WrongRowIOException("The row in the recently added Mutation 
<" +
           Bytes.toStringBinary(mutation.getRow()) + "> doesn't match the 
original one <" +
           Bytes.toStringBinary(this.row) + ">");
       }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 34f5e39..f8a28d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -644,9 +644,10 @@ public interface Table extends Closeable {
    * {@link Put} and {@link Delete} are supported.
    *
    * @param rm object that specifies the set of mutations to perform atomically
-   * @throws IOException
+   * @return results of Increment/Append operations
+   * @throws IOException if a remote or network exception occurs.
    */
-  default void mutateRow(final RowMutations rm) throws IOException {
+  default Result mutateRow(final RowMutations rm) throws IOException {
     throw new NotImplementedException("Add an implementation!");
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 3e90465..33feb29 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -3570,9 +3570,13 @@ public final class ProtobufUtil {
           return builder.build((Put) m);
         } else if (m instanceof Delete) {
           return builder.build((Delete) m);
+        } else if (m instanceof Increment) {
+          return builder.build((Increment) m);
+        } else if (m instanceof Append) {
+          return builder.build((Append) m);
         } else {
-          throw new DoNotRetryIOException("Unsupported mutate type: " + 
mutations.get(0)
-            .getClass().getSimpleName().toUpperCase());
+          throw new DoNotRetryIOException("Unsupported mutate type: " + 
m.getClass()
+            .getSimpleName().toUpperCase());
         }
       } else {
         return builder.build(new 
RowMutations(mutations.get(0).getRow()).add(mutations));
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index f435f56..54b55cc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -271,17 +271,9 @@ public final class RequestConverter {
     ClientProtos.Action.Builder actionBuilder = 
ClientProtos.Action.newBuilder();
     MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
     for (Mutation mutation: rowMutations.getMutations()) {
-      MutationType mutateType;
-      if (mutation instanceof Put) {
-        mutateType = MutationType.PUT;
-      } else if (mutation instanceof Delete) {
-        mutateType = MutationType.DELETE;
-      } else {
-        throw new DoNotRetryIOException("RowMutations supports only put and 
delete, not " +
-            mutation.getClass().getName());
-      }
       mutationBuilder.clear();
-      MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, 
mutationBuilder);
+      MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), 
mutation,
+        mutationBuilder);
       actionBuilder.clear();
       actionBuilder.setMutation(mp);
       builder.addAction(actionBuilder.build());
@@ -387,17 +379,9 @@ public final class RequestConverter {
     ClientProtos.Action.Builder actionBuilder = 
ClientProtos.Action.newBuilder();
     MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
     for (Mutation mutation: rowMutations.getMutations()) {
-      MutationType mutateType = null;
-      if (mutation instanceof Put) {
-        mutateType = MutationType.PUT;
-      } else if (mutation instanceof Delete) {
-        mutateType = MutationType.DELETE;
-      } else {
-        throw new DoNotRetryIOException("RowMutations supports only put and 
delete, not " +
-          mutation.getClass().getName());
-      }
       mutationBuilder.clear();
-      MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, 
mutationBuilder);
+      MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), 
mutation,
+        mutationBuilder);
       actionBuilder.clear();
       actionBuilder.setMutation(mp);
       builder.addAction(actionBuilder.build());
@@ -928,17 +912,9 @@ public final class RequestConverter {
     final ClientProtos.Action.Builder actionBuilder, final 
MutationProto.Builder mutationBuilder)
     throws IOException {
     for (Mutation mutation: rowMutations.getMutations()) {
-      MutationType type;
-      if (mutation instanceof Put) {
-        type = MutationType.PUT;
-      } else if (mutation instanceof Delete) {
-        type = MutationType.DELETE;
-      } else {
-        throw new DoNotRetryIOException("RowMutations supports only put and 
delete, not " +
-          mutation.getClass().getName());
-      }
       mutationBuilder.clear();
-      MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, 
mutationBuilder);
+      MutationProto mp = 
ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
+        mutationBuilder);
       cells.add(mutation);
       actionBuilder.clear();
       regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
@@ -946,7 +922,6 @@ public final class RequestConverter {
   }
 
   private static MutationType getMutationType(Mutation mutation) {
-    assert !(mutation instanceof CheckAndMutate);
     if (mutation instanceof Put) {
       return MutationType.PUT;
     } else if (mutation instanceof Delete) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index 87a0df1..d3f8b8f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -148,8 +148,6 @@ public final class ResponseConverter {
             actionResult.getResultOrExceptionCount() + " for region " + 
actions.getRegion());
       }
 
-      Object responseValue;
-
       // For RowMutations/CheckAndMutate action, if there is an exception, the 
exception is set
       // at the RegionActionResult level and the ResultOrException is null at 
the original index
       Integer index = (indexMap == null ? null : indexMap.get(i));
@@ -158,23 +156,10 @@ public final class ResponseConverter {
         // If there is an exception from the server, the exception is set at
         // the RegionActionResult level, which has been handled above.
         if (actions.hasCondition()) {
-          Result result = null;
-          if (actionResult.getResultOrExceptionCount() > 0) {
-            ResultOrException roe = actionResult.getResultOrException(0);
-            if (roe.hasResult()) {
-              Result r = ProtobufUtil.toResult(roe.getResult(), cells);
-              if (!r.isEmpty()) {
-                result = r;
-              }
-            }
-          }
-          responseValue = new 
CheckAndMutateResult(actionResult.getProcessed(), result);
+          results.add(regionName, index, getCheckAndMutateResult(actionResult, 
cells));
         } else {
-          responseValue = actionResult.getProcessed() ?
-            ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
-            ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
+          results.add(regionName, index, getMutateRowResult(actionResult, 
cells));
         }
-        results.add(regionName, index, responseValue);
         continue;
       }
 
@@ -185,11 +170,25 @@ public final class ResponseConverter {
           if (!r.isEmpty()) {
             result = r;
           }
-          responseValue = new 
CheckAndMutateResult(actionResult.getProcessed(), result);
-          results.add(regionName, roe.getIndex(), responseValue);
+          results.add(regionName, roe.getIndex(),
+            new CheckAndMutateResult(actionResult.getProcessed(), result));
         }
       } else {
+        if (actionResult.hasProcessed()) {
+          for (ResultOrException roe : 
actionResult.getResultOrExceptionList()) {
+            Result result = actionResult.getProcessed() ?
+              ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : 
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
+            Result r = ProtobufUtil.toResult(roe.getResult(), cells);
+            if (!r.isEmpty()) {
+              r.setExists(true);
+              result = r;
+            }
+            results.add(regionName, roe.getIndex(), result);
+          }
+          continue;
+        }
         for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
+          Object responseValue;
           if (roe.hasException()) {
             responseValue = ProtobufUtil.toException(roe.getException());
           } else if (roe.hasResult()) {
@@ -197,12 +196,7 @@ public final class ResponseConverter {
           } else if (roe.hasServiceResult()) {
             responseValue = roe.getServiceResult();
           } else {
-            // Sometimes, the response is just "it was processed". Generally, 
this occurs for things
-            // like mutateRows where either we get back 'processed' (or not) 
and optionally some
-            // statistics about the regions we touched.
-            responseValue = actionResult.getProcessed() ?
-              ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
-              ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
+            responseValue = ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE;
           }
           results.add(regionName, roe.getIndex(), responseValue);
         }
@@ -219,6 +213,47 @@ public final class ResponseConverter {
     return results;
   }
 
+  private static CheckAndMutateResult 
getCheckAndMutateResult(RegionActionResult actionResult,
+    CellScanner cells) throws IOException {
+    Result result = null;
+    if (actionResult.getResultOrExceptionCount() > 0) {
+      // Get the result of the Increment/Append operations from the first 
element of the
+      // ResultOrException list
+      ResultOrException roe = actionResult.getResultOrException(0);
+      if (roe.hasResult()) {
+        Result r = ProtobufUtil.toResult(roe.getResult(), cells);
+        if (!r.isEmpty()) {
+          result = r;
+        }
+      }
+    }
+    return new CheckAndMutateResult(actionResult.getProcessed(), result);
+  }
+
+  private static Result getMutateRowResult(RegionActionResult actionResult, 
CellScanner cells)
+    throws IOException {
+    if (actionResult.getProcessed()) {
+      Result result = null;
+      if (actionResult.getResultOrExceptionCount() > 0) {
+        // Get the result of the Increment/Append operations from the first 
element of the
+        // ResultOrException list
+        ResultOrException roe = actionResult.getResultOrException(0);
+        Result r = ProtobufUtil.toResult(roe.getResult(), cells);
+        if (!r.isEmpty()) {
+          r.setExists(true);
+          result = r;
+        }
+      }
+      if (result != null) {
+        return result;
+      } else {
+        return ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE;
+      }
+    } else {
+      return ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
+    }
+  }
+
   /**
    * Create a CheckAndMutateResult object from a protocol buffer MutateResponse
    *
diff --git 
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
 
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index f849601..86de239 100644
--- 
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ 
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -875,7 +875,7 @@ public class RemoteHTable implements Table {
   }
 
   @Override
-  public void mutateRow(RowMutations rm) throws IOException {
+  public Result mutateRow(RowMutations rm) throws IOException {
     throw new IOException("atomicMutation not supported");
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index ab2c8ce..0782dae 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -919,7 +919,7 @@ public interface RegionObserver {
   /**
    * Called after checkAndMutate
    * <p>
-   * Note: Do not retain references to any Cells in 'delete' beyond the life 
of this invocation.
+   * Note: Do not retain references to any Cells in actions beyond the life of 
this invocation.
    * If need a Cell reference for later use, copy the cell and use that.
    * @param c the environment provided by the region server
    * @param checkAndMutate the CheckAndMutate object
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9468130..3e0710b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3454,8 +3454,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       try {
         this.checkAndPrepareMutation(mutation, timestamp);
 
-        // store the family map reference to allow for mutations
-        familyCellMaps[index] = mutation.getFamilyCellMap();
+        if (mutation instanceof Put || mutation instanceof Delete) {
+          // store the family map reference to allow for mutations
+          familyCellMaps[index] = mutation.getFamilyCellMap();
+        }
+
         // store durability for the batch (highest durability of all 
operations in the batch)
         Durability tmpDur = 
region.getEffectiveDurability(mutation.getDurability());
         if (tmpDur.ordinal() > durability.ordinal()) {
@@ -3846,33 +3849,51 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
             Bytes.toBytes(timestamp));
           miniBatchOp.incrementNumOfDeletes();
         } else if (mutation instanceof Increment || mutation instanceof 
Append) {
+          boolean returnResults;
+          if (mutation instanceof Increment) {
+            returnResults = ((Increment) mutation).isReturnResults();
+          } else {
+            returnResults = ((Append) mutation).isReturnResults();
+          }
+
           // For nonce operations
           canProceed[index] = startNonceOperation(nonceGroup, nonce);
           if (!canProceed[index]) {
-            // convert duplicate increment/append to get
-            List<Cell> results = region.get(toGet(mutation), false, 
nonceGroup, nonce);
-            retCodeDetails[index] = new 
OperationStatus(OperationStatusCode.SUCCESS,
-              Result.create(results));
+            Result result;
+            if (returnResults) {
+              // convert duplicate increment/append to get
+              List<Cell> results = region.get(toGet(mutation), false, 
nonceGroup, nonce);
+              result = Result.create(results);
+            } else {
+              result = Result.EMPTY_RESULT;
+            }
+            retCodeDetails[index] = new 
OperationStatus(OperationStatusCode.SUCCESS, result);
             return true;
           }
 
-          boolean returnResults;
-          if (mutation instanceof Increment) {
-            returnResults = ((Increment) mutation).isReturnResults();
-            miniBatchOp.incrementNumOfIncrements();
-          } else {
-            returnResults = ((Append) mutation).isReturnResults();
-            miniBatchOp.incrementNumOfAppends();
+          Result result = null;
+          if (region.coprocessorHost != null) {
+            if (mutation instanceof Increment) {
+              result = 
region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+            } else {
+              result = region.coprocessorHost.preAppendAfterRowLock((Append) 
mutation);
+            }
           }
-          Result result = doCoprocessorPreCallAfterRowLock(mutation);
           if (result != null) {
             retCodeDetails[index] = new 
OperationStatus(OperationStatusCode.SUCCESS,
               returnResults ? result : Result.EMPTY_RESULT);
             return true;
           }
+
           List<Cell> results = returnResults ? new 
ArrayList<>(mutation.size()) : null;
           familyCellMaps[index] = reckonDeltas(mutation, results, timestamp);
-          this.results[index] = results != null ? Result.create(results): 
Result.EMPTY_RESULT;
+          this.results[index] = results != null ? Result.create(results) : 
Result.EMPTY_RESULT;
+
+          if (mutation instanceof Increment) {
+            miniBatchOp.incrementNumOfIncrements();
+          } else {
+            miniBatchOp.incrementNumOfAppends();
+          }
         }
         region.rewriteCellTags(familyCellMaps[index], mutation);
 
@@ -3954,28 +3975,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       return get;
     }
 
-    /**
-     * Do coprocessor pre-increment or pre-append after row lock call.
-     * @return Result returned out of the coprocessor, which means bypass all 
further processing
-     *   and return the preferred Result instead, or null which means proceed.
-     */
-    private Result doCoprocessorPreCallAfterRowLock(Mutation mutation) throws 
IOException {
-      assert mutation instanceof Increment || mutation instanceof Append;
-      Result result = null;
-      if (region.coprocessorHost != null) {
-        if (mutation instanceof Increment) {
-          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) 
mutation);
-        } else {
-          result = region.coprocessorHost.preAppendAfterRowLock((Append) 
mutation);
-        }
-      }
-      return result;
-    }
-
     private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> 
results,
       long now) throws IOException {
       assert mutation instanceof Increment || mutation instanceof Append;
-      Map<byte[], List<Cell>> ret = new HashMap<>();
+      Map<byte[], List<Cell>> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
       // Process a Store/family at a time.
       for (Map.Entry<byte [], List<Cell>> entry: 
mutation.getFamilyCellMap().entrySet()) {
         final byte[] columnFamilyName = entry.getKey();
@@ -4018,14 +4021,28 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
       List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
 
+      // Sort the cells so that they match the order that they appear in the 
Get results.
+      // Otherwise, we won't be able to find the existing values if the cells 
are not specified
+      // in order by the client since cells are in an array list.
+      sort(deltas, store.getComparator());
+
       // Get previous values for all columns in this family.
+      Get get = new Get(mutation.getRow());
+      for (Cell cell: deltas) {
+        get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
+      }
       TimeRange tr;
       if (mutation instanceof Increment) {
         tr = ((Increment) mutation).getTimeRange();
       } else {
         tr = ((Append) mutation).getTimeRange();
       }
-      List<Cell> currentValues = get(mutation, store, deltas, tr);
+
+      if (tr != null) {
+        get.setTimeRange(tr.getMin(), tr.getMax());
+      }
+
+      List<Cell> currentValues = region.get(get, false);
 
       // Iterate the input columns and update existing values if they were 
found, otherwise
       // add new column initialized to the delta amount
@@ -4119,31 +4136,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       return PrivateCellUtil.getValueAsLong(cell);
     }
 
-    /**
-     * Do a specific Get on passed <code>columnFamily</code> and column 
qualifiers.
-     * @param mutation Mutation we are doing this Get for.
-     * @param store Which column family on row (TODO: Go all Gets in one go)
-     * @param coordinates Cells from <code>mutation</code> used as coordinates 
applied to Get.
-     * @return Return list of Cells found.
-     */
-    private List<Cell> get(Mutation mutation, HStore store, List<Cell> 
coordinates,
-      TimeRange tr) throws IOException {
-      // Sort the cells so that they match the order that they appear in the 
Get results.
-      // Otherwise, we won't be able to find the existing values if the cells 
are not specified
-      // in order by the client since cells are in an array list.
-      // TODO: I don't get why we are sorting. St.Ack 20150107
-      sort(coordinates, store.getComparator());
-      Get get = new Get(mutation.getRow());
-      for (Cell cell: coordinates) {
-        get.addColumn(store.getColumnFamilyDescriptor().getName(), 
CellUtil.cloneQualifier(cell));
-      }
-      // Increments carry time range. If an Increment instance, put it on the 
Get.
-      if (tr != null) {
-        get.setTimeRange(tr.getMin(), tr.getMax());
-      }
-      return region.get(get, false);
-    }
-
     @Override
     public List<Pair<NonceKey, WALEdit>> buildWALEdits(final 
MiniBatchOperationInProgress<Mutation>
         miniBatchOp) throws IOException {
@@ -4318,6 +4310,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       }
     }
 
+    // TODO Support Increment/Append operations
     private void checkAndMergeCPMutations(final 
MiniBatchOperationInProgress<Mutation> miniBatchOp,
         final List<RowLock> acquiredRowLocks, final long timestamp) throws 
IOException {
       visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), (int 
i) -> {
@@ -4527,7 +4520,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   /**
    * Perform a batch of mutations.
    *
-   * It supports Put, Delete, Increment, Append mutations and will ignore 
other types passed.
    * Operations in a batch are stored with highest durability specified of for 
all operations in a
    * batch, except for {@link Durability#SKIP_WAL}.
    *
@@ -4885,11 +4877,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
             // timestamp from get (see prepareDeleteTimestamps).
           }
           // All edits for the given row (across all column families) must 
happen atomically.
-          Result r = null;
+          Result r;
           if (mutation != null) {
             r = doBatchMutate(mutation, true).getResult();
           } else {
-            mutateRow(rowMutations);
+            r = mutateRow(rowMutations);
           }
           this.checkAndMutateChecksPassed.increment();
           return new CheckAndMutateResult(true, r);
@@ -8250,11 +8242,30 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   }
 
   @Override
-  public void mutateRow(RowMutations rm) throws IOException {
-    // Don't need nonces here - RowMutations only supports puts and deletes
+  public Result mutateRow(RowMutations rm) throws IOException {
     final List<Mutation> m = rm.getMutations();
-    batchMutate(m.toArray(new Mutation[m.size()]), true, HConstants.NO_NONCE,
-        HConstants.NO_NONCE);
+    OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true,
+      HConstants.NO_NONCE, HConstants.NO_NONCE);
+
+    List<Result> results = new ArrayList<>();
+    for (OperationStatus status : statuses) {
+      if (status.getResult() != null) {
+        results.add(status.getResult());
+      }
+    }
+
+    if (results.isEmpty()) {
+      return null;
+    }
+
+    // Merge the results of the Increment/Append operations
+    List<Cell> cells = new ArrayList<>();
+    for (Result result : results) {
+      if (result.rawCells() != null) {
+        cells.addAll(Arrays.asList(result.rawCells()));
+      }
+    }
+    return Result.create(cells);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index ae5b6ec..ac6b5dc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -124,6 +124,7 @@ public class MiniBatchOperationInProgress<T> {
    * in the same batch. These mutations are applied to the WAL and applied to 
the memstore as well.
    * The timestamp of the cells in the given Mutations MUST be obtained from 
the original mutation.
    * <b>Note:</b> The durability from CP will be replaced by the durability of 
corresponding mutation.
+   * <b>Note:</b> Currently only supports Put and Delete operations.
    * @param index the index that corresponds to the original mutation index in 
the batch
    * @param newOperations the Mutations to add
    */
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index c9a710a..7862b93 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -620,8 +620,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
             mutations.add(del);
             break;
+          case INCREMENT:
+            Increment increment = 
ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
+            ++countOfCompleteMutation;
+            checkCellSizeLimit(region, increment);
+            
spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
+            mutations.add(increment);
+            break;
+          case APPEND:
+            Append append = ProtobufUtil.toAppend(action.getMutation(), 
cellScanner);
+            ++countOfCompleteMutation;
+            checkCellSizeLimit(region, append);
+            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
+            mutations.add(append);
+            break;
           default:
-            throw new DoNotRetryIOException("Atomic put and/or delete only, 
not " + type.name());
+            throw new DoNotRetryIOException("invalid mutation type : " + type);
         }
       }
 
@@ -914,7 +928,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   /**
-   * Execute a list of Put/Delete mutations.
+   * Execute a list of mutations.
    *
    * @param builder
    * @param region
@@ -944,12 +958,27 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         }
         MutationProto m = action.getMutation();
         Mutation mutation;
-        if (m.getMutateType() == MutationType.PUT) {
-          mutation = ProtobufUtil.toPut(m, cells);
-          batchContainsPuts = true;
-        } else {
-          mutation = ProtobufUtil.toDelete(m, cells);
-          batchContainsDelete = true;
+        switch (m.getMutateType()) {
+          case PUT:
+            mutation = ProtobufUtil.toPut(m, cells);
+            batchContainsPuts = true;
+            break;
+
+          case DELETE:
+            mutation = ProtobufUtil.toDelete(m, cells);
+            batchContainsDelete = true;
+            break;
+
+          case INCREMENT:
+            mutation = ProtobufUtil.toIncrement(m, cells);
+            break;
+
+          case APPEND:
+            mutation = ProtobufUtil.toAppend(m, cells);
+            break;
+
+          default:
+            throw new DoNotRetryIOException("Invalid mutation type : " + 
m.getMutateType());
         }
         mutationActionMap.put(mutation, action);
         mArray[i++] = mutation;
@@ -972,11 +1001,51 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
 
       OperationStatus[] codes = region.batchMutate(mArray, atomic, 
HConstants.NO_NONCE,
         HConstants.NO_NONCE);
+
+      // When atomic is true, it indicates that the mutateRow API or the batch 
API with
+      // RowMutations is called. In this case, we need to merge the results of 
the
+      // Increment/Append operations if the mutations include those 
operations, and set the merged
+      // result to the first element of the ResultOrException list
+      if (atomic) {
+        List<ResultOrException> resultOrExceptions = new ArrayList<>();
+        List<Result> results = new ArrayList<>();
+        for (i = 0; i < codes.length; i++) {
+          if (codes[i].getResult() != null) {
+            results.add(codes[i].getResult());
+          }
+          if (i != 0) {
+            resultOrExceptions.add(getResultOrException(
+              ClientProtos.Result.getDefaultInstance(), i));
+          }
+        }
+
+        if (results.isEmpty()) {
+          builder.addResultOrException(getResultOrException(
+            ClientProtos.Result.getDefaultInstance(), 0));
+        } else {
+          // Merge the results of the Increment/Append operations
+          List<Cell> cellList = new ArrayList<>();
+          for (Result result : results) {
+            if (result.rawCells() != null) {
+              cellList.addAll(Arrays.asList(result.rawCells()));
+            }
+          }
+          Result result = Result.create(cellList);
+
+          // Set the merged result of the Increment/Append operations to the 
first element of the
+          // ResultOrException list
+          
builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result),
 0));
+        }
+
+        builder.addAllResultOrException(resultOrExceptions);
+        return;
+      }
+
       for (i = 0; i < codes.length; i++) {
         Mutation currentMutation = mArray[i];
         ClientProtos.Action currentAction = 
mutationActionMap.get(currentMutation);
-        int index = currentAction.hasIndex() || !atomic ? 
currentAction.getIndex() : i;
-        Exception e = null;
+        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
+        Exception e;
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:
             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
@@ -2738,6 +2807,8 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
               regionActionResultBuilder.setProcessed(result.isSuccess());
               for (int i = 0; i < regionAction.getActionCount(); i++) {
                 if (i == 0 && result.getResult() != null) {
+                  // Set the result of the Increment/Append operations to the 
first element of the
+                  // ResultOrException list
                   resultOrExceptionOrBuilder.setIndex(i);
                   
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                     
.setResult(ProtobufUtil.toResult(result.getResult())).build());
@@ -2761,7 +2832,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
               cellScanner, spaceQuotaEnforcement);
             regionActionResultBuilder.setProcessed(true);
             // We no longer use MultiResponse#processed. Instead, we use
-            // RegionActionResult#condition. This is for backward 
compatibility for old clients.
+            // RegionActionResult#processed. This is for backward 
compatibility for old clients.
             responseBuilder.setProcessed(true);
           } catch (IOException e) {
             rpcServer.getMetrics().exception(e);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 8b7de78..71d9c4c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -290,8 +290,9 @@ public interface Region extends ConfigurationObserver {
   /**
    * Perform a batch of mutations.
    * <p>
-   * Note this supports only Put, Delete, Increment and Append mutations and 
will ignore other
-   * types passed.
+   * Please do not operate on a same column of a single row in a batch, we 
will not consider the
+   * previous operation in the same batch when performing the operations in 
the batch.
+   *
    * @param mutations the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
@@ -528,13 +529,14 @@ public interface Region extends ConfigurationObserver {
   Result increment(Increment increment) throws IOException;
 
   /**
-   * Performs multiple mutations atomically on a single row. Currently
-   * {@link Put} and {@link Delete} are supported.
+   * Performs multiple mutations atomically on a single row.
    *
    * @param mutations object that specifies the set of mutations to perform 
atomically
+   * @return results of Increment/Append operations. If no Increment/Append 
operations, it returns
+   *   null
    * @throws IOException
    */
-  void mutateRow(RowMutations mutations) throws IOException;
+  Result mutateRow(RowMutations mutations) throws IOException;
 
   /**
    * Perform atomic mutations within the region.
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 2944c40..9e6748e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -250,18 +250,29 @@ public class TestAsyncTable {
   public void testMutateRow() throws InterruptedException, ExecutionException, 
IOException {
     AsyncTable<?> table = getTable.get();
     RowMutations mutation = new RowMutations(row);
-    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
1), VALUE));
-    table.mutateRow(mutation).get();
-    Result result = table.get(new Get(row)).get();
+    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
+    Result result = table.mutateRow(mutation).get();
+    assertTrue(result.getExists());
+    assertTrue(result.isEmpty());
+
+    result = table.get(new Get(row)).get();
     assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
 
     mutation = new RowMutations(row);
-    mutation.add((Mutation) new Delete(row).addColumn(FAMILY, 
concat(QUALIFIER, 1)));
-    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
2), VALUE));
-    table.mutateRow(mutation).get();
+    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
+    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
+    mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 
2L));
+    mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), 
Bytes.toBytes("abc")));
+    result = table.mutateRow(mutation).get();
+    assertTrue(result.getExists());
+    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 
3))));
+    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, 
concat(QUALIFIER, 4))));
+
     result = table.get(new Get(row)).get();
     assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
     assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
+    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 
3))));
+    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, 
concat(QUALIFIER, 4))));
   }
 
   // Tests for old checkAndMutate API
@@ -995,7 +1006,7 @@ public class TestAsyncTable {
   public void testCheckAndIncrement() throws Throwable {
     AsyncTable<?> table = getTable.get();
 
-    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")));
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a"))).get();
 
     // CheckAndIncrement with correct value
     CheckAndMutateResult res = 
table.checkAndMutate(CheckAndMutate.newBuilder(row)
@@ -1052,7 +1063,7 @@ public class TestAsyncTable {
   public void testCheckAndAppend() throws Throwable {
     AsyncTable<?> table = getTable.get();
 
-    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")));
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a"))).get();
 
     // CheckAndAppend with correct value
     CheckAndMutateResult res = 
table.checkAndMutate(CheckAndMutate.newBuilder(row)
@@ -1105,6 +1116,66 @@ public class TestAsyncTable {
     assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
   }
 
+  @Test
+  public void testCheckAndRowMutations() throws Throwable {
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+    final String v1 = "v1";
+
+    AsyncTable<?> table = getTable.get();
+
+    // Initial values
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
+      new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
+      new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
+
+    // Do CheckAndRowMutations
+    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, q1)
+      .build(new RowMutations(row).add(Arrays.asList(
+        new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
+        new Delete(row).addColumns(FAMILY, q2),
+        new Increment(row).addColumn(FAMILY, q3, 1),
+        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
+      );
+
+    CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
+    assertTrue(result.isSuccess());
+    assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
+    assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, 
q4)));
+
+    // Verify the value
+    Result r = table.get(new Get(row)).get();
+    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
+    assertNull(r.getValue(FAMILY, q2));
+    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
+    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
+
+    // Do CheckAndRowMutations again
+    checkAndMutate = CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, q1)
+      .build(new RowMutations(row).add(Arrays.asList(
+        new Delete(row).addColumns(FAMILY, q1),
+        new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
+        new Increment(row).addColumn(FAMILY, q3, 1),
+        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
+      );
+
+    result = table.checkAndMutate(checkAndMutate).get();
+    assertFalse(result.isSuccess());
+    assertNull(result.getResult());
+
+    // Verify the value
+    r = table.get(new Get(row)).get();
+    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
+    assertNull(r.getValue(FAMILY, q2));
+    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
+    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
+  }
+
   // Tests for batch version of checkAndMutate
 
   @Test
@@ -1513,6 +1584,65 @@ public class TestAsyncTable {
   }
 
   @Test
+  public void testCheckAndRowMutationsBatch() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
+        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
+        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
+        .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
+        .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))
+    ).get();
+
+    // CheckAndIncrement with correct value
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
+      .build(new RowMutations(row).add(Arrays.asList(
+        new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+        new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
+        new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
+        new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))
+      )));
+
+    // CheckAndIncrement with wrong value
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
+      .build(new RowMutations(row2).add(Arrays.asList(
+        new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e")),
+        new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
+        new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
+        new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), 
Bytes.toBytes("h"))
+      )));
+
+    List<CheckAndMutateResult> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0).isSuccess());
+    assertEquals(2, Bytes.toLong(results.get(0).getResult()
+      .getValue(FAMILY, Bytes.toBytes("C"))));
+    assertEquals("dd", Bytes.toString(results.get(0).getResult()
+      .getValue(FAMILY, Bytes.toBytes("D"))));
+
+    assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
+
+    Result result = table.get(new Get(row)).get();
+    assertEquals("a", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+    assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
+    assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
+    assertEquals("dd", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+    result = table.get(new Get(row2)).get();
+    assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
+    assertEquals("h", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("H"))));
+  }
+
+  @Test
   public void testDisabled() throws InterruptedException, ExecutionException {
     ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
     try {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 20ec40e..3e322bb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -358,11 +358,17 @@ public class TestAsyncTableBatch {
 
     CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
       .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
-      .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("g")));
+      .build(new RowMutations(row1)
+        .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("g")))
+        .add((Mutation) new Delete(row1).addColumns(FAMILY, 
Bytes.toBytes("A")))
+        .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
+        .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))));
     Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
     RowMutations mutations = new RowMutations(row3)
       .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
-      .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("f")));
+      .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("f")))
+      .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
+      .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b")));
     CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
       .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
       .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("h")));
@@ -378,16 +384,28 @@ public class TestAsyncTableBatch {
       checkAndMutate3, checkAndMutate4);
     List<Object> results = table.batchAll(actions).get();
 
-    assertTrue(((CheckAndMutateResult) results.get(0)).isSuccess());
-    assertNull(((CheckAndMutateResult) results.get(0)).getResult());
+    CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) 
results.get(0);
+    assertTrue(checkAndMutateResult.isSuccess());
+    assertEquals(3L,
+      Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, 
Bytes.toBytes("C"))));
+    assertEquals("d",
+      Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, 
Bytes.toBytes("D"))));
+
     assertEquals("b",
       Bytes.toString(((Result) results.get(1)).getValue(FAMILY, 
Bytes.toBytes("B"))));
-    assertTrue(((Result) results.get(2)).getExists());
-    assertFalse(((CheckAndMutateResult) results.get(3)).isSuccess());
-    assertNull(((CheckAndMutateResult) results.get(3)).getResult());
+
+    Result result = (Result) results.get(2);
+    assertTrue(result.getExists());
+    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+    checkAndMutateResult = (CheckAndMutateResult) results.get(3);
+    assertFalse(checkAndMutateResult.isSuccess());
+    assertNull(checkAndMutateResult.getResult());
+
     assertTrue(((Result) results.get(4)).isEmpty());
 
-    CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) 
results.get(5);
+    checkAndMutateResult = (CheckAndMutateResult) results.get(5);
     assertTrue(checkAndMutateResult.isSuccess());
     assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
       .getValue(FAMILY, Bytes.toBytes("F"))));
@@ -397,12 +415,18 @@ public class TestAsyncTableBatch {
     assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
       .getValue(FAMILY, Bytes.toBytes("G"))));
 
-    Result result = table.get(new Get(row1)).get();
-    assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+    result = table.get(new Get(row1)).get();
+    assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
+    assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
 
     result = table.get(new Get(row3)).get();
     assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
     assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+    assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
 
     result = table.get(new Get(row4)).get();
     assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 82a57f2..10b358f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -135,6 +136,19 @@ public class TestAsyncTableNoncedRetry {
   }
 
   @Test
+  public void testAppendWhenReturnResultsEqualsFalse() throws 
InterruptedException,
+    ExecutionException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+    Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, 
VALUE)
+      .setReturnResults(false)).get();
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
   public void testIncrement() throws InterruptedException, ExecutionException {
     assertEquals(0, CALLED.get());
     AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
@@ -143,4 +157,17 @@ public class TestAsyncTableNoncedRetry {
     // make sure we called twice and the result is still correct
     assertEquals(2, CALLED.get());
   }
+
+  @Test
+  public void testIncrementWhenReturnResultsEqualsFalse() throws 
InterruptedException,
+    ExecutionException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+    Result result = table.increment(new Increment(row).addColumn(FAMILY, 
QUALIFIER, 1L)
+      .setReturnResults(false)).get();
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isEmpty());
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
index a4b79c8..51918d4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
@@ -704,6 +704,66 @@ public class TestCheckAndMutate {
     }
   }
 
+  @Test
+  public void testCheckAndRowMutations() throws Throwable {
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+    final String v1 = "v1";
+
+    try (Table table = createTable()) {
+      // Initial values
+      table.put(Arrays.asList(
+        new Put(ROWKEY).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
+        new Put(ROWKEY).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
+        new Put(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("a"))));
+
+      // Do CheckAndRowMutations
+      CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(ROWKEY)
+        .ifNotExists(FAMILY, q1)
+        .build(new RowMutations(ROWKEY).add(Arrays.asList(
+          new Put(ROWKEY).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
+          new Delete(ROWKEY).addColumns(FAMILY, q2),
+          new Increment(ROWKEY).addColumn(FAMILY, q3, 1),
+          new Append(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
+        );
+
+      CheckAndMutateResult result = table.checkAndMutate(checkAndMutate);
+      assertTrue(result.isSuccess());
+      assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
+      assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, 
q4)));
+
+      // Verify the value
+      Result r = table.get(new Get(ROWKEY));
+      assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
+      assertNull(r.getValue(FAMILY, q2));
+      assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
+      assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
+
+      // Do CheckAndRowMutations again
+      checkAndMutate = CheckAndMutate.newBuilder(ROWKEY)
+        .ifNotExists(FAMILY, q1)
+        .build(new RowMutations(ROWKEY).add(Arrays.asList(
+          new Delete(ROWKEY).addColumns(FAMILY, q1),
+          new Put(ROWKEY).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
+          new Increment(ROWKEY).addColumn(FAMILY, q3, 1),
+          new Append(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
+        );
+
+      result = table.checkAndMutate(checkAndMutate);
+      assertFalse(result.isSuccess());
+      assertNull(result.getResult());
+
+      // Verify the value
+      r = table.get(new Get(ROWKEY));
+      assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
+      assertNull(r.getValue(FAMILY, q2));
+      assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
+      assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
+    }
+  }
+
   // Tests for batch version of checkAndMutate
 
   @Test
@@ -1100,4 +1160,62 @@ public class TestCheckAndMutate {
       assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
     }
   }
+
+  @Test
+  public void testCheckAndRowMutationsBatch() throws Throwable {
+    try (Table table = createTable()) {
+      table.put(Arrays.asList(
+        new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b"))
+          .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
+        new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("f"))
+          .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
+          .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))
+      );
+
+      // CheckAndIncrement with correct value
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
+        .build(new RowMutations(ROWKEY).add(Arrays.asList(
+          new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")),
+          new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("B")),
+          new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
+          new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))
+        )));
+
+      // CheckAndIncrement with wrong value
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
+        .build(new RowMutations(ROWKEY2).add(Arrays.asList(
+          new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e")),
+          new Delete(ROWKEY2).addColumns(FAMILY, Bytes.toBytes("F")),
+          new Increment(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
+          new Append(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("H"), 
Bytes.toBytes("h"))
+        )));
+
+      List<CheckAndMutateResult> results =
+        table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+      assertTrue(results.get(0).isSuccess());
+      assertEquals(2, Bytes.toLong(results.get(0).getResult()
+        .getValue(FAMILY, Bytes.toBytes("C"))));
+      assertEquals("dd", Bytes.toString(results.get(0).getResult()
+        .getValue(FAMILY, Bytes.toBytes("D"))));
+
+      assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
+
+      Result result = table.get(new Get(ROWKEY));
+      assertEquals("a", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
+      assertEquals(2, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+      assertEquals("dd", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+      result = table.get(new Get(ROWKEY2));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+      assertEquals(1, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("G"))));
+      assertEquals("h", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("H"))));
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 3397b8c..ed7d72d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -506,11 +506,17 @@ public class TestFromClientSide3 {
 
       CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
         .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
-        .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("g")));
+        .build(new RowMutations(row1)
+          .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("g")))
+          .add((Mutation) new Delete(row1).addColumns(FAMILY, 
Bytes.toBytes("A")))
+          .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
+          .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))));
       Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
       RowMutations mutations = new RowMutations(row3)
         .add((Mutation) new Delete(row3).addColumns(FAMILY, 
Bytes.toBytes("C")))
-        .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("f")));
+        .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("f")))
+        .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
+        .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b")));
       CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
         .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
         .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("h")));
@@ -527,16 +533,28 @@ public class TestFromClientSide3 {
       Object[] results = new Object[actions.size()];
       table.batch(actions, results);
 
-      assertTrue(((CheckAndMutateResult) results[0]).isSuccess());
-      assertNull(((CheckAndMutateResult) results[0]).getResult());
+      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) 
results[0];
+      assertTrue(checkAndMutateResult.isSuccess());
+      assertEquals(3L,
+        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, 
Bytes.toBytes("C"))));
+      assertEquals("d",
+        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, 
Bytes.toBytes("D"))));
+
       assertEquals("b",
         Bytes.toString(((Result) results[1]).getValue(FAMILY, 
Bytes.toBytes("B"))));
-      assertTrue(((Result) results[2]).getExists());
-      assertFalse(((CheckAndMutateResult) results[3]).isSuccess());
-      assertNull(((CheckAndMutateResult) results[3]).getResult());
+
+      Result result = (Result) results[2];
+      assertTrue(result.getExists());
+      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+      checkAndMutateResult = (CheckAndMutateResult) results[3];
+      assertFalse(checkAndMutateResult.isSuccess());
+      assertNull(checkAndMutateResult.getResult());
+
       assertTrue(((Result) results[4]).isEmpty());
 
-      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) 
results[5];
+      checkAndMutateResult = (CheckAndMutateResult) results[5];
       assertTrue(checkAndMutateResult.isSuccess());
       assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
         .getValue(FAMILY, Bytes.toBytes("F"))));
@@ -546,12 +564,18 @@ public class TestFromClientSide3 {
       assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
         .getValue(FAMILY, Bytes.toBytes("G"))));
 
-      Result result = table.get(new Get(row1));
-      assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+      result = table.get(new Get(row1));
+      assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
+      assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
 
       result = table.get(new Get(row3));
       assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
       assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+      assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
 
       result = table.get(new Get(row4));
       assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
index 0ae15ca..a23234a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
@@ -293,21 +293,27 @@ public class TestFromClientSide5 extends 
FromClientSideBase {
   }
 
   @Test
-  public void testRowMutation() throws Exception {
-    LOG.info("Starting testRowMutation");
+  public void testRowMutations() throws Exception {
+    LOG.info("Starting testRowMutations");
     final TableName tableName = name.getTableName();
     try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
-      byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), 
Bytes.toBytes("b") };
+      byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), 
Bytes.toBytes("b"),
+        Bytes.toBytes("c"), Bytes.toBytes("d") };
+
+      // Test for Put operations
       RowMutations arm = new RowMutations(ROW);
       Put p = new Put(ROW);
       p.addColumn(FAMILY, QUALIFIERS[0], VALUE);
       arm.add(p);
-      t.mutateRow(arm);
+      Result r = t.mutateRow(arm);
+      assertTrue(r.getExists());
+      assertTrue(r.isEmpty());
 
       Get g = new Get(ROW);
-      Result r = t.get(g);
+      r = t.get(g);
       assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, 
QUALIFIERS[0])));
 
+      // Test for Put and Delete operations
       arm = new RowMutations(ROW);
       p = new Put(ROW);
       p.addColumn(FAMILY, QUALIFIERS[1], VALUE);
@@ -316,11 +322,34 @@ public class TestFromClientSide5 extends 
FromClientSideBase {
       d.addColumns(FAMILY, QUALIFIERS[0]);
       arm.add(d);
       // TODO: Trying mutateRow again. The batch was failing with a one try 
only.
-      t.mutateRow(arm);
+      r = t.mutateRow(arm);
+      assertTrue(r.getExists());
+      assertTrue(r.isEmpty());
+
       r = t.get(g);
       assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, 
QUALIFIERS[1])));
       assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
 
+      // Test for Increment and Append operations
+      arm = new RowMutations(ROW);
+      arm.add(Arrays.asList(
+        new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE),
+        new Delete(ROW).addColumns(FAMILY, QUALIFIERS[1]),
+        new Increment(ROW).addColumn(FAMILY, QUALIFIERS[2], 5L),
+        new Append(ROW).addColumn(FAMILY, QUALIFIERS[3], Bytes.toBytes("abc"))
+      ));
+      r = t.mutateRow(arm);
+      assertTrue(r.getExists());
+      assertEquals(5L, Bytes.toLong(r.getValue(FAMILY, QUALIFIERS[2])));
+      assertEquals("abc", Bytes.toString(r.getValue(FAMILY, QUALIFIERS[3])));
+
+      g = new Get(ROW);
+      r = t.get(g);
+      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, 
QUALIFIERS[0])));
+      assertNull(r.getValue(FAMILY, QUALIFIERS[1]));
+      assertEquals(5L, Bytes.toLong(r.getValue(FAMILY, QUALIFIERS[2])));
+      assertEquals("abc", Bytes.toString(r.getValue(FAMILY, QUALIFIERS[3])));
+
       // Test that we get a region level exception
       try {
         arm = new RowMutations(ROW);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 0727385..e432acd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -777,14 +777,26 @@ public class SimpleRegionObserver implements 
RegionCoprocessor, RegionObserver {
     return ctPreBatchMutate.get() > 0;
   }
 
+  public int getPreBatchMutate() {
+    return ctPreBatchMutate.get();
+  }
+
   public boolean hadPostBatchMutate() {
     return ctPostBatchMutate.get() > 0;
   }
 
+  public int getPostBatchMutate() {
+    return ctPostBatchMutate.get();
+  }
+
   public boolean hadPostBatchMutateIndispensably() {
     return ctPostBatchMutateIndispensably.get() > 0;
   }
 
+  public int getPostBatchMutateIndispensably() {
+    return ctPostBatchMutateIndispensably.get();
+  }
+
   public boolean hadPostStartRegionOperation() {
     return ctPostStartRegionOperation.get() > 0;
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index a3502bf..0b861ce 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -243,14 +243,16 @@ public class TestRegionObserverInterface {
       inc.addColumn(A, A, 1);
 
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "hadPreIncrement", "hadPostIncrement", 
"hadPreIncrementAfterRowLock" },
-        tableName, new Boolean[] { false, false, false });
+        new String[] { "hadPreIncrement", "hadPostIncrement", 
"hadPreIncrementAfterRowLock",
+          "hadPreBatchMutate", "hadPostBatchMutate", 
"hadPostBatchMutateIndispensably" },
+        tableName, new Boolean[] { false, false, false, false, false, false });
 
       table.increment(inc);
 
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "hadPreIncrement", "hadPostIncrement", 
"hadPreIncrementAfterRowLock" },
-        tableName, new Boolean[] { true, true, true });
+        new String[] { "hadPreIncrement", "hadPostIncrement", 
"hadPreIncrementAfterRowLock",
+          "hadPreBatchMutate", "hadPostBatchMutate", 
"hadPostBatchMutateIndispensably" },
+        tableName, new Boolean[] { true, true, true, true, true, true });
     } finally {
       util.deleteTable(tableName);
       table.close();
@@ -339,7 +341,75 @@ public class TestRegionObserverInterface {
   }
 
   @Test
-  public void testCheckAndMutateWithRowMutationsHooks() throws Exception {
+  public void testCheckAndIncrementHooks() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() 
+ "." +
+      name.getMethodName());
+    Table table = util.createTable(tableName, new byte[][] { A, B, C });
+    try {
+      byte[] row = Bytes.toBytes(0);
+
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 0, 0, 0 });
+
+      table.checkAndMutate(CheckAndMutate.newBuilder(row)
+        .ifNotExists(A, A)
+        .build(new Increment(row).addColumn(A, A, 1)));
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 1, 1, 1 });
+
+      table.checkAndMutate(CheckAndMutate.newBuilder(row)
+        .ifEquals(A, A, Bytes.toBytes(1L))
+        .build(new Increment(row).addColumn(A, A, 1)));
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 2, 2, 2 });
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
+  }
+
+  @Test
+  public void testCheckAndAppendHooks() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() 
+ "." +
+      name.getMethodName());
+    Table table = util.createTable(tableName, new byte[][] { A, B, C });
+    try {
+      byte[] row = Bytes.toBytes(0);
+
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 0, 0, 0 });
+
+      table.checkAndMutate(CheckAndMutate.newBuilder(row)
+        .ifNotExists(A, A)
+        .build(new Append(row).addColumn(A, A, A)));
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 1, 1, 1 });
+
+      table.checkAndMutate(CheckAndMutate.newBuilder(row)
+        .ifEquals(A, A, A)
+        .build(new Append(row).addColumn(A, A, A)));
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 2, 2, 2 });
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
+  }
+
+  @Test
+  public void testCheckAndRowMutationsHooks() throws Exception {
     final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() 
+ "." +
       name.getMethodName());
     Table table = util.createTable(tableName, new byte[][] { A, B, C });
@@ -388,14 +458,18 @@ public class TestRegionObserverInterface {
       app.addColumn(A, A, A);
 
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "hadPreAppend", "hadPostAppend", 
"hadPreAppendAfterRowLock" }, tableName,
-        new Boolean[] { false, false, false });
+        new String[] { "hadPreAppend", "hadPostAppend", 
"hadPreAppendAfterRowLock",
+          "hadPreBatchMutate", "hadPostBatchMutate", 
"hadPostBatchMutateIndispensably" },
+        tableName,
+        new Boolean[] { false, false, false, false, false, false });
 
       table.append(app);
 
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "hadPreAppend", "hadPostAppend", 
"hadPreAppendAfterRowLock" }, tableName,
-        new Boolean[] { true, true, true });
+        new String[] { "hadPreAppend", "hadPostAppend", 
"hadPreAppendAfterRowLock",
+          "hadPreBatchMutate", "hadPostBatchMutate", 
"hadPostBatchMutateIndispensably" },
+        tableName,
+        new Boolean[] { true, true, true, true, true, true });
     } finally {
       util.deleteTable(tableName);
       table.close();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
index 5a1315d..cec98cb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
@@ -280,7 +280,7 @@ public class RegionAsTable implements Table {
   }
 
   @Override
-  public void mutateRow(RowMutations rm) throws IOException {
+  public Result mutateRow(RowMutations rm) throws IOException {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index abf716f..64dfed95 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -61,6 +61,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -2390,6 +2391,7 @@ public class TestHRegion {
     CheckAndMutateResult res = 
region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // Putting data in key
     put = new Put(row1);
@@ -2399,17 +2401,20 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // not empty anymore
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     Delete delete = new Delete(row1);
     delete.addColumn(fam1, qf1);
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     put = new Put(row1);
     put.addColumn(fam1, qf1, val2);
@@ -2417,6 +2422,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // checkAndDelete with correct value
     delete = new Delete(row1);
@@ -2425,11 +2431,13 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     delete = new Delete(row1);
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // checkAndPut looking for a null value
     put = new Put(row1);
@@ -2438,6 +2446,7 @@ public class TestHRegion {
     res = 
region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1)
       .build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
   }
 
   @Test
@@ -2461,6 +2470,7 @@ public class TestHRegion {
     CheckAndMutateResult res = 
region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // checkAndDelete with wrong value
     Delete delete = new Delete(row1);
@@ -2468,6 +2478,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // Putting data in key
     put = new Put(row1);
@@ -2478,6 +2489,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, 
Bytes.toBytes(bd2)).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // checkAndDelete with wrong value
     delete = new Delete(row1);
@@ -2485,6 +2497,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, 
Bytes.toBytes(bd2)).build(delete));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
   }
 
   @Test
@@ -2514,6 +2527,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
     assertTrue("Delete", res.isSuccess());
+    assertNull(res.getResult());
 
     // Putting data in key
     put = new Put(row1);
@@ -2524,6 +2538,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
         .ifMatches(fam1, qf1, CompareOperator.EQUAL, 
Bytes.toBytes(bd1)).build(put));
     assertTrue("Second put", res.isSuccess());
+    assertNull(res.getResult());
 
     // checkAndDelete with correct value
     delete = new Delete(row1, now + 3);
@@ -2531,6 +2546,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
         .ifMatches(fam1, qf1, CompareOperator.EQUAL, 
Bytes.toBytes(bd1)).build(delete));
     assertTrue("Second delete", res.isSuccess());
+    assertNull(res.getResult());
   }
 
   @Test
@@ -2554,11 +2570,13 @@ public class TestHRegion {
     CheckAndMutateResult res = 
region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.LESS: original = val3, compare with val4, fail
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.LESS: original = val3, compare with val2,
     // succeed (now value = val2)
@@ -2567,17 +2585,20 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
     // succeed (value still = val2)
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
     // succeed (now value = val3)
@@ -2586,16 +2607,19 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.GREATER: original = val3, compare with val3, fail
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.GREATER: original = val3, compare with val2, fail
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.GREATER: original = val3, compare with val4,
     // succeed (now value = val2)
@@ -2604,22 +2628,26 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, 
fail
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 
val1).build(put));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
     // succeed (value still = val2)
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 
val2).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, 
succeed
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 
val3).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
   }
 
   @Test
@@ -2650,6 +2678,7 @@ public class TestHRegion {
     CheckAndMutateResult res = 
region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     Get get = new Get(row1);
     get.addColumn(fam2, qf1);
@@ -2704,6 +2733,7 @@ public class TestHRegion {
     CheckAndMutateResult res = 
region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     Get get = new Get(row1);
     get.addColumn(fam1, qf1);
@@ -2720,6 +2750,7 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     get = new Get(row1);
     r = region.get(get);
@@ -2731,6 +2762,8 @@ public class TestHRegion {
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
       .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
+
     get = new Get(row1);
     r = region.get(get);
     assertEquals(0, r.size());
@@ -2759,6 +2792,7 @@ public class TestHRegion {
           Bytes.toBytes("b"))))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     Result result = region.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D")));
     assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
@@ -2772,6 +2806,7 @@ public class TestHRegion {
           Bytes.toBytes("c"))))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e"))));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     assertTrue(region.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("E"))).isEmpty());
 
@@ -2784,6 +2819,7 @@ public class TestHRegion {
           Bytes.toBytes("b"))))
       .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     assertTrue(region.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).isEmpty());
 
@@ -2799,6 +2835,7 @@ public class TestHRegion {
           .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
         .add((Mutation) new Delete(row).addColumns(FAMILY, 
Bytes.toBytes("A")))));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
     assertEquals("e", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("E"))));
@@ -2823,6 +2860,7 @@ public class TestHRegion {
       .timeRange(TimeRange.between(0, 101))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b"))));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     Result result = region.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("B")));
     assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
@@ -2834,10 +2872,11 @@ public class TestHRegion {
       .timeRange(TimeRange.between(0, 100))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("c"))));
     assertFalse(res.isSuccess());
+    assertNull(res.getResult());
 
     assertTrue(region.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("C"))).isEmpty());
 
-    // Mutate with success
+    // RowMutations with success
     res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
       .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
         Bytes.toBytes("a")))
@@ -2847,6 +2886,7 @@ public class TestHRegion {
           .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
         .add((Mutation) new Delete(row).addColumns(FAMILY, 
Bytes.toBytes("A")))));
     assertTrue(res.isSuccess());
+    assertNull(res.getResult());
 
     result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
     assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
@@ -2975,6 +3015,126 @@ public class TestHRegion {
     assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
   }
 
+  @Test
+  public void testCheckAndIncrementAndAppend() throws Throwable {
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, fam1);
+
+    // CheckAndMutate with Increment and Append
+    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+      .ifNotExists(fam1, qual)
+      .build(new RowMutations(row)
+        .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L))
+        .add((Mutation) new Append(row).addColumn(fam1, qual2, 
Bytes.toBytes("a")))
+      );
+
+    CheckAndMutateResult result = region.checkAndMutate(checkAndMutate);
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(fam1, qual1)));
+    assertEquals("a", Bytes.toString(result.getResult().getValue(fam1, 
qual2)));
+
+    Result r = region.get(new Get(row));
+    assertEquals(1L, Bytes.toLong(r.getValue(fam1, qual1)));
+    assertEquals("a", Bytes.toString(r.getValue(fam1, qual2)));
+
+    // Set return results to false
+    checkAndMutate = CheckAndMutate.newBuilder(row)
+      .ifNotExists(fam1, qual)
+      .build(new RowMutations(row)
+        .add((Mutation) new Increment(row).addColumn(fam1, qual1, 
1L).setReturnResults(false))
+        .add((Mutation) new Append(row).addColumn(fam1, qual2, 
Bytes.toBytes("a"))
+          .setReturnResults(false))
+      );
+
+    result = region.checkAndMutate(checkAndMutate);
+    assertTrue(result.isSuccess());
+    assertNull(result.getResult().getValue(fam1, qual1));
+    assertNull(result.getResult().getValue(fam1, qual2));
+
+    r = region.get(new Get(row));
+    assertEquals(2L, Bytes.toLong(r.getValue(fam1, qual1)));
+    assertEquals("aa", Bytes.toString(r.getValue(fam1, qual2)));
+
+    checkAndMutate = CheckAndMutate.newBuilder(row)
+      .ifNotExists(fam1, qual)
+      .build(new RowMutations(row)
+        .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L))
+        .add((Mutation) new Append(row).addColumn(fam1, qual2, 
Bytes.toBytes("a"))
+          .setReturnResults(false))
+      );
+
+    result = region.checkAndMutate(checkAndMutate);
+    assertTrue(result.isSuccess());
+    assertEquals(3L, Bytes.toLong(result.getResult().getValue(fam1, qual1)));
+    assertNull(result.getResult().getValue(fam1, qual2));
+
+    r = region.get(new Get(row));
+    assertEquals(3L, Bytes.toLong(r.getValue(fam1, qual1)));
+    assertEquals("aaa", Bytes.toString(r.getValue(fam1, qual2)));
+  }
+
+  @Test
+  public void testCheckAndRowMutations() throws Throwable {
+    final byte[] row = Bytes.toBytes("row");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+    final String v1 = "v1";
+
+    region = initHRegion(tableName, method, CONF, fam1);
+
+    // Initial values
+    region.batchMutate(new Mutation[] {
+      new Put(row).addColumn(fam1, q2, Bytes.toBytes("toBeDeleted")),
+      new Put(row).addColumn(fam1, q3, Bytes.toBytes(5L)),
+      new Put(row).addColumn(fam1, q4, Bytes.toBytes("a")),
+    });
+
+    // Do CheckAndRowMutations
+    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+      .ifNotExists(fam1, q1)
+      .build(new RowMutations(row).add(Arrays.asList(
+        new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)),
+        new Delete(row).addColumns(fam1, q2),
+        new Increment(row).addColumn(fam1, q3, 1),
+        new Append(row).addColumn(fam1, q4, Bytes.toBytes("b"))))
+      );
+
+    CheckAndMutateResult result = region.checkAndMutate(checkAndMutate);
+    assertTrue(result.isSuccess());
+    assertEquals(6L, Bytes.toLong(result.getResult().getValue(fam1, q3)));
+    assertEquals("ab", Bytes.toString(result.getResult().getValue(fam1, q4)));
+
+    // Verify the value
+    Result r = region.get(new Get(row));
+    assertEquals(v1, Bytes.toString(r.getValue(fam1, q1)));
+    assertNull(r.getValue(fam1, q2));
+    assertEquals(6L, Bytes.toLong(r.getValue(fam1, q3)));
+    assertEquals("ab", Bytes.toString(r.getValue(fam1, q4)));
+
+    // Do CheckAndRowMutations again
+    checkAndMutate = CheckAndMutate.newBuilder(row)
+      .ifNotExists(fam1, q1)
+      .build(new RowMutations(row).add(Arrays.asList(
+        new Delete(row).addColumns(fam1, q1),
+        new Put(row).addColumn(fam1, q2, Bytes.toBytes(v1)),
+        new Increment(row).addColumn(fam1, q3, 1),
+        new Append(row).addColumn(fam1, q4, Bytes.toBytes("b"))))
+      );
+
+    result = region.checkAndMutate(checkAndMutate);
+    assertFalse(result.isSuccess());
+    assertNull(result.getResult());
+
+    // Verify the value
+    r = region.get(new Get(row));
+    assertEquals(v1, Bytes.toString(r.getValue(fam1, q1)));
+    assertNull(r.getValue(fam1, q2));
+    assertEquals(6L, Bytes.toLong(r.getValue(fam1, q3)));
+    assertEquals("ab", Bytes.toString(r.getValue(fam1, q4)));
+  }
+
   // 
////////////////////////////////////////////////////////////////////////////
   // Delete tests
   // 
////////////////////////////////////////////////////////////////////////////
@@ -7233,6 +7393,149 @@ public class TestHRegion {
   }
 
   @Test
+  public void testMutateRow() throws Exception {
+    final byte[] row = Bytes.toBytes("row");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+    final String v1 = "v1";
+
+    region = initHRegion(tableName, method, CONF, fam1);
+
+    // Initial values
+    region.batchMutate(new Mutation[] {
+      new Put(row).addColumn(fam1, q2, Bytes.toBytes("toBeDeleted")),
+      new Put(row).addColumn(fam1, q3, Bytes.toBytes(5L)),
+      new Put(row).addColumn(fam1, q4, Bytes.toBytes("a")),
+    });
+
+    // Do mutateRow
+    Result result = region.mutateRow(new RowMutations(row).add(Arrays.asList(
+      new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)),
+      new Delete(row).addColumns(fam1, q2),
+      new Increment(row).addColumn(fam1, q3, 1),
+      new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")))));
+
+    assertNotNull(result);
+    assertEquals(6L, Bytes.toLong(result.getValue(fam1, q3)));
+    assertEquals("ab", Bytes.toString(result.getValue(fam1, q4)));
+
+    // Verify the value
+    result = region.get(new Get(row));
+    assertEquals(v1, Bytes.toString(result.getValue(fam1, q1)));
+    assertNull(result.getValue(fam1, q2));
+    assertEquals(6L, Bytes.toLong(result.getValue(fam1, q3)));
+    assertEquals("ab", Bytes.toString(result.getValue(fam1, q4)));
+  }
+
+  @Test
+  public void testMutateRowInParallel() throws Exception {
+    final int numReaderThreads = 100;
+    final CountDownLatch latch = new CountDownLatch(numReaderThreads);
+
+    final byte[] row = Bytes.toBytes("row");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+    final String v1 = "v1";
+    final String v2 = "v2";
+
+    // We need to ensure the timestamp of the delete operation is more than 
the previous one
+    final AtomicLong deleteTimestamp = new AtomicLong();
+
+    region = initHRegion(tableName, method, CONF, fam1);
+
+    // Initial values
+    region.batchMutate(new Mutation[] {
+      new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1))
+        .addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), 
Bytes.toBytes(v2))
+        .addColumn(fam1, q3, Bytes.toBytes(1L))
+        .addColumn(fam1, q4, Bytes.toBytes("a"))
+    });
+
+    final AtomicReference<AssertionError> assertionError = new 
AtomicReference<>();
+
+    // Writer thread
+    Thread writerThread = new Thread(() -> {
+      try {
+        while (true) {
+          // If all the reader threads finish, then stop the writer thread
+          if (latch.await(0, TimeUnit.MILLISECONDS)) {
+            return;
+          }
+
+          // Execute the mutations. This should be done atomically
+          region.mutateRow(new RowMutations(row).add(Arrays.asList(
+            new Put(row).addColumn(fam1, q1, Bytes.toBytes(v2)),
+            new Delete(row).addColumns(fam1, q2, 
deleteTimestamp.getAndIncrement()),
+            new Increment(row).addColumn(fam1, q3, 1L),
+            new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")))));
+
+          // We need to ensure the timestamps of the Increment/Append 
operations are more than the
+          // previous ones
+          Result result = region.get(new Get(row).addColumn(fam1, 
q3).addColumn(fam1, q4));
+          long tsIncrement = result.getColumnLatestCell(fam1, 
q3).getTimestamp();
+          long tsAppend = result.getColumnLatestCell(fam1, q4).getTimestamp();
+
+          // Put the initial values
+          region.batchMutate(new Mutation[] {
+            new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1))
+              .addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), 
Bytes.toBytes(v2))
+              .addColumn(fam1, q3, tsIncrement + 1, Bytes.toBytes(1L))
+              .addColumn(fam1, q4, tsAppend + 1, Bytes.toBytes("a"))
+          });
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+        assertionError.set(new AssertionError(e.getMessage()));
+      }
+    });
+    writerThread.start();
+
+    // Reader threads
+    for (int i = 0; i < numReaderThreads; i++) {
+      new Thread(() -> {
+        try {
+          for (int j = 0; j < 10000; j++) {
+            // Verify the values
+            Result result = region.get(new Get(row));
+
+            // The values should be equals to either the initial values or the 
values after
+            // executing the mutations
+            String q1Value = Bytes.toString(result.getValue(fam1, q1));
+            if (v1.equals(q1Value)) {
+              assertEquals(v2, Bytes.toString(result.getValue(fam1, q2)));
+              assertEquals(1L, Bytes.toLong(result.getValue(fam1, q3)));
+              assertEquals("a", Bytes.toString(result.getValue(fam1, q4)));
+            } else if (v2.equals(q1Value)) {
+              assertNull(Bytes.toString(result.getValue(fam1, q2)));
+              assertEquals(2L, Bytes.toLong(result.getValue(fam1, q3)));
+              assertEquals("ab", Bytes.toString(result.getValue(fam1, q4)));
+            } else {
+              fail("the qualifier " + q1 + " should be " + v1 + " or " + v2 + 
", but " + q1Value);
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          assertionError.set(new AssertionError(e.getMessage()));
+        } catch (AssertionError e) {
+          assertionError.set(e);
+        }
+
+        latch.countDown();
+      }).start();
+    }
+
+    writerThread.join();
+
+    if (assertionError.get() != null) {
+      throw assertionError.get();
+    }
+  }
+
+  @Test
   public void testMutateRow_WriteRequestCount() throws Exception {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index a3a84fe..cc6c2ae 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -415,8 +415,8 @@ public class TestWALEntrySinkFilter {
             }
 
             @Override
-            public void mutateRow(RowMutations rm) throws IOException {
-
+            public Result mutateRow(RowMutations rm) throws IOException {
+              return null;
             }
 
             @Override
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
index 9469f26..1f7dcc2 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
@@ -447,10 +447,11 @@ public class ThriftTable implements Table {
   }
 
   @Override
-  public void mutateRow(RowMutations rm) throws IOException {
+  public Result mutateRow(RowMutations rm) throws IOException {
     TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
     try {
       client.mutateRow(tableNameInBytes, tRowMutations);
+      return Result.EMPTY_RESULT;
     }  catch (TException e) {
       throw new IOException(e);
     }

Reply via email to