This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new efd5a96  HBASE-8458 Support for batch version of checkAndMutate()
efd5a96 is described below

commit efd5a962e5a6aa07dcba4b55f8b165ea3dbbd6ef
Author: Toshihiro Suzuki <brfrn...@gmail.com>
AuthorDate: Mon May 4 16:53:41 2020 +0900

    HBASE-8458 Support for batch version of checkAndMutate()
    
    Closes #1648
    
    Signed-off-by: Josh Elser <els...@apache.org>
---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |  16 +-
 .../org/apache/hadoop/hbase/client/AsyncTable.java |  47 ++
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |  11 +
 .../apache/hadoop/hbase/client/CheckAndMutate.java | 362 ++++++++++
 .../org/apache/hadoop/hbase/client/Mutation.java   |  20 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  62 +-
 .../java/org/apache/hadoop/hbase/client/Table.java |  41 ++
 .../hadoop/hbase/client/TableOverAsyncTable.java   |  10 +
 .../hbase/shaded/protobuf/RequestConverter.java    | 215 +++---
 .../hbase/shaded/protobuf/ResponseConverter.java   |  21 +-
 .../src/main/protobuf/client/Client.proto          |  11 +-
 .../hadoop/hbase/rest/client/RemoteHTable.java     |  11 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 262 ++++++--
 .../hadoop/hbase/client/DummyAsyncTable.java       |  10 +
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 732 ++++++++++++++++++++-
 .../hadoop/hbase/client/TestAsyncTableBatch.java   |  54 ++
 .../hadoop/hbase/client/TestCheckAndMutate.java    | 574 +++++++++++++++-
 .../hadoop/hbase/client/TestFromClientSide3.java   |  54 ++
 .../hbase/client/TestMalformedCellFromClient.java  |   5 +-
 .../hadoop/hbase/thrift2/client/ThriftTable.java   |  11 +
 20 files changed, 2270 insertions(+), 259 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 464eff5..7e05b05 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -256,7 +256,7 @@ class AsyncBatchRpcRetryingCaller<T> {
   }
 
   private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> 
actionsByRegion,
-      List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) 
throws IOException {
+      List<CellScannable> cells, Map<Integer, Integer> indexMap) throws 
IOException {
     ClientProtos.MultiRequest.Builder multiRequestBuilder = 
ClientProtos.MultiRequest.newBuilder();
     ClientProtos.RegionAction.Builder regionActionBuilder = 
ClientProtos.RegionAction.newBuilder();
     ClientProtos.Action.Builder actionBuilder = 
ClientProtos.Action.newBuilder();
@@ -264,14 +264,14 @@ class AsyncBatchRpcRetryingCaller<T> {
     for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
       long nonceGroup = conn.getNonceGenerator().getNonceGroup();
       // multiRequestBuilder will be populated with region actions.
-      // rowMutationsIndexMap will be non-empty after the call if there is 
RowMutations in the
+      // indexMap will be non-empty after the call if there is 
RowMutations/CheckAndMutate in the
       // action list.
       RequestConverter.buildNoDataRegionActions(entry.getKey(),
         entry.getValue().actions.stream()
           .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), 
a2.getOriginalIndex()))
           .collect(Collectors.toList()),
-        cells, multiRequestBuilder, regionActionBuilder, actionBuilder, 
mutationBuilder, nonceGroup,
-        rowMutationsIndexMap);
+        cells, multiRequestBuilder, regionActionBuilder, actionBuilder, 
mutationBuilder,
+        nonceGroup, indexMap);
     }
     return multiRequestBuilder.build();
   }
@@ -367,10 +367,10 @@ class AsyncBatchRpcRetryingCaller<T> {
     List<CellScannable> cells = new ArrayList<>();
     // Map from a created RegionAction to the original index for a 
RowMutations within
     // the original list of actions. This will be used to process the results 
when there
-    // is RowMutations in the action list.
-    Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+    // is RowMutations/CheckAndMutate in the action list.
+    Map<Integer, Integer> indexMap = new HashMap<>();
     try {
-      req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
+      req = buildReq(serverReq.actionsByRegion, cells, indexMap);
     } catch (IOException e) {
       onError(serverReq.actionsByRegion, tries, e, serverName);
       return;
@@ -387,7 +387,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       } else {
         try {
           onComplete(serverReq.actionsByRegion, tries, serverName, 
ResponseConverter.getResults(req,
-            rowMutationsIndexMap, resp, controller.cellScanner()));
+            indexMap, resp, controller.cellScanner()));
         } catch (Exception e) {
           onError(serverReq.actionsByRegion, tries, e, serverName);
           return;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index ce1c1dc..b2bb2f7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -231,12 +231,20 @@ public interface AsyncTable<C extends 
ScanResultConsumerBase> {
    *     });
    * </code>
    * </pre>
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
 
   /**
    * A helper class for sending checkAndMutate request.
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   interface CheckAndMutateBuilder {
 
     /**
@@ -309,12 +317,20 @@ public interface AsyncTable<C extends 
ScanResultConsumerBase> {
    *     });
    * </code>
    * </pre>
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
 
   /**
    * A helper class for sending checkAndMutate request with a filter.
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   interface CheckAndMutateWithFilterBuilder {
 
     /**
@@ -345,6 +361,37 @@ public interface AsyncTable<C extends 
ScanResultConsumerBase> {
   }
 
   /**
+   * checkAndMutate that atomically checks if a row matches the specified 
condition. If it does,
+   * it performs the specified action.
+   *
+   * @param checkAndMutate The CheckAndMutate object.
+   * @return A {@link CompletableFuture} that represents the result for the 
CheckAndMutate.
+   */
+  CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate);
+
+  /**
+   * Batch version of checkAndMutate. The specified CheckAndMutates are 
batched only in the sense
+   * that they are sent to a RS in one RPC, but each CheckAndMutate operation 
is still executed
+   * atomically (and thus, each may fail independently of others).
+   *
+   * @param checkAndMutates The list of CheckAndMutate.
+   * @return A list of {@link CompletableFuture}s that represent the result 
for each
+   *   CheckAndMutate.
+   */
+  List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> 
checkAndMutates);
+
+  /**
+   * A simple version of batch checkAndMutate. It will fail if there are any 
failures.
+   *
+   * @param checkAndMutates The list of CheckAndMutate to apply.
+   * @return A {@link CompletableFuture} that wraps the result boolean list.
+   */
+  default CompletableFuture<List<Boolean>> checkAndMutateAll(
+    List<CheckAndMutate> checkAndMutates) {
+    return allOf(checkAndMutate(checkAndMutates));
+  }
+
+  /**
    * Performs multiple mutations atomically on a single row. Currently {@link 
Put} and
    * {@link Delete} are supported.
    * @param mutation object that specifies the set of mutations to perform 
atomically
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index afd0fac..53a020e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -206,6 +206,17 @@ class AsyncTableImpl implements 
AsyncTable<ScanResultConsumer> {
   }
 
   @Override
+  public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate 
checkAndMutate) {
+    return wrap(rawTable.checkAndMutate(checkAndMutate));
+  }
+
+  @Override
+  public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> 
checkAndMutates) {
+    return rawTable.checkAndMutate(checkAndMutates).stream()
+      .map(this::wrap).collect(toList());
+  }
+
+  @Override
   public CompletableFuture<Void> mutateRow(RowMutations mutation) {
     return wrap(rawTable.mutateRow(mutation));
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
new file mode 100644
index 0000000..d596093
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * Used to perform CheckAndMutate operations. Currently {@link Put}, {@link 
Delete}
+ * and {@link RowMutations} are supported.
+ * <p>
+ * Use the builder class to instantiate a CheckAndMutate object.
+ * This builder class provides a fluent-style API; the code looks like:
+ * <pre>
+ * <code>
+ * // A CheckAndMutate operation that performs the specified action if the column 
(specified by the
+ * // family and the qualifier) of the row equals the specified value
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ *   .ifEquals(family, qualifier, value)
+ *   .build(put);
+ *
+ * // A CheckAndMutate operation that performs the specified action if the column 
(specified by the
+ * // family and the qualifier) of the row doesn't exist
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ *   .ifNotExists(family, qualifier)
+ *   .build(put);
+ *
+ * // A CheckAndMutate operation that performs the specified action if the row 
matches the filter
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ *   .ifMatches(filter)
+ *   .build(delete);
+ * </code>
+ * </pre>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class CheckAndMutate extends Mutation {
+
+  /**
+   * A builder class for building a CheckAndMutate object.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static final class Builder {
+    private final byte[] row;
+    private byte[] family;
+    private byte[] qualifier;
+    private CompareOperator op;
+    private byte[] value;
+    private Filter filter;
+    private TimeRange timeRange;
+
+    private Builder(byte[] row) {
+      this.row = Preconditions.checkNotNull(row, "row is null");
+    }
+
+    /**
+     * Check for lack of column
+     *
+     * @param family family to check
+     * @param qualifier qualifier to check
+     * @return this builder, for method chaining
+     */
+    public Builder ifNotExists(byte[] family, byte[] qualifier) {
+      return ifEquals(family, qualifier, null);
+    }
+
+    /**
+     * Check for equality
+     *
+     * @param family family to check
+     * @param qualifier qualifier to check
+     * @param value the expected value
+     * @return this builder, for method chaining
+     */
+    public Builder ifEquals(byte[] family, byte[] qualifier, byte[] value) {
+      return ifMatches(family, qualifier, CompareOperator.EQUAL, value);
+    }
+
+    /**
+     * @param family family to check
+     * @param qualifier qualifier to check
+     * @param compareOp comparison operator to use
+     * @param value the expected value
+     * @return this builder, for method chaining
+     */
+    public Builder ifMatches(byte[] family, byte[] qualifier, CompareOperator 
compareOp,
+      byte[] value) {
+      this.family = Preconditions.checkNotNull(family, "family is null");
+      this.qualifier = qualifier;
+      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
+      this.value = value;
+      return this;
+    }
+
+    /**
+     * @param filter filter to check
+     * @return this builder, for method chaining
+     */
+    public Builder ifMatches(Filter filter) {
+      this.filter = Preconditions.checkNotNull(filter, "filter is null");
+      return this;
+    }
+
+    /**
+     * @param timeRange time range to check
+     * @return this builder, for method chaining
+     */
+    public Builder timeRange(TimeRange timeRange) {
+      this.timeRange = timeRange;
+      return this;
+    }
+
+    private void preCheck(Row action) {
+      Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is 
null");
+      if (!Bytes.equals(row, action.getRow())) {
+        throw new IllegalArgumentException("The row of the action 
(Put/Delete/RowMutations) <" +
+          Bytes.toStringBinary(action.getRow()) + "> doesn't match the 
original one <" +
+          Bytes.toStringBinary(this.row) + ">");
+      }
+      Preconditions.checkState(op != null || filter != null, "condition is 
null. You need to"
+        + " specify the condition by calling ifNotExists/ifEquals/ifMatches 
before building a"
+        + " CheckAndMutate object");
+    }
+
+    /**
+     * @param put data to put if check succeeds
+     * @return a CheckAndMutate object
+     */
+    public CheckAndMutate build(Put put) {
+      preCheck(put);
+      if (filter != null) {
+        return new CheckAndMutate(row, filter, timeRange, put);
+      } else {
+        return new CheckAndMutate(row, family, qualifier, op, value, 
timeRange, put);
+      }
+    }
+
+    /**
+     * @param delete data to delete if check succeeds
+     * @return a CheckAndMutate object
+     */
+    public CheckAndMutate build(Delete delete) {
+      preCheck(delete);
+      if (filter != null) {
+        return new CheckAndMutate(row, filter, timeRange, delete);
+      } else {
+        return new CheckAndMutate(row, family, qualifier, op, value, 
timeRange, delete);
+      }
+    }
+
+    /**
+     * @param mutation mutations to perform if check succeeds
+     * @return a CheckAndMutate object
+     */
+    public CheckAndMutate build(RowMutations mutation) {
+      preCheck(mutation);
+      if (filter != null) {
+        return new CheckAndMutate(row, filter, timeRange, mutation);
+      } else {
+        return new CheckAndMutate(row, family, qualifier, op, value, 
timeRange, mutation);
+      }
+    }
+  }
+
+  /**
+   * returns a builder object to build a CheckAndMutate object
+   *
+   * @param row row
+   * @return a builder object
+   */
+  public static Builder newBuilder(byte[] row) {
+    return new Builder(row);
+  }
+
+  private final byte[] family;
+  private final byte[] qualifier;
+  private final CompareOperator op;
+  private final byte[] value;
+  private final Filter filter;
+  private final TimeRange timeRange;
+  private final Row action;
+
+  private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final 
CompareOperator op,
+    byte[] value, TimeRange timeRange, Row action) {
+    super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
+    this.family = family;
+    this.qualifier = qualifier;
+    this.op = op;
+    this.value = value;
+    this.filter = null;
+    this.timeRange = timeRange;
+    this.action = action;
+  }
+
+  private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row 
action) {
+    super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
+    this.family = null;
+    this.qualifier = null;
+    this.op = null;
+    this.value = null;
+    this.filter = filter;
+    this.timeRange = timeRange;
+    this.action = action;
+  }
+
+  /**
+   * @return the family to check
+   */
+  public byte[] getFamily() {
+    return family;
+  }
+
+  /**
+   * @return the qualifier to check
+   */
+  public byte[] getQualifier() {
+    return qualifier;
+  }
+
+  /**
+   * @return the comparison operator
+   */
+  public CompareOperator getCompareOp() {
+    return op;
+  }
+
+  /**
+   * @return the expected value
+   */
+  public byte[] getValue() {
+    return value;
+  }
+
+  /**
+   * @return the filter to check
+   */
+  public Filter getFilter() {
+    return filter;
+  }
+
+  /**
+   * @return the time range to check
+   */
+  public TimeRange getTimeRange() {
+    return timeRange;
+  }
+
+  /**
+   * @return the action done if check succeeds
+   */
+  public Row getAction() {
+    return action;
+  }
+
+  @Override
+  public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).getFamilyCellMap();
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public CellBuilder getCellBuilder(CellBuilderType cellBuilderType) {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).getCellBuilder();
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getTimestamp() {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).getTimestamp();
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Mutation setTimestamp(long timestamp) {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).setTimestamp(timestamp);
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Durability getDurability() {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).getDurability();
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Mutation setDurability(Durability d) {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).setDurability(d);
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[] getAttribute(String name) {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).getAttribute(name);
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OperationWithAttributes setAttribute(String name, byte[] value) {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).setAttribute(name, value);
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getPriority() {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).getPriority();
+    }
+    return ((RowMutations) action).getMaxPriority();
+  }
+
+  @Override
+  public OperationWithAttributes setPriority(int priority) {
+    if (action instanceof Mutation) {
+      return ((Mutation) action).setPriority(priority);
+    }
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 2bfa49d..d575d0b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -153,10 +153,10 @@ public abstract class Mutation extends 
OperationWithAttributes implements Row, C
    * @return a list of Cell objects, returns an empty list if one doesn't 
exist.
    */
   List<Cell> getCellList(byte[] family) {
-    List<Cell> list = this.familyMap.get(family);
+    List<Cell> list = getFamilyCellMap().get(family);
     if (list == null) {
       list = new ArrayList<>();
-      this.familyMap.put(family, list);
+      getFamilyCellMap().put(family, list);
     }
     return list;
   }
@@ -205,11 +205,11 @@ public abstract class Mutation extends 
OperationWithAttributes implements Row, C
   @Override
   public Map<String, Object> getFingerprint() {
     Map<String, Object> map = new HashMap<>();
-    List<String> families = new ArrayList<>(this.familyMap.entrySet().size());
+    List<String> families = new 
ArrayList<>(getFamilyCellMap().entrySet().size());
     // ideally, we would also include table information, but that information
     // is not stored in each Operation instance.
     map.put("families", families);
-    for (Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
+    for (Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) 
{
       families.add(Bytes.toStringBinary(entry.getKey()));
     }
     return map;
@@ -233,7 +233,7 @@ public abstract class Mutation extends 
OperationWithAttributes implements Row, C
     map.put("row", Bytes.toStringBinary(this.row));
     int colCount = 0;
     // iterate through all column families affected
-    for (Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
+    for (Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) 
{
       // map from this family to details for each cell affected within the 
family
       List<Map<String, Object>> qualifierDetails = new ArrayList<>();
       columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
@@ -310,7 +310,7 @@ public abstract class Mutation extends 
OperationWithAttributes implements Row, C
    * @return true if empty, false otherwise
    */
   public boolean isEmpty() {
-    return familyMap.isEmpty();
+    return getFamilyCellMap().isEmpty();
   }
 
   /**
@@ -441,7 +441,7 @@ public abstract class Mutation extends 
OperationWithAttributes implements Row, C
    */
   public int size() {
     int size = 0;
-    for (List<Cell> cells : this.familyMap.values()) {
+    for (List<Cell> cells : getFamilyCellMap().values()) {
       size += cells.size();
     }
     return size;
@@ -451,7 +451,7 @@ public abstract class Mutation extends 
OperationWithAttributes implements Row, C
    * @return the number of different families
    */
   public int numFamilies() {
-    return familyMap.size();
+    return getFamilyCellMap().size();
   }
 
   /**
@@ -465,8 +465,8 @@ public abstract class Mutation extends 
OperationWithAttributes implements Row, C
 
     // Adding map overhead
     heapsize +=
-      ClassSize.align(this.familyMap.size() * ClassSize.MAP_ENTRY);
-    for(Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
+      ClassSize.align(getFamilyCellMap().size() * ClassSize.MAP_ENTRY);
+    for(Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) {
       //Adding key overhead
       heapsize +=
         ClassSize.align(ClassSize.ARRAY + entry.getKey().length);
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 0c86161..fa5f7cf 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -358,7 +359,6 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
     return new CheckAndMutateBuilderImpl(row, family);
   }
 
-
   private final class CheckAndMutateWithFilterBuilderImpl
     implements CheckAndMutateWithFilterBuilder {
 
@@ -420,6 +420,54 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
     return new CheckAndMutateWithFilterBuilderImpl(row, filter);
   }
 
+  @Override
+  public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate 
checkAndMutate) {
+    if (checkAndMutate.getAction() instanceof Put) {
+      validatePut((Put) checkAndMutate.getAction(), 
conn.connConf.getMaxKeyValueSize());
+    }
+    if (checkAndMutate.getAction() instanceof Put ||
+      checkAndMutate.getAction() instanceof Delete) {
+      Mutation mutation = (Mutation) checkAndMutate.getAction();
+      if (mutation instanceof Put) {
+        validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
+      }
+      return RawAsyncTableImpl.this.<Boolean> 
newCaller(checkAndMutate.getRow(),
+        mutation.getPriority(), rpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
+          loc, stub, mutation,
+          (rn, m) -> RequestConverter.buildMutateRequest(rn, 
checkAndMutate.getRow(),
+            checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+            checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 
checkAndMutate.getFilter(),
+            checkAndMutate.getTimeRange(), m),
+          (c, r) -> r.getProcessed()))
+        .call();
+    } else if (checkAndMutate.getAction() instanceof RowMutations) {
+      RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
+      return RawAsyncTableImpl.this.<Boolean> 
newCaller(checkAndMutate.getRow(),
+        rowMutations.getMaxPriority(), rpcTimeoutNs)
+        .action((controller, loc, stub) -> 
RawAsyncTableImpl.this.mutateRow(controller,
+          loc, stub, rowMutations,
+          (rn, rm) -> RequestConverter.buildMutateRequest(rn, 
checkAndMutate.getRow(),
+            checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+            checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 
checkAndMutate.getFilter(),
+            checkAndMutate.getTimeRange(), rm),
+          resp -> resp.getExists()))
+        .call();
+    } else {
+      CompletableFuture<Boolean> future = new CompletableFuture<>();
+      future.completeExceptionally(new DoNotRetryIOException(
+        "CheckAndMutate doesn't support " + 
checkAndMutate.getAction().getClass().getName()));
+      return future;
+    }
+  }
+
+  @Override
+  public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> 
checkAndMutates) {
+    return batch(checkAndMutates, rpcTimeoutNs).stream()
+      .map(f -> f.thenApply(r -> ((Result)r).getExists()))
+      .collect(toList());
+  }
+
   // We need the MultiRequest when constructing the 
org.apache.hadoop.hbase.client.MultiResponse,
   // so here I write a new method as I do not want to change the abstraction 
of call method.
   private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController 
controller,
@@ -556,8 +604,16 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
   }
 
   private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, 
long rpcTimeoutNs) {
-    actions.stream().filter(action -> action instanceof Put).map(action -> 
(Put) action)
-      .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
+    for (Row action : actions) {
+      if (action instanceof Put) {
+        validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
+      } else if (action instanceof CheckAndMutate) {
+        CheckAndMutate checkAndMutate = (CheckAndMutate) action;
+        if (checkAndMutate.getAction() instanceof Put) {
+          validatePut((Put) checkAndMutate.getAction(), 
conn.connConf.getMaxKeyValueSize());
+        }
+      }
+    }
     return conn.callerFactory.batch().table(tableName).actions(actions)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 870d83d..bcd045f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -307,14 +307,22 @@ public interface Table extends Closeable {
    * table.checkAndMutate(row, 
family).qualifier(qualifier).ifNotExists().thenPut(put);
    * </code>
    * </pre>
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   default CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
     throw new NotImplementedException("Add an implementation!");
   }
 
   /**
    * A helper class for sending checkAndMutate request.
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   interface CheckAndMutateBuilder {
 
     /**
@@ -377,14 +385,22 @@ public interface Table extends Closeable {
    * table.checkAndMutate(row, filter).thenPut(put);
    * </code>
    * </pre>
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter 
filter) {
     throw new NotImplementedException("Add an implementation!");
   }
 
   /**
    * A helper class for sending checkAndMutate request with a filter.
+   *
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use 
only, do not use it
+   *   any more.
    */
+  @Deprecated
   interface CheckAndMutateWithFilterBuilder {
 
     /**
@@ -412,6 +428,31 @@ public interface Table extends Closeable {
   }
 
   /**
+   * checkAndMutate that atomically checks if a row matches the specified 
condition. If it does,
+   * it performs the specified action.
+   *
+   * @param checkAndMutate The CheckAndMutate object.
+   * @return boolean that represents the result for the CheckAndMutate.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  default boolean checkAndMutate(CheckAndMutate checkAndMutate) throws 
IOException {
+    return checkAndMutate(Collections.singletonList(checkAndMutate))[0];
+  }
+
+  /**
+   * Batch version of checkAndMutate. The specified CheckAndMutates are 
batched only in the sense
+   * that they are sent to a RS in one RPC, but each CheckAndMutate operation 
is still executed
+   * atomically (and thus, each may fail independently of others).
+   *
+   * @param checkAndMutates The list of CheckAndMutate.
+   * @return An array of booleans that represents the result for each 
CheckAndMutate.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  default boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) 
throws IOException {
+    throw new NotImplementedException("Add an implementation!");
+  }
+
+  /**
    * Performs multiple mutations atomically on a single row. Currently
    * {@link Put} and {@link Delete} are supported.
    *
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index 841f8ba..d33cbe1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -300,6 +300,16 @@ class TableOverAsyncTable implements Table {
   }
 
   @Override
+  public boolean checkAndMutate(CheckAndMutate checkAndMutate) throws 
IOException {
+    return FutureUtils.get(table.checkAndMutate(checkAndMutate));
+  }
+
+  @Override
+  public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) throws 
IOException {
+    return 
Booleans.toArray(FutureUtils.get(table.checkAndMutateAll(checkAndMutates)));
+  }
+
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     FutureUtils.get(table.mutateRow(rm));
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 74a0493..a524ed3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -194,37 +195,20 @@ public final class RequestConverter {
   }
 
   /**
-   * Create a protocol buffer MutateRequest for a conditioned put
+   * Create a protocol buffer MutateRequest for a conditioned put/delete
    *
    * @return a mutate request
    * @throws IOException
    */
-  public static MutateRequest buildMutateRequest(
-    final byte[] regionName, final byte[] row, final byte[] family,
-    final byte [] qualifier, final CompareOperator op, final byte[] value, 
final Filter filter,
-    final TimeRange timeRange, final Put put) throws IOException {
-    return buildMutateRequest(regionName, row, family, qualifier, op, value, 
filter, timeRange,
-      put, MutationType.PUT);
-  }
-
-  /**
-   * Create a protocol buffer MutateRequest for a conditioned delete
-   *
-   * @return a mutate request
-   * @throws IOException
-   */
-  public static MutateRequest buildMutateRequest(
-    final byte[] regionName, final byte[] row, final byte[] family,
-    final byte [] qualifier, final CompareOperator op, final byte[] value, 
final Filter filter,
-    final TimeRange timeRange, final Delete delete) throws IOException {
-    return buildMutateRequest(regionName, row, family, qualifier, op, value, 
filter, timeRange,
-      delete, MutationType.DELETE);
-  }
-
   public static MutateRequest buildMutateRequest(final byte[] regionName, 
final byte[] row,
     final byte[] family, final byte[] qualifier, final CompareOperator op, 
final byte[] value,
-    final Filter filter, final TimeRange timeRange, final Mutation mutation,
-    final MutationType type) throws IOException {
+    final Filter filter, final TimeRange timeRange, final Mutation mutation) 
throws IOException {
+    MutationType type;
+    if (mutation instanceof Put) {
+      type = MutationType.PUT;
+    } else {
+      type = MutationType.DELETE;
+    }
     return MutateRequest.newBuilder()
       .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName))
       .setMutation(ProtobufUtil.toMutation(type, mutation))
@@ -263,9 +247,8 @@ public final class RequestConverter {
       actionBuilder.setMutation(mp);
       builder.addAction(actionBuilder.build());
     }
-    return 
ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
-        .setCondition(buildCondition(row, family, qualifier, op, value, 
filter, timeRange))
-        .build();
+    return 
ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
+      buildCondition(row, family, qualifier, op, value, filter, 
timeRange)).build()).build();
   }
 
   /**
@@ -383,42 +366,6 @@ public final class RequestConverter {
     return builder;
   }
 
-  /**
-   * Create a protocol buffer MultiRequest for row mutations that does not 
hold data.  Data/Cells
-   * are carried outside of protobuf.  Return references to the Cells in 
<code>cells</code> param.
-    * Does not propagate Action absolute position.  Does not set atomic action 
on the created
-   * RegionAtomic.  Caller should do that if wanted.
-   * @param regionName
-   * @param rowMutations
-   * @param cells Return in here a list of Cells as CellIterable.
-   * @return a region mutation minus data
-   * @throws IOException
-   */
-  public static RegionAction.Builder buildNoDataRegionAction(final byte[] 
regionName,
-      final RowMutations rowMutations, final List<CellScannable> cells,
-      final RegionAction.Builder regionActionBuilder,
-      final ClientProtos.Action.Builder actionBuilder,
-      final MutationProto.Builder mutationBuilder)
-  throws IOException {
-    for (Mutation mutation: rowMutations.getMutations()) {
-      MutationType type = null;
-      if (mutation instanceof Put) {
-        type = MutationType.PUT;
-      } else if (mutation instanceof Delete) {
-        type = MutationType.DELETE;
-      } else {
-        throw new DoNotRetryIOException("RowMutations supports only put and 
delete, not " +
-          mutation.getClass().getName());
-      }
-      mutationBuilder.clear();
-      MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, 
mutationBuilder);
-      cells.add(mutation);
-      actionBuilder.clear();
-      regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
-    }
-    return regionActionBuilder;
-  }
-
   public static RegionAction.Builder getRegionActionBuilderWithRegion(
       final RegionAction.Builder regionActionBuilder, final byte [] 
regionName) {
     RegionSpecifier region = 
buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
@@ -573,8 +520,8 @@ public final class RequestConverter {
    * @param actionBuilder actionBuilder to be used to build action.
    * @param mutationBuilder mutationBuilder to be used to build mutation.
    * @param nonceGroup nonceGroup to be applied.
-   * @param rowMutationsIndexMap Map of created RegionAction to the original 
index for a
-   *          RowMutations within the original list of actions
+   * @param indexMap Map of created RegionAction to the original index for a
+   *   RowMutations/CheckAndMutate within the original list of actions
    * @throws IOException
    */
   public static void buildNoDataRegionActions(final byte[] regionName,
@@ -583,14 +530,14 @@ public final class RequestConverter {
       final RegionAction.Builder regionActionBuilder,
       final ClientProtos.Action.Builder actionBuilder,
       final MutationProto.Builder mutationBuilder,
-      long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) 
throws IOException {
+      long nonceGroup, final Map<Integer, Integer> indexMap) throws 
IOException {
     regionActionBuilder.clear();
     RegionAction.Builder builder = getRegionActionBuilderWithRegion(
       regionActionBuilder, regionName);
     ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
-    RegionAction.Builder rowMutationsRegionActionBuilder = null;
     boolean hasNonce = false;
     List<Action> rowMutationsList = new ArrayList<>();
+    List<Action> checkAndMutates = new ArrayList<>();
 
     for (Action action: actions) {
       Row row = action.getAction();
@@ -601,26 +548,9 @@ public final class RequestConverter {
         Get g = (Get)row;
         builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
       } else if (row instanceof Put) {
-        Put p = (Put)row;
-        cells.add(p);
-        builder.addAction(actionBuilder.
-          setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, 
mutationBuilder)));
+        buildNoDataRegionAction((Put) row, cells, builder, actionBuilder, 
mutationBuilder);
       } else if (row instanceof Delete) {
-        Delete d = (Delete)row;
-        int size = d.size();
-        // Note that a legitimate Delete may have a size of zero; i.e. a 
Delete that has nothing
-        // in it but the row to delete.  In this case, the current 
implementation does not make
-        // a KeyValue to represent a delete-of-all-the-row until we 
serialize... For such cases
-        // where the size returned is zero, we will send the Delete fully pb'd 
rather than have
-        // metadata only in the pb and then send the kv along the side in 
cells.
-        if (size > 0) {
-          cells.add(d);
-          builder.addAction(actionBuilder.
-            setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, 
mutationBuilder)));
-        } else {
-          builder.addAction(actionBuilder.
-            setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, 
mutationBuilder)));
-        }
+        buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, 
mutationBuilder);
       } else if (row instanceof Append) {
         Append a = (Append)row;
         cells.add(a);
@@ -651,6 +581,8 @@ public final class RequestConverter {
               .setRequest(value)));
       } else if (row instanceof RowMutations) {
         rowMutationsList.add(action);
+      } else if (row instanceof CheckAndMutate) {
+        checkAndMutates.add(action);
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + 
row.getClass().getName());
       }
@@ -666,23 +598,104 @@ public final class RequestConverter {
     // on the one row. We do separate RegionAction for each RowMutations.
     // We maintain a map to keep track of this RegionAction and the original 
Action index.
     for (Action action : rowMutationsList) {
-      RowMutations rms = (RowMutations) action.getAction();
-      if (rowMutationsRegionActionBuilder == null) {
-        rowMutationsRegionActionBuilder = 
ClientProtos.RegionAction.newBuilder();
+      builder.clear();
+      getRegionActionBuilderWithRegion(builder, regionName);
+      actionBuilder.clear();
+      mutationBuilder.clear();
+
+      buildNoDataRegionAction((RowMutations) action.getAction(), cells, 
builder, actionBuilder,
+        mutationBuilder);
+      builder.setAtomic(true);
+
+      multiRequestBuilder.addRegionAction(builder.build());
+
+      // This rowMutations region action is at 
(multiRequestBuilder.getRegionActionCount() - 1)
+      // in the overall multiRequest.
+      indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, 
action.getOriginalIndex());
+    }
+
+    // Process CheckAndMutate here. Similar to RowMutations, we do separate 
RegionAction for each
+    // CheckAndMutate and maintain a map to keep track of this RegionAction 
and the original
+    // Action index.
+    for (Action action : checkAndMutates) {
+      builder.clear();
+      getRegionActionBuilderWithRegion(builder, regionName);
+      actionBuilder.clear();
+      mutationBuilder.clear();
+
+      CheckAndMutate cam = (CheckAndMutate) action.getAction();
+      builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), 
cam.getQualifier(),
+        cam.getCompareOp(), cam.getValue(), cam.getFilter(), 
cam.getTimeRange()));
+
+      if (cam.getAction() instanceof Put) {
+        buildNoDataRegionAction((Put) cam.getAction(), cells, builder, 
actionBuilder,
+          mutationBuilder);
+      } else if (cam.getAction() instanceof Delete) {
+        buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, 
actionBuilder,
+          mutationBuilder);
+      } else if (cam.getAction() instanceof RowMutations) {
+        buildNoDataRegionAction((RowMutations) cam.getAction(), cells, 
builder, actionBuilder,
+          mutationBuilder);
+        builder.setAtomic(true);
       } else {
-        rowMutationsRegionActionBuilder.clear();
+        throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
+          cam.getAction().getClass().getName());
       }
-      rowMutationsRegionActionBuilder.setRegion(
-        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName));
-      rowMutationsRegionActionBuilder = 
RequestConverter.buildNoDataRegionAction(regionName, rms,
-        cells, rowMutationsRegionActionBuilder, actionBuilder, 
mutationBuilder);
-      rowMutationsRegionActionBuilder.setAtomic(true);
-      // Put it in the multiRequestBuilder
-      
multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
-      // This rowMutations region action is at 
(multiRequestBuilder.getRegionActionCount() - 1)
+
+      multiRequestBuilder.addRegionAction(builder.build());
+
+      // This CheckAndMutate region action is at 
(multiRequestBuilder.getRegionActionCount() - 1)
       // in the overall multiRequest.
-      rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
-        action.getOriginalIndex());
+      indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, 
action.getOriginalIndex());
+    }
+  }
+
+  private static void buildNoDataRegionAction(final Put put, final 
List<CellScannable> cells,
+    final RegionAction.Builder regionActionBuilder,
+    final ClientProtos.Action.Builder actionBuilder,
+    final MutationProto.Builder mutationBuilder) throws IOException {
+    cells.add(put);
+    regionActionBuilder.addAction(actionBuilder.
+      setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, put, 
mutationBuilder)));
+  }
+
+  private static void buildNoDataRegionAction(final Delete delete,
+    final List<CellScannable> cells, final RegionAction.Builder 
regionActionBuilder,
+    final ClientProtos.Action.Builder actionBuilder, final 
MutationProto.Builder mutationBuilder)
+    throws IOException {
+    int size = delete.size();
+    // Note that a legitimate Delete may have a size of zero; i.e. a Delete 
that has nothing
+    // in it but the row to delete.  In this case, the current implementation 
does not make
+    // a KeyValue to represent a delete-of-all-the-row until we serialize... 
For such cases
+    // where the size returned is zero, we will send the Delete fully pb'd 
rather than have
+    // metadata only in the pb and then send the kv along the side in cells.
+    if (size > 0) {
+      cells.add(delete);
+      regionActionBuilder.addAction(actionBuilder.
+        setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, delete, 
mutationBuilder)));
+    } else {
+      regionActionBuilder.addAction(actionBuilder.
+        setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, 
mutationBuilder)));
+    }
+  }
+
+  private static void buildNoDataRegionAction(final RowMutations rowMutations,
+    final List<CellScannable> cells, final RegionAction.Builder 
regionActionBuilder,
+    final ClientProtos.Action.Builder actionBuilder, final 
MutationProto.Builder mutationBuilder)
+    throws IOException {
+    for (Mutation mutation: rowMutations.getMutations()) {
+      MutationType type;
+      if (mutation instanceof Put) {
+        type = MutationType.PUT;
+      } else if (mutation instanceof Delete) {
+        type = MutationType.DELETE;
+      } else {
+        throw new DoNotRetryIOException("RowMutations supports only put and 
delete, not " +
+          mutation.getClass().getName());
+      }
+      MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, 
mutationBuilder);
+      cells.add(mutation);
+      regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
     }
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index d7378a6..19e6735 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -105,14 +105,14 @@ public final class ResponseConverter {
    * Get the results from a protocol buffer MultiResponse
    *
    * @param request the original protocol buffer MultiRequest
-   * @param rowMutationsIndexMap Used to support RowMutations in batch
+   * @param indexMap Used to support RowMutations/CheckAndMutate in batch
    * @param response the protocol buffer MultiResponse to convert
    * @param cells Cells to go with the passed in <code>proto</code>.  Can be 
null.
    * @return the results that were in the MultiResponse (a Result or an 
Exception).
    * @throws IOException
    */
   public static org.apache.hadoop.hbase.client.MultiResponse getResults(final 
MultiRequest request,
-      final Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse 
response,
+      final Map<Integer, Integer> indexMap, final MultiResponse response,
       final CellScanner cells) throws IOException {
     int requestRegionActionCount = request.getRegionActionCount();
     int responseRegionActionResultCount = 
response.getRegionActionResultCount();
@@ -149,18 +149,17 @@ public final class ResponseConverter {
 
       Object responseValue;
 
-      // For RowMutations action, if there is an exception, the exception is 
set
+      // For RowMutations/CheckAndMutate action, if there is an exception, the 
exception is set
       // at the RegionActionResult level and the ResultOrException is null at 
the original index
-      Integer rowMutationsIndex =
-          (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i));
-      if (rowMutationsIndex != null) {
-        // This RegionAction is from a RowMutations in a batch.
+      Integer index = (indexMap == null ? null : indexMap.get(i));
+      if (index != null) {
+        // This RegionAction is from a RowMutations/CheckAndMutate in a batch.
         // If there is an exception from the server, the exception is set at
         // the RegionActionResult level, which has been handled above.
-        responseValue = response.getProcessed() ?
+        responseValue = actionResult.getProcessed() ?
             ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
             ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
-        results.add(regionName, rowMutationsIndex, responseValue);
+        results.add(regionName, index, responseValue);
         continue;
       }
 
@@ -171,11 +170,11 @@ public final class ResponseConverter {
           responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
         } else if (roe.hasServiceResult()) {
           responseValue = roe.getServiceResult();
-        } else{
+        } else {
           // Sometimes, the response is just "it was processed". Generally, 
this occurs for things
           // like mutateRows where either we get back 'processed' (or not) and 
optionally some
           // statistics about the regions we touched.
-          responseValue = response.getProcessed() ?
+          responseValue = actionResult.getProcessed() ?
                           ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
                           ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
         }
diff --git a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto 
b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
index fbb0769..7678211 100644
--- a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
@@ -455,6 +455,7 @@ message RegionAction {
   // When set, run mutations as atomic unit.
   optional bool atomic = 2;
   repeated Action action = 3;
+  optional Condition condition = 4;
 }
 
 /*
@@ -499,6 +500,7 @@ message RegionActionResult {
   repeated ResultOrException resultOrException = 1;
   // If the operation failed globally for this region, this exception is set
   optional NameBytesPair exception = 2;
+  optional bool processed = 3;
 }
 
 /**
@@ -511,13 +513,16 @@ message RegionActionResult {
 message MultiRequest {
   repeated RegionAction regionAction = 1;
   optional uint64 nonceGroup = 2;
-  optional Condition condition = 3;
+  // Moved this to RegionAction in HBASE-8458. Keep it for backward 
compatibility. Need to remove
+  // it in the future.
+  optional Condition condition = 3 [deprecated=true];
 }
 
 message MultiResponse {
   repeated RegionActionResult regionActionResult = 1;
-  // used for mutate to indicate processed only
-  optional bool processed = 2;
+  // Moved this to RegionActionResult in HBASE-8458. Keep it for backward 
compatibility. Need to
+  // remove it in the future.
+  optional bool processed = 2 [deprecated=true];
   optional MultiRegionLoadStats regionStatistics = 3;
 }
 
diff --git 
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
 
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index aaf1954..d68ed58 100644
--- 
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ 
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -745,6 +746,16 @@ public class RemoteHTable implements Table {
   }
 
   @Override
+  public boolean checkAndMutate(CheckAndMutate checkAndMutate) {
+    throw new NotImplementedException("Implement later");
+  }
+
+  @Override
+  public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+    throw new NotImplementedException("Implement later");
+  }
+
+  @Override
   public Result increment(Increment increment) throws IOException {
     throw new IOException("Increment not supported");
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1890a4d..44f4d02 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2814,45 +2814,124 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
 
     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : 
HConstants.NO_NONCE;
 
-    // this will contain all the cells that we need to return. It's created 
later, if needed.
-    List<CellScannable> cellsToReturn = null;
     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
     RegionActionResult.Builder regionActionResultBuilder = 
RegionActionResult.newBuilder();
-    Boolean processed = null;
-    RegionScannersCloseCallBack closeCallBack = null;
-    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
     this.rpcMultiRequestCount.increment();
     this.requestCount.increment();
-    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new 
HashMap<>(request
-      .getRegionActionCount());
     ActivePolicyEnforcement spaceQuotaEnforcement = 
getSpaceQuotaManager().getActiveEnforcements();
-    for (RegionAction regionAction : request.getRegionActionList()) {
+
+    // We no longer use MultiRequest#condition. Instead, we use 
RegionAction#condition. The
+    // following logic is for backward compatibility as old clients still use
+    // MultiRequest#condition in case of checkAndMutate with RowMutations.
+    if (request.hasCondition()) {
+      if (request.getRegionActionList().isEmpty()) {
+        // If the region action list is empty, do nothing.
+        responseBuilder.setProcessed(true);
+        return responseBuilder.build();
+      }
+
+      RegionAction regionAction = request.getRegionAction(0);
+
+      // When request.hasCondition() is true, regionAction.getAtomic() should 
always be true. So
+      // we can assume regionAction.getAtomic() is true here.
+      assert regionAction.getAtomic();
+
       OperationQuota quota;
       HRegion region;
-      regionActionResultBuilder.clear();
       RegionSpecifier regionSpecifier = regionAction.getRegion();
+
       try {
         region = getRegion(regionSpecifier);
         quota = getRpcQuotaManager().checkQuota(region, 
regionAction.getActionList());
       } catch (IOException e) {
         failRegionAction(responseBuilder, regionActionResultBuilder, 
regionAction, cellScanner, e);
-        continue;  // For this region it's a failure.
+        return responseBuilder.build();
       }
-      boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
-      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+
+      try {
+        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
         // We only allow replication in standby state and it will not set the 
atomic flag.
         if (rejectIfFromClient) {
           failRegionAction(responseBuilder, regionActionResultBuilder, 
regionAction, cellScanner,
-            new DoNotRetryIOException(
-              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY 
state"));
-          quota.close();
-          continue;
+            new 
DoNotRetryIOException(region.getRegionInfo().getRegionNameAsString()
+              + " is in STANDBY state"));
+          return responseBuilder.build();
         }
-        // How does this call happen?  It may need some work to play well w/ 
the surroundings.
-        // Need to return an item per Action along w/ Action index.  TODO.
+
         try {
-          if (request.hasCondition()) {
-            Condition condition = request.getCondition();
+          Condition condition = request.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.hasFamily() ? 
condition.getFamily().toByteArray() : null;
+          byte[] qualifier =
+            condition.hasQualifier() ? condition.getQualifier().toByteArray() 
: null;
+          CompareOperator op = condition.hasCompareType() ?
+            CompareOperator.valueOf(condition.getCompareType().name()) :
+            null;
+          ByteArrayComparable comparator = condition.hasComparator() ?
+            ProtobufUtil.toComparator(condition.getComparator()) : null;
+          Filter filter =
+            condition.hasFilter() ? 
ProtobufUtil.toFilter(condition.getFilter()) : null;
+          TimeRange timeRange = condition.hasTimeRange() ?
+            ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+            TimeRange.allTime();
+          boolean processed =
+            checkAndRowMutate(region, regionAction.getActionList(), 
cellScanner, row, family,
+              qualifier, op, comparator, filter, timeRange, 
regionActionResultBuilder,
+              spaceQuotaEnforcement);
+          responseBuilder.setProcessed(processed);
+        } catch (IOException e) {
+          rpcServer.getMetrics().exception(e);
+          // As it's an atomic operation with a condition, we may expect it's 
a global failure.
+          
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
+        }
+      } finally {
+        quota.close();
+      }
+
+      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+      ClientProtos.RegionLoadStats regionLoadStats = 
region.getLoadStatistics();
+      if (regionLoadStats != null) {
+        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
+          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
+      }
+      return responseBuilder.build();
+    }
+
+    // this will contain all the cells that we need to return. It's created 
later, if needed.
+    List<CellScannable> cellsToReturn = null;
+    RegionScannersCloseCallBack closeCallBack = null;
+    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
+    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new 
HashMap<>(request
+      .getRegionActionCount());
+
+    for (RegionAction regionAction : request.getRegionActionList()) {
+      OperationQuota quota;
+      HRegion region;
+      RegionSpecifier regionSpecifier = regionAction.getRegion();
+      regionActionResultBuilder.clear();
+
+      try {
+        region = getRegion(regionSpecifier);
+        quota = getRpcQuotaManager().checkQuota(region, 
regionAction.getActionList());
+      } catch (IOException e) {
+        failRegionAction(responseBuilder, regionActionResultBuilder, 
regionAction, cellScanner, e);
+        continue;  // For this region it's a failure.
+      }
+
+      try {
+        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
+
+        if (regionAction.hasCondition()) {
+          // We only allow replication in standby state and it will not set 
the atomic flag.
+          if (rejectIfFromClient) {
+            failRegionAction(responseBuilder, regionActionResultBuilder, 
regionAction,
+              cellScanner, new DoNotRetryIOException(region.getRegionInfo()
+                .getRegionNameAsString() + " is in STANDBY state"));
+            continue;
+          }
+
+          try {
+            Condition condition = regionAction.getCondition();
             byte[] row = condition.getRow().toByteArray();
             byte[] family = condition.hasFamily() ? 
condition.getFamily().toByteArray() : null;
             byte[] qualifier = condition.hasQualifier() ?
@@ -2864,46 +2943,119 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
             Filter filter = condition.hasFilter() ?
               ProtobufUtil.toFilter(condition.getFilter()) : null;
             TimeRange timeRange = condition.hasTimeRange() ?
-              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
-              TimeRange.allTime();
-            processed =
-              checkAndRowMutate(region, regionAction.getActionList(), 
cellScanner, row, family,
-                qualifier, op, comparator, filter, timeRange, 
regionActionResultBuilder,
-                spaceQuotaEnforcement);
-          } else {
+              ProtobufUtil.toTimeRange(condition.getTimeRange()) : 
TimeRange.allTime();
+
+            boolean processed;
+            if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+              // RowMutations
+              processed =
+                checkAndRowMutate(region, regionAction.getActionList(), 
cellScanner, row, family,
+                  qualifier, op, comparator, filter, timeRange, 
regionActionResultBuilder,
+                  spaceQuotaEnforcement);
+            } else {
+              if (regionAction.getActionList().isEmpty()) {
+                // If the region action list is empty, do nothing.
+                regionActionResultBuilder.setProcessed(true);
+                continue;
+              }
+              Action action = regionAction.getAction(0);
+              if (action.hasGet()) {
+                throw new DoNotRetryIOException("CheckAndMutate doesn't 
support GET="
+                  + action.getGet());
+              }
+              MutationProto mutation = action.getMutation();
+              switch (mutation.getMutateType()) {
+                case PUT:
+                  Put put = ProtobufUtil.toPut(mutation, cellScanner);
+                  checkCellSizeLimit(region, put);
+                  // Throws an exception when violated
+                  
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+                  quota.addMutation(put);
+
+                  if (filter != null) {
+                    processed = region.checkAndMutate(row, filter, timeRange, 
put);
+                  } else {
+                    processed = region.checkAndMutate(row, family, qualifier, 
op, comparator,
+                      timeRange, put);
+                  }
+                  break;
+
+                case DELETE:
+                  Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+                  checkCellSizeLimit(region, delete);
+                  
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
+                  quota.addMutation(delete);
+
+                  if (filter != null) {
+                    processed = region.checkAndMutate(row, filter, timeRange, 
delete);
+                  } else {
+                    processed = region.checkAndMutate(row, family, qualifier, 
op, comparator,
+                      timeRange, delete);
+                  }
+                  break;
+
+                default:
+                  throw new DoNotRetryIOException("CheckAndMutate doesn't 
support "
+                    + mutation.getMutateType());
+              }
+
+              // To unify the response format with doNonAtomicRegionMutation 
and read through
+              // client's AsyncProcess we have to add an empty result instance 
per operation
+              regionActionResultBuilder.addResultOrException(
+                
ClientProtos.ResultOrException.newBuilder().setIndex(0).build());
+            }
+            regionActionResultBuilder.setProcessed(processed);
+          } catch (IOException e) {
+            rpcServer.getMetrics().exception(e);
+            // As it's an atomic operation with a condition, we may expect 
it's a global failure.
+            
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
+          }
+        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+          // We only allow replication in standby state and it will not set 
the atomic flag.
+          if (rejectIfFromClient) {
+            failRegionAction(responseBuilder, regionActionResultBuilder, 
regionAction, cellScanner,
+              new 
DoNotRetryIOException(region.getRegionInfo().getRegionNameAsString()
+                + " is in STANDBY state"));
+            continue;
+          }
+          try {
             doAtomicBatchOp(regionActionResultBuilder, region, quota, 
regionAction.getActionList(),
               cellScanner, spaceQuotaEnforcement);
-            processed = Boolean.TRUE;
+            regionActionResultBuilder.setProcessed(true);
+            // We no longer use MultiResponse#processed. Instead, we use
+            // RegionActionResult#processed. This is for backward 
compatibility for old clients.
+            responseBuilder.setProcessed(true);
+          } catch (IOException e) {
+            rpcServer.getMetrics().exception(e);
+            // As it's atomic, we may expect it's a global failure.
+            
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
           }
-        } catch (IOException e) {
-          rpcServer.getMetrics().exception(e);
-          // As it's atomic, we may expect it's a global failure.
-          
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
-        }
-      } else {
-        if (rejectIfFromClient && regionAction.getActionCount() > 0 &&
-          !isReplicationRequest(regionAction.getAction(0))) {
-          // fail if it is not a replication request
-          failRegionAction(responseBuilder, regionActionResultBuilder, 
regionAction, cellScanner,
-            new DoNotRetryIOException(
-              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY 
state"));
-          quota.close();
-          continue;
-        }
-        // doNonAtomicRegionMutation manages the exception internally
-        if (context != null && closeCallBack == null) {
-          // An RpcCallBack that creates a list of scanners that needs to 
perform callBack
-          // operation on completion of multiGets.
-          // Set this only once
-          closeCallBack = new RegionScannersCloseCallBack();
-          context.setCallBack(closeCallBack);
-        }
-        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, 
cellScanner,
+        } else {
+          if (rejectIfFromClient && regionAction.getActionCount() > 0 && 
!isReplicationRequest(
+            regionAction.getAction(0))) {
+            // fail if it is not a replication request
+            failRegionAction(responseBuilder, regionActionResultBuilder, 
regionAction, cellScanner,
+              new 
DoNotRetryIOException(region.getRegionInfo().getRegionNameAsString()
+                + " is in STANDBY state"));
+            continue;
+          }
+          // doNonAtomicRegionMutation manages the exception internally
+          if (context != null && closeCallBack == null) {
+            // An RpcCallBack that creates a list of scanners that needs to 
perform callBack
+            // operation on completion of multiGets.
+            // Set this only once
+            closeCallBack = new RegionScannersCloseCallBack();
+            context.setCallBack(closeCallBack);
+          }
+          cellsToReturn = doNonAtomicRegionMutation(region, quota, 
regionAction, cellScanner,
             regionActionResultBuilder, cellsToReturn, nonceGroup, 
closeCallBack, context,
             spaceQuotaEnforcement);
+        }
+      } finally {
+        quota.close();
       }
+
       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
-      quota.close();
       ClientProtos.RegionLoadStats regionLoadStats = 
region.getLoadStatistics();
       if (regionLoadStats != null) {
         regionStats.put(regionSpecifier, regionLoadStats);
@@ -2914,10 +3066,6 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
     }
 
-    if (processed != null) {
-      responseBuilder.setProcessed(processed);
-    }
-
     MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
     for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: 
regionStats.entrySet()){
       builder.addRegion(stat.getKey());
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
index 964e929..b545208 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
@@ -113,6 +113,16 @@ public class DummyAsyncTable<C extends 
ScanResultConsumerBase> implements AsyncT
   }
 
   @Override
+  public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate 
checkAndMutate) {
+    // Stub: this dummy table performs no check or mutation and always returns null.
+    return null;
+  }
+
+  @Override
+  public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> 
checkAndMutates) {
+    // Stub: the batch variant likewise does nothing and always returns null.
+    return null;
+  }
+
+  @Override
   public CompletableFuture<Void> mutateRow(RowMutations mutation) {
     return null;
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index b9fb811..0de1892 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -246,9 +246,30 @@ public class TestAsyncTable {
     assertArrayEquals(IntStream.range(0, count).toArray(), actual);
   }
 
+  // Applies two RowMutations to the same row and reads the row back after each one.
+  @Test
+  public void testMutateRow() throws InterruptedException, ExecutionException, 
IOException {
+    AsyncTable<?> table = getTable.get();
+    // First mutation: a single Put of VALUE into qualifier 1.
+    RowMutations mutation = new RowMutations(row);
+    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
1), VALUE));
+    table.mutateRow(mutation).get();
+    Result result = table.get(new Get(row)).get();
+    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
+
+    // Second mutation: delete qualifier 1 and put qualifier 2 in one RowMutations.
+    mutation = new RowMutations(row);
+    mutation.add((Mutation) new Delete(row).addColumn(FAMILY, 
concat(QUALIFIER, 1)));
+    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
2), VALUE));
+    table.mutateRow(mutation).get();
+    result = table.get(new Get(row)).get();
+    // Both effects of the second mutation must be visible in the same read.
+    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
+    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
+  }
+
+  // Tests for old checkAndMutate API
+
   @SuppressWarnings("FutureReturnValueIgnored")
   @Test
-  public void testCheckAndPut() throws InterruptedException, 
ExecutionException {
+  @Deprecated
+  public void testCheckAndPutForOldApi() throws InterruptedException, 
ExecutionException {
     AsyncTable<?> table = getTable.get();
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger successIndex = new AtomicInteger(-1);
@@ -271,7 +292,8 @@ public class TestAsyncTable {
 
   @SuppressWarnings("FutureReturnValueIgnored")
   @Test
-  public void testCheckAndDelete() throws InterruptedException, 
ExecutionException {
+  @Deprecated
+  public void testCheckAndDeleteForOldApi() throws InterruptedException, 
ExecutionException {
     AsyncTable<?> table = getTable.get();
     int count = 10;
     CountDownLatch putLatch = new CountDownLatch(count + 1);
@@ -307,27 +329,10 @@ public class TestAsyncTable {
     });
   }
 
-  @Test
-  public void testMutateRow() throws InterruptedException, ExecutionException, 
IOException {
-    AsyncTable<?> table = getTable.get();
-    RowMutations mutation = new RowMutations(row);
-    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
1), VALUE));
-    table.mutateRow(mutation).get();
-    Result result = table.get(new Get(row)).get();
-    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
-
-    mutation = new RowMutations(row);
-    mutation.add((Mutation) new Delete(row).addColumn(FAMILY, 
concat(QUALIFIER, 1)));
-    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
2), VALUE));
-    table.mutateRow(mutation).get();
-    result = table.get(new Get(row)).get();
-    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
-    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
-  }
-
   @SuppressWarnings("FutureReturnValueIgnored")
   @Test
-  public void testCheckAndMutate() throws InterruptedException, 
ExecutionException {
+  @Deprecated
+  public void testCheckAndMutateForOldApi() throws InterruptedException, 
ExecutionException {
     AsyncTable<?> table = getTable.get();
     int count = 10;
     CountDownLatch putLatch = new CountDownLatch(count + 1);
@@ -371,7 +376,8 @@ public class TestAsyncTable {
   }
 
   @Test
-  public void testCheckAndMutateWithTimeRange() throws Exception {
+  @Deprecated
+  public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
     AsyncTable<?> table = getTable.get();
     final long ts = System.currentTimeMillis() / 2;
     Put put = new Put(row);
@@ -390,6 +396,7 @@ public class TestAsyncTable {
     assertTrue(ok);
 
     RowMutations rm = new RowMutations(row).add((Mutation) put);
+
     ok = table.checkAndMutate(row, 
FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
       .ifEquals(VALUE).thenMutate(rm).get();
     assertFalse(ok);
@@ -410,7 +417,8 @@ public class TestAsyncTable {
   }
 
   @Test
-  public void testCheckAndMutateWithSingleFilter() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
     AsyncTable<?> table = getTable.get();
 
     // Put one row
@@ -465,7 +473,8 @@ public class TestAsyncTable {
   }
 
   @Test
-  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws 
Throwable {
     AsyncTable<?> table = getTable.get();
 
     // Put one row
@@ -536,7 +545,8 @@ public class TestAsyncTable {
   }
 
   @Test
-  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithTimestampFilterForOldApi() throws 
Throwable {
     AsyncTable<?> table = getTable.get();
 
     // Put with specifying the timestamp
@@ -569,7 +579,8 @@ public class TestAsyncTable {
   }
 
   @Test
-  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws 
Throwable {
     AsyncTable<?> table = getTable.get();
 
     // Put with specifying the timestamp
@@ -599,11 +610,678 @@ public class TestAsyncTable {
   }
 
   @Test(expected = NullPointerException.class)
-  public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithoutConditionForOldApi() {
     getTable.get().checkAndMutate(row, FAMILY)
       .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")));
   }
 
+  // Tests for new CheckAndMutate API
+
+  // Issues `count` checkAndMutate Puts guarded by ifNotExists(FAMILY, QUALIFIER) without
+  // waiting between them, and expects exactly one of them to report success.
+  @SuppressWarnings("FutureReturnValueIgnored")
+  @Test
+  public void testCheckAndPut() throws InterruptedException, 
ExecutionException {
+    AsyncTable<?> table = getTable.get();
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    int count = 10;
+    CountDownLatch latch = new CountDownLatch(count);
+
+    IntStream.range(0, count)
+      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
+          .ifNotExists(FAMILY, QUALIFIER)
+          .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
+        .thenAccept(x -> {
+          // Record which call succeeded; every call counts the latch down.
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          latch.countDown();
+        }));
+    latch.await();
+    assertEquals(1, successCount.get());
+    // The stored value must be the one written by the call that reported success.
+    String actual = Bytes.toString(table.get(new 
Get(row)).get().getValue(FAMILY, QUALIFIER));
+    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
+  }
+
+  // Issues `count` checkAndMutate Deletes, each conditioned on FAMILY:QUALIFIER equalling
+  // VALUE, and expects exactly one of them to report success.
+  @SuppressWarnings("FutureReturnValueIgnored")
+  @Test
+  public void testCheckAndDelete() throws InterruptedException, 
ExecutionException {
+    AsyncTable<?> table = getTable.get();
+    int count = 10;
+    // Seed the guard column plus `count` indexed columns; count + 1 puts in total.
+    CountDownLatch putLatch = new CountDownLatch(count + 1);
+    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> 
putLatch.countDown());
+    IntStream.range(0, count)
+      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
i), VALUE))
+        .thenRun(() -> putLatch.countDown()));
+    putLatch.await();
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    CountDownLatch deleteLatch = new CountDownLatch(count);
+
+    // Each Delete removes the guard column and its own indexed column.
+    IntStream.range(0, count)
+      .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
+          .ifEquals(FAMILY, QUALIFIER, VALUE)
+          .build(
+            new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, 
concat(QUALIFIER, i))))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          deleteLatch.countDown();
+        }));
+    deleteLatch.await();
+    assertEquals(1, successCount.get());
+    Result result = table.get(new Get(row)).get();
+    // Only the indexed column of the successful call is gone; the others keep VALUE.
+    IntStream.range(0, count).forEach(i -> {
+      if (i == successIndex.get()) {
+        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
+      } else {
+        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 
i)));
+      }
+    });
+  }
+
+  // Issues `count` checkAndMutate calls carrying RowMutations, each conditioned on
+  // FAMILY:QUALIFIER equalling VALUE, and expects exactly one of them to report success.
+  @SuppressWarnings("FutureReturnValueIgnored")
+  @Test
+  public void testCheckAndMutate() throws InterruptedException, 
ExecutionException {
+    AsyncTable<?> table = getTable.get();
+    int count = 10;
+    // Seed the guard column plus `count` indexed columns; count + 1 puts in total.
+    CountDownLatch putLatch = new CountDownLatch(count + 1);
+    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> 
putLatch.countDown());
+    IntStream.range(0, count)
+      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 
i), VALUE))
+        .thenRun(() -> putLatch.countDown()));
+    putLatch.await();
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    CountDownLatch mutateLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(i -> {
+      // Each RowMutations deletes the guard column and overwrites its own indexed column.
+      RowMutations mutation = new RowMutations(row);
+      try {
+        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
+        mutation
+          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), 
concat(VALUE, i)));
+      } catch (IOException e) {
+        // The lambda cannot throw a checked exception, so rethrow it unchecked.
+        throw new UncheckedIOException(e);
+      }
+
+      table.checkAndMutate(CheckAndMutate.newBuilder(row)
+          .ifEquals(FAMILY, QUALIFIER, VALUE)
+          .build(mutation))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          mutateLatch.countDown();
+        });
+    });
+    mutateLatch.await();
+    assertEquals(1, successCount.get());
+    Result result = table.get(new Get(row)).get();
+    // Only the successful call's indexed column carries the new value; the rest keep VALUE.
+    IntStream.range(0, count).forEach(i -> {
+      if (i == successIndex.get()) {
+        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, 
concat(QUALIFIER, i)));
+      } else {
+        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 
i)));
+      }
+    });
+  }
+
+  // Writes VALUE at explicit timestamp ts, then checks for Put, RowMutations and Delete that
+  // an ifEquals condition fails with TimeRange.at(ts + 10000) and succeeds with
+  // TimeRange.at(ts).
+  @Test
+  public void testCheckAndMutateWithTimeRange() throws Exception {
+    AsyncTable<?> table = getTable.get();
+    final long ts = System.currentTimeMillis() / 2;
+    Put put = new Put(row);
+    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
+
+    // Initial write: succeeds because the column does not exist yet.
+    boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER)
+      .build(put)).get();
+    assertTrue(ok);
+
+    // Put guarded by a time range.
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, QUALIFIER, VALUE)
+      .timeRange(TimeRange.at(ts + 10000))
+      .build(put)).get();
+    assertFalse(ok);
+
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, QUALIFIER, VALUE)
+      .timeRange(TimeRange.at(ts))
+      .build(put)).get();
+    assertTrue(ok);
+
+    // Same two checks with the Put wrapped in a RowMutations.
+    RowMutations rm = new RowMutations(row).add((Mutation) put);
+
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, QUALIFIER, VALUE)
+      .timeRange(TimeRange.at(ts + 10000))
+      .build(rm)).get();
+    assertFalse(ok);
+
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, QUALIFIER, VALUE)
+      .timeRange(TimeRange.at(ts))
+      .build(rm)).get();
+    assertTrue(ok);
+
+    // Same two checks with a Delete.
+    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
+
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, QUALIFIER, VALUE)
+      .timeRange(TimeRange.at(ts + 10000))
+      .build(delete)).get();
+    assertFalse(ok);
+
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, QUALIFIER, VALUE)
+      .timeRange(TimeRange.at(ts))
+      .build(delete)).get();
+    assertTrue(ok);
+  }
+
+  // Exercises checkAndMutate with a single SingleColumnValueFilter passed to ifMatches,
+  // covering a successful Put, a failing Put, a successful Delete and a successful
+  // RowMutations.
+  @Test
+  public void testCheckAndMutateWithSingleFilter() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put one row
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+    table.put(put).get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")))).get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+    // Put with failure
+    // (column A holds "a", so the EQUAL "b" filter does not match)
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("b")))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e")))).get();
+    assertFalse(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("E"))).get());
+
+    // Delete with success
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
+    assertTrue(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).get());
+
+    // Mutate with success
+    // (the RowMutations re-adds column D and deletes column A)
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+        CompareOperator.EQUAL, Bytes.toBytes("b")))
+      .build(new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, 
Bytes.toBytes("A"))))).get();
+    assertTrue(ok);
+
+    result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("A"))).get());
+  }
+
+  // Exercises checkAndMutate with a FilterList of two SingleColumnValueFilters (on columns
+  // A and B) passed to ifMatches, covering a successful Put, a failing Put, a successful
+  // Delete and a successful RowMutations.
+  @Test
+  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put one row
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+    table.put(put).get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")))).get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+    // Put with failure
+    // (the filter on A matches, but column B holds "b", not "c")
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("c"))))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e")))).get();
+    assertFalse(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("E"))).get());
+
+    // Delete with success
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
+    assertTrue(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).get());
+
+    // Mutate with success
+    // (the RowMutations re-adds column D and deletes column A)
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, 
Bytes.toBytes("A"))))).get();
+    assertTrue(ok);
+
+    result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("A"))).get());
+  }
+
+  // Exercises checkAndMutate with a FilterList combining FamilyFilter, QualifierFilter and
+  // TimestampsFilter: the Put is expected to succeed when the filter names timestamp 100
+  // (the timestamp the cell was written with) and to fail when it names 101.
+  @Test
+  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put with specifying the timestamp
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a"))).get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+        new QualifierFilter(CompareOperator.EQUAL, new 
BinaryComparator(Bytes.toBytes("A"))),
+        new TimestampsFilter(Collections.singletonList(100L))))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b")))).get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+    // Put with failure
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+        new QualifierFilter(CompareOperator.EQUAL, new 
BinaryComparator(Bytes.toBytes("A"))),
+        new TimestampsFilter(Collections.singletonList(101L))))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("c")))).get();
+    assertFalse(ok);
+
+    // The failed check must not have written column C.
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("C"))).get());
+  }
+
+  // Combines an ifMatches filter with a timeRange on a cell written at timestamp 100: the
+  // test expects success with TimeRange.between(0, 101) and failure with
+  // TimeRange.between(0, 100).
+  @Test
+  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put with specifying the timestamp
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a")))
+      .get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .timeRange(TimeRange.between(0, 101))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b")))).get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+    // Put with failure
+    // (same filter as above; only the time range differs)
+    ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .timeRange(TimeRange.between(0, 100))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("c"))))
+      .get();
+    assertFalse(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("C"))).get());
+  }
+
+  // Tests for batch version of checkAndMutate
+
+  // Sends pairs of CheckAndMutate operations through checkAndMutateAll. In every pair the
+  // first condition matches and the second does not, so the expected results are
+  // [true, false]. Covered in turn: Put, Delete and RowMutations.
+  @Test
+  public void testCheckAndMutateBatch() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+
+    // Seed four rows, each with one distinct column.
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")))).get();
+
+    // Test for Put
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("e")));
+
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("f")));
+
+    List<Boolean> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("A"))).get();
+    assertEquals("e", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+    // row2's condition did not match, so its original value must be untouched.
+    result = table.get(new Get(row2).addColumn(FAMILY, 
Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+    // Test for Delete
+    checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
+      .build(new Delete(row));
+
+    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+      .build(new Delete(row2));
+
+    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("A"))).get());
+
+    result = table.get(new Get(row2).addColumn(FAMILY, 
Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+    // Test for RowMutations
+    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
+      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+      .build(new RowMutations(row3)
+        .add((Mutation) new Put(row3)
+          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+        .add((Mutation) new Delete(row3).addColumns(FAMILY, 
Bytes.toBytes("C"))));
+
+    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
+      .build(new RowMutations(row4)
+        .add((Mutation) new Put(row4)
+          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+        .add((Mutation) new Delete(row4).addColumns(FAMILY, 
Bytes.toBytes("D"))));
+
+    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    // row3's condition matched: F was written and C, the column its RowMutations deletes,
+    // must be gone. Checking C rather than D matters because row3 never had a column D.
+    result = table.get(new Get(row3)).get();
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+    assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
+
+    // row4's condition did not match: F was not written and D keeps its value.
+    result = table.get(new Get(row4)).get();
+    assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+  }
+
+  @Test
+  public void testCheckAndMutateBatch2() throws Throwable { // checkAndMutateAll with ifNotExists(), ifMatches() and timeRange()
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, 
Bytes.toBytes("c")),
+      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, 
Bytes.toBytes("d")))).get();
+
+    // Test for ifNotExists(): row was written without a column B (expected to pass), row2 has one (expected to fail)
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, Bytes.toBytes("B"))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("e")));
+
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifNotExists(FAMILY, Bytes.toBytes("B"))
+      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("f")));
+
+    List<Boolean> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("A"))).get();
+    assertEquals("e", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+    result = table.get(new Get(row2).addColumn(FAMILY, 
Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+    // Test for ifMatches(): row's A is now "e", so NOT_EQUAL "a" should pass; row2's B equals "b", so GREATER "b" should not
+    checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, 
Bytes.toBytes("a"))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")));
+
+    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, 
Bytes.toBytes("b"))
+      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("f")));
+
+    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("A"))).get();
+    assertEquals("a", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+    result = table.get(new Get(row2).addColumn(FAMILY, 
Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+    // Test for timeRange(): C and D were written at timestamp 100; the test expects between(0, 101) to cover it and between(0, 100) not to
+    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
+      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+      .timeRange(TimeRange.between(0, 101))
+      .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("e")));
+
+    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+      .timeRange(TimeRange.between(0, 100))
+      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("f")));
+
+    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    result = table.get(new Get(row3).addColumn(FAMILY, 
Bytes.toBytes("C"))).get();
+    assertEquals("e", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+    result = table.get(new Get(row4).addColumn(FAMILY, 
Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+  }
+
+  @Test
+  public void testCheckAndMutateBatchWithFilter() throws Throwable { // checkAndMutateAll with FilterList conditions
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row)
+        .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
+        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+      new Put(row2)
+        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+        .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
+        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
+
+    // Test for Put: row's filters (A == "a", B == "b") match the stored values; row2's filters (D == "a", E == "b") do not
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("g")));
+
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("h")));
+
+    List<Boolean> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("C"))).get();
+    assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+    result = table.get(new Get(row2).addColumn(FAMILY, 
Bytes.toBytes("F"))).get();
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+
+    // Test for Delete: same filters as above; only row's column C is expected to be removed
+    checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
+
+    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
+
+    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("C"))).get());
+
+    result = table.get(new Get(row2).addColumn(FAMILY, 
Bytes.toBytes("F"))).get();
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+
+    // Test for RowMutations: same filters again; row gets C re-written and A deleted, row2 is expected to stay untouched
+    checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, 
Bytes.toBytes("A"))));
+
+    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new RowMutations(row2)
+        .add((Mutation) new Put(row2)
+          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
+        .add((Mutation) new Delete(row2).addColumns(FAMILY, 
Bytes.toBytes("D"))));
+
+    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0));
+    assertFalse(results.get(1));
+
+    result = table.get(new Get(row)).get();
+    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
+    assertEquals("c", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+    result = table.get(new Get(row2)).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+  }
+
+  @Test
+  public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable 
{ // checkAndMutateAll combining a FilterList condition with a time range
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a"))
+        .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
+        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, 
Bytes.toBytes("d"))
+        .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
+        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
+
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .timeRange(TimeRange.between(0, 101))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("g")));
+
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("d")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+          Bytes.toBytes("e"))))
+      .timeRange(TimeRange.between(0, 100))
+      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("h")));
+
+    List<Boolean> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();
+
+    assertTrue(results.get(0)); // filter values match and between(0, 101) is expected to cover the cells written at timestamp 100
+    assertFalse(results.get(1)); // filter values match too, but between(0, 100) is expected not to cover timestamp 100
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("C"))).get();
+    assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+    result = table.get(new Get(row2).addColumn(FAMILY, 
Bytes.toBytes("F"))).get();
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+  }
+
   @Test
   public void testDisabled() throws InterruptedException, ExecutionException {
     ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 42e61d7..ac82314 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -333,4 +334,57 @@ public class TestAsyncTableBatch {
       assertThat(e.getMessage(), containsString("KeyValue size too large"));
     }
   }
+
+  @Test
+  public void testWithCheckAndMutate() throws Exception { // batchAll() mixing CheckAndMutate with Get, RowMutations and Put
+    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
+
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] row2 = Bytes.toBytes("row2");
+    byte[] row3 = Bytes.toBytes("row3");
+    byte[] row4 = Bytes.toBytes("row4");
+    byte[] row5 = Bytes.toBytes("row5");
+
+    table.putAll(Arrays.asList(
+      new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
+      new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e")))).get();
+
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("g")));
+    Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
+    RowMutations mutations = new RowMutations(row3)
+      .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
+      .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("f")));
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
+      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("h")));
+    Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("f"));
+
+    List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, 
checkAndMutate2, put);
+    List<Object> results = table.batchAll(actions).get(); // results are read by action index below
+
+    assertTrue(((Result) results.get(0)).getExists()); // checkAndMutate1: row1's A equals "a"
+    assertEquals("b",
+      Bytes.toString(((Result) results.get(1)).getValue(FAMILY, 
Bytes.toBytes("B"))));
+    assertTrue(((Result) results.get(2)).getExists()); // the RowMutations on row3
+    assertFalse(((Result) results.get(3)).getExists()); // checkAndMutate2: row4's D is "d", not "a"
+    assertTrue(((Result) results.get(4)).isEmpty()); // the plain Put on row5
+
+    Result result = table.get(new Get(row1)).get();
+    assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+    result = table.get(new Get(row3)).get();
+    assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+
+    result = table.get(new Get(row4)).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+    result = table.get(new Get(row5)).get();
+    assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("E"))));
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
index f399e86..f88c769 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.client;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -57,6 +59,9 @@ public class TestCheckAndMutate {
 
   private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
   private static final byte[] ROWKEY = Bytes.toBytes("12345");
+  private static final byte[] ROWKEY2 = Bytes.toBytes("67890");
+  private static final byte[] ROWKEY3 = Bytes.toBytes("abcde");
+  private static final byte[] ROWKEY4 = Bytes.toBytes("fghij");
   private static final byte[] FAMILY = Bytes.toBytes("cf");
 
   @Rule
@@ -131,39 +136,11 @@ public class TestCheckAndMutate {
     return rm;
   }
 
-  @Test
-  public void testCheckAndMutate() throws Throwable {
-    try (Table table = createTable()) {
-      // put one row
-      putOneRow(table);
-      // get row back and assert the values
-      getOneRowAndAssertAllExist(table);
-
-      // put the same row again with C column deleted
-      RowMutations rm = makeRowMutationsWithColumnCDeleted();
-      boolean res = table.checkAndMutate(ROWKEY, 
FAMILY).qualifier(Bytes.toBytes("A"))
-          .ifEquals(Bytes.toBytes("a")).thenMutate(rm);
-      assertTrue(res);
-
-      // get row back and assert the values
-      getOneRowAndAssertAllButCExist(table);
-
-      // Test that we get a region level exception
-      try {
-        rm = getBogusRowMutations();
-        table.checkAndMutate(ROWKEY, FAMILY).qualifier(Bytes.toBytes("A"))
-          .ifEquals(Bytes.toBytes("a")).thenMutate(rm);
-        fail("Expected NoSuchColumnFamilyException");
-      } catch (NoSuchColumnFamilyException e) {
-        // expected
-      } catch (RetriesExhaustedException e) {
-        assertThat(e.getCause(), 
instanceOf(NoSuchColumnFamilyException.class));
-      }
-    }
-  }
+  // Tests for old checkAndMutate API
 
   @Test
-  public void testCheckAndMutateWithBuilder() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateForOldApi() throws Throwable {
     try (Table table = createTable()) {
       // put one row
       putOneRow(table);
@@ -194,7 +171,8 @@ public class TestCheckAndMutate {
   }
 
   @Test
-  public void testCheckAndMutateWithSingleFilter() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
     try (Table table = createTable()) {
       // put one row
       putOneRow(table);
@@ -243,7 +221,8 @@ public class TestCheckAndMutate {
   }
 
   @Test
-  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws 
Throwable {
     try (Table table = createTable()) {
       // put one row
       putOneRow(table);
@@ -308,7 +287,8 @@ public class TestCheckAndMutate {
   }
 
   @Test
-  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithTimestampFilterForOldApi() throws 
Throwable {
     try (Table table = createTable()) {
       // Put with specifying the timestamp
       table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a")));
@@ -339,7 +319,8 @@ public class TestCheckAndMutate {
   }
 
   @Test
-  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws 
Throwable {
     try (Table table = createTable()) {
       // Put with specifying the timestamp
       table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a")));
@@ -366,10 +347,531 @@ public class TestCheckAndMutate {
   }
 
   @Test(expected = NullPointerException.class)
-  public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithoutConditionForOldApi() throws Throwable {
     try (Table table = createTable()) {
       table.checkAndMutate(ROWKEY, FAMILY)
         .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")));
     }
   }
+
+  // Tests for new CheckAndMutate API
+
+  @Test
+  public void testCheckAndMutate() throws Throwable { // RowMutations through the CheckAndMutate builder API
+    try (Table table = createTable()) {
+      // put one row
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // put the same row again with C column deleted, guarded by the condition A == "a"
+      RowMutations rm = makeRowMutationsWithColumnCDeleted();
+      boolean res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(rm));
+      assertTrue(res);
+
+      // get row back and assert the values
+      getOneRowAndAssertAllButCExist(table);
+
+      // Test that we get a region level exception, either directly or as the cause of RetriesExhaustedException
+      try {
+        rm = getBogusRowMutations();
+        table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+          .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+          .build(rm));
+        fail("Expected NoSuchColumnFamilyException");
+      } catch (NoSuchColumnFamilyException e) {
+        // expected
+      } catch (RetriesExhaustedException e) {
+        assertThat(e.getCause(), 
instanceOf(NoSuchColumnFamilyException.class));
+      }
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithSingleFilter() throws Throwable { // a single SingleColumnValueFilter as the condition
+    try (Table table = createTable()) {
+      // put one row
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // Put with success: filter A == "a" is expected to match, so column D gets written
+      boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY,
+          Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+      // Put with failure: filter A == "b" is expected not to match, so column E must not appear
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("b")))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("E"))));
+
+      // Delete with success: removes the column D written above
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+          CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
+      assertTrue(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("D"))));
+
+      // Mutate with success: condition is on B; the RowMutations re-adds D and deletes A
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+          CompareOperator.EQUAL, Bytes.toBytes("b")))
+        .build(new RowMutations(ROWKEY)
+          .add((Mutation) new Put(ROWKEY)
+            .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+          .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, 
Bytes.toBytes("A")))));
+      assertTrue(ok);
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("A"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithMultipleFilters() throws Throwable { // a FilterList of two SingleColumnValueFilters as the condition
+    try (Table table = createTable()) {
+      // put one row
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // Put with success: both filters (A == "a", B == "b") are expected to match
+      boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+      // Put with failure: the second filter asks for B == "c", so column E must not appear
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("c"))))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("E"))));
+
+      // Delete with success: removes the column D written above
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+            new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+              Bytes.toBytes("a")),
+            new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+              Bytes.toBytes("b"))))
+        .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
+      assertTrue(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("D"))));
+
+      // Mutate with success: the RowMutations re-adds D and deletes A
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new RowMutations(ROWKEY)
+          .add((Mutation) new Put(ROWKEY)
+            .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+          .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, 
Bytes.toBytes("A")))));
+      assertTrue(ok);
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("A"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithTimestampFilter() throws Throwable { // condition built from family, qualifier and timestamp filters
+    try (Table table = createTable()) {
+      // Put column A with an explicit timestamp of 100
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a")));
+
+      // Put with success: the TimestampsFilter asks for timestamp 100, which is the one written above
+      boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new FamilyFilter(CompareOperator.EQUAL, new 
BinaryComparator(FAMILY)),
+          new QualifierFilter(CompareOperator.EQUAL, new 
BinaryComparator(Bytes.toBytes("A"))),
+          new TimestampsFilter(Collections.singletonList(100L))))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+      // Put with failure: the TimestampsFilter asks for timestamp 101, which was never written for column A
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new FamilyFilter(CompareOperator.EQUAL, new 
BinaryComparator(FAMILY)),
+          new QualifierFilter(CompareOperator.EQUAL, new 
BinaryComparator(Bytes.toBytes("A"))),
+          new TimestampsFilter(Collections.singletonList(101L))))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("c"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("C"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { // filter condition combined with timeRange()
+    try (Table table = createTable()) {
+      // Put column A with an explicit timestamp of 100
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a")));
+
+      // Put with success: the test expects between(0, 101) to cover timestamp 100
+      boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY,
+          Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .timeRange(TimeRange.between(0, 101))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+      // Put with failure: same filter, but the test expects between(0, 100) not to cover timestamp 100
+      ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+          CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .timeRange(TimeRange.between(0, 100))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("c"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("C"))));
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testCheckAndMutateBuilderWithoutCondition() { // build() with no condition set is expected to throw IllegalStateException
+    CheckAndMutate.newBuilder(ROWKEY)
+      .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")));
+  }
+
+  // Tests for the batch version of checkAndMutate (Table#checkAndMutate(List))
+
+  /**
+   * Exercises the batch version of checkAndMutate() with Put, Delete and RowMutations.
+   * Every batch pairs one operation whose condition matches the stored value with one
+   * whose condition does not, then verifies both the returned flags and the table state.
+   */
+  @Test
+  public void testCheckAndMutateBatch() throws Throwable {
+    try (Table table = createTable()) {
+      // Seed four rows, each with a single column holding a known value
+      table.put(Arrays.asList(
+        new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")),
+        new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b")),
+        new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("c")),
+        new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d"))));
+
+      // Test for Put: ROWKEY/A is "a" (matches), ROWKEY2/B is "b" (does not equal "a")
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("e")));
+
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+        .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("f")));
+
+      boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("A")));
+      assertEquals("e", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, 
Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+      // Test for Delete: ROWKEY/A is now "e" (matches), ROWKEY2/B is still "b"
+      checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
+        .build(new Delete(ROWKEY));
+
+      checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+        .build(new Delete(ROWKEY2));
+
+      results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("A"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, 
Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+      // Test for RowMutations: ROWKEY3/C is "c" (matches), ROWKEY4/D is "d" (not "f")
+      checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3)
+        .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+        .build(new RowMutations(ROWKEY3)
+          .add((Mutation) new Put(ROWKEY3)
+            .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+          .add((Mutation) new Delete(ROWKEY3).addColumns(FAMILY, 
Bytes.toBytes("C"))));
+
+      checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4)
+        .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
+        .build(new RowMutations(ROWKEY4)
+          .add((Mutation) new Put(ROWKEY4)
+            .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+          .add((Mutation) new Delete(ROWKEY4).addColumns(FAMILY, 
Bytes.toBytes("D"))));
+
+      results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      // ROWKEY3: both mutations applied, so F was written and C (the deleted column) is gone.
+      // Checking C rather than D matters: ROWKEY3 never had a D column, so asserting on D
+      // would pass even if the Delete had not been applied.
+      result = table.get(new Get(ROWKEY3));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
+
+      // ROWKEY4: condition failed, so neither the Put of F nor the Delete of D was applied
+      result = table.get(new Get(ROWKEY4));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+    }
+  }
+
+  /**
+   * Exercises the batch version of checkAndMutate() with the ifNotExists(), ifMatches()
+   * and timeRange() builder options. In each batch the first operation is expected to be
+   * applied and the second one to be rejected.
+   */
+  @Test
+  public void testCheckAndMutateBatch2() throws Throwable {
+    try (Table table = createTable()) {
+      // ROWKEY3 and ROWKEY4 are written with an explicit timestamp of 100 for the
+      // timeRange() step below
+      table.put(Arrays.asList(
+        new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")),
+        new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b")),
+        new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), 100, 
Bytes.toBytes("c")),
+        new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), 100, 
Bytes.toBytes("d"))));
+
+      // Test for ifNotExists()
+      // ROWKEY has no column B (condition holds); ROWKEY2 already has column B
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifNotExists(FAMILY, Bytes.toBytes("B"))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("e")));
+
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifNotExists(FAMILY, Bytes.toBytes("B"))
+        .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("f")));
+
+      boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("A")));
+      assertEquals("e", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, 
Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+      // Test for ifMatches()
+      // At this point ROWKEY/A holds "e" and ROWKEY2/B still holds "b"; the first
+      // comparison is expected to succeed and the second to fail (see assertions below)
+      checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, 
Bytes.toBytes("a"))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")));
+
+      checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, 
Bytes.toBytes("b"))
+        .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("f")));
+
+      results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("A")));
+      assertEquals("a", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, 
Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("B"))));
+
+      // Test for timeRange()
+      // Both value comparisons match the stored values; only the time range differs.
+      // NOTE(review): the cells carry timestamp 100, so presumably between(0, 101)
+      // covers them while between(0, 100) excludes them (exclusive upper bound) - confirm
+      checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3)
+        .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+        .timeRange(TimeRange.between(0, 101))
+        .build(new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("e")));
+
+      checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4)
+        .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+        .timeRange(TimeRange.between(0, 100))
+        .build(new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("f")));
+
+      results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      result = table.get(new Get(ROWKEY3).addColumn(FAMILY, 
Bytes.toBytes("C")));
+      assertEquals("e", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+      result = table.get(new Get(ROWKEY4).addColumn(FAMILY, 
Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+    }
+  }
+
+  /**
+   * Exercises the batch version of checkAndMutate() when the condition is a Filter
+   * (a FilterList of two SingleColumnValueFilters) instead of a single column comparison.
+   * For ROWKEY both filter values equal the stored values; for ROWKEY2 the filters compare
+   * against "a"/"b" while the row stores "d"/"e", so that operation is expected to fail.
+   */
+  @Test
+  public void testCheckAndMutateBatchWithFilter() throws Throwable {
+    try (Table table = createTable()) {
+      // Seed two rows with three columns each
+      table.put(Arrays.asList(
+        new Put(ROWKEY)
+          .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+          .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
+          .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+        new Put(ROWKEY2)
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+          .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
+          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))));
+
+      // Test for Put
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("g")));
+
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("h")));
+
+      boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      // ROWKEY/C was overwritten with "g"; ROWKEY2/F keeps its original value
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("C")));
+      assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, 
Bytes.toBytes("F")));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+
+      // Test for Delete
+      checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("C")));
+
+      checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new Delete(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
+
+      results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      // ROWKEY/C was deleted; ROWKEY2/F is untouched
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("C"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, 
Bytes.toBytes("F")));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+
+      // Test for RowMutations
+      checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new RowMutations(ROWKEY)
+          .add((Mutation) new Put(ROWKEY)
+            .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+          .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, 
Bytes.toBytes("A"))));
+
+      checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .build(new RowMutations(ROWKEY2)
+          .add((Mutation) new Put(ROWKEY2)
+            .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
+          .add((Mutation) new Delete(ROWKEY2).addColumns(FAMILY, 
Bytes.toBytes("D"))));
+
+      results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      // ROWKEY: A deleted and C re-written by the applied RowMutations
+      result = table.get(new Get(ROWKEY));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
+      assertEquals("c", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+      // ROWKEY2: RowMutations rejected, so D and F keep their original values
+      result = table.get(new Get(ROWKEY2));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+    }
+  }
+
+  /**
+   * Exercises the batch version of checkAndMutate() when a Filter condition is combined
+   * with a time range. Both FilterLists compare against the values actually stored; the
+   * two operations differ only in the time range they specify.
+   */
+  @Test
+  public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable 
{
+    try (Table table = createTable()) {
+      // The columns referenced by the filters (A, B, D, E) are written with timestamp 100
+      table.put(Arrays.asList(
+        new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, 
Bytes.toBytes("a"))
+          .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
+          .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+        new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), 100, 
Bytes.toBytes("d"))
+          .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
+          .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))));
+
+      // NOTE(review): presumably between(0, 101) covers timestamp 100 while
+      // between(0, 100) excludes it (exclusive upper bound) - confirm
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .timeRange(TimeRange.between(0, 101))
+        .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("g")));
+
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("d")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), 
CompareOperator.EQUAL,
+            Bytes.toBytes("e"))))
+        .timeRange(TimeRange.between(0, 100))
+        .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("h")));
+
+      boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, 
checkAndMutate2));
+
+      assertTrue(results[0]);
+      assertFalse(results[1]);
+
+      // ROWKEY/C was overwritten with "g"; ROWKEY2/F keeps its original value
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, 
Bytes.toBytes("C")));
+      assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("C"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, 
Bytes.toBytes("F")));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 3de5c1b..1e281fb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -453,6 +453,60 @@ public class TestFromClientSide3 {
   }
 
   @Test
+  public void testBatchWithCheckAndMutate() throws Exception {
+    // Verifies that CheckAndMutate actions can be mixed with Get, RowMutations and Put
+    // actions in a single Table.batch() call, checking both the per-action results and
+    // the resulting table state.
+    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY 
})) {
+      byte[] row1 = Bytes.toBytes("row1");
+      byte[] row2 = Bytes.toBytes("row2");
+      byte[] row3 = Bytes.toBytes("row3");
+      byte[] row4 = Bytes.toBytes("row4");
+      byte[] row5 = Bytes.toBytes("row5");
+
+      // Seed five rows, each with a single column holding a known value
+      table.put(Arrays.asList(
+        new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("a")),
+        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), 
Bytes.toBytes("b")),
+        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 
Bytes.toBytes("c")),
+        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 
Bytes.toBytes("d")),
+        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("e"))));
+
+      // row1/A is "a", so this condition matches and the Put should be applied
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), 
Bytes.toBytes("g")));
+      Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
+      RowMutations mutations = new RowMutations(row3)
+        .add((Mutation) new Delete(row3).addColumns(FAMILY, 
Bytes.toBytes("C")))
+        .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), 
Bytes.toBytes("f")));
+      // row4/D is "d", not "a", so this condition fails and the Put of E must be skipped
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+        .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
+        .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("h")));
+      Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), 
Bytes.toBytes("f"));
+
+      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, 
checkAndMutate2, put);
+      Object[] results = new Object[actions.size()];
+      table.batch(actions, results);
+
+      // results[] is indexed in the same order as actions
+      assertTrue(((Result) results[0]).getExists());
+      assertEquals("b",
+        Bytes.toString(((Result) results[1]).getValue(FAMILY, 
Bytes.toBytes("B"))));
+      assertTrue(((Result) results[2]).getExists());
+      assertFalse(((Result) results[3]).getExists());
+      assertTrue(((Result) results[4]).isEmpty());
+
+      Result result = table.get(new Get(row1));
+      assertEquals("g", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("A"))));
+
+      result = table.get(new Get(row3));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
+
+      // The rejected CheckAndMutate targets column E, which row4 did not have before
+      // the batch; asserting its absence is what proves the Put was not applied
+      // (D alone would be unchanged either way).
+      result = table.get(new Get(row4));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("D"))));
+      assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
+
+      result = table.get(new Get(row5));
+      assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("E"))));
+    }
+  }
+
+  @Test
   public void testHTableExistsMethodSingleRegionSingleGet()
           throws IOException, InterruptedException {
     try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY 
})) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index 655225a..dc8c4ef 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -252,9 +252,8 @@ public class TestMalformedCellFromClient {
       actionBuilder.setMutation(mp);
       builder.addAction(actionBuilder.build());
     }
-    ClientProtos.MultiRequest request =
-      ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
-        .setCondition(condition).build();
+    ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder()
+        .addRegionAction(builder.setCondition(condition).build()).build();
     return request;
   }
 
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
index 30b1fa1..19154d6 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -432,6 +433,16 @@ public class ThriftTable implements Table {
   }
 
   @Override
+  public boolean checkAndMutate(CheckAndMutate checkAndMutate) {
+    // Not implemented for this Table implementation yet: always throws
+    // NotImplementedException instead of performing the check-and-mutate.
+    throw new NotImplementedException("Implement later");
+  }
+
+  @Override
+  public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+    // Batch variant is likewise not implemented for this Table implementation yet:
+    // always throws NotImplementedException.
+    throw new NotImplementedException("Implement later");
+  }
+
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
     try {

Reply via email to