HBASE-19876 The exception happening in converting pb mutation to hbase.mutation 
messes up the CellScanner


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2f48fdbb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2f48fdbb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2f48fdbb

Branch: refs/heads/HBASE-19064
Commit: 2f48fdbb26ff555485b4aa3393d835b7dd8797a0
Parents: 16f1f5b
Author: Chia-Ping Tsai <chia7...@gmail.com>
Authored: Sun Feb 11 03:49:53 2018 +0800
Committer: Chia-Ping Tsai <chia7...@gmail.com>
Committed: Tue Feb 13 21:08:59 2018 +0800

----------------------------------------------------------------------
 .../hbase/shaded/protobuf/RequestConverter.java |   4 +-
 .../hbase/regionserver/RSRpcServices.java       | 138 +++++++------
 .../client/TestMalformedCellFromClient.java     | 203 +++++++++++++++++--
 3 files changed, 262 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2f48fdbb/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 8ac7058..0afcfe1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -473,7 +473,7 @@ public final class RequestConverter {
     return regionActionBuilder;
   }
 
-  private static RegionAction.Builder getRegionActionBuilderWithRegion(
+  public static RegionAction.Builder getRegionActionBuilderWithRegion(
       final RegionAction.Builder regionActionBuilder, final byte [] 
regionName) {
     RegionSpecifier region = 
buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
     regionActionBuilder.setRegion(region);
@@ -1099,7 +1099,7 @@ public final class RequestConverter {
    * @return a Condition
    * @throws IOException
    */
-  private static Condition buildCondition(final byte[] row, final byte[] 
family,
+  public static Condition buildCondition(final byte[] row, final byte[] family,
       final byte[] qualifier, final ByteArrayComparable comparator, final 
CompareType compareType)
       throws IOException {
     Condition.Builder builder = Condition.newBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f48fdbb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 44934a6..5b4e3b8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -560,67 +560,60 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
    * Mutate a list of rows atomically.
    * @param cellScanner if non-null, the mutation data -- the Cell content.
    */
-  private void mutateRows(final HRegion region, final OperationQuota quota,
-      final List<ClientProtos.Action> actions, final CellScanner cellScanner,
-      RegionActionResult.Builder builder, final ActivePolicyEnforcement 
spaceQuotaEnforcement)
-      throws IOException {
-    for (ClientProtos.Action action: actions) {
-      if (action.hasGet()) {
-        throw new DoNotRetryIOException("Atomic put and/or delete only, not a 
Get=" +
-          action.getGet());
-      }
-    }
-    doBatchOp(builder, region, quota, actions, cellScanner, 
spaceQuotaEnforcement, true);
-  }
-
-  /**
-   * Mutate a list of rows atomically.
-   * @param cellScanner if non-null, the mutation data -- the Cell content.
-   */
   private boolean checkAndRowMutate(final HRegion region, final 
List<ClientProtos.Action> actions,
                                     final CellScanner cellScanner, byte[] row, 
byte[] family, byte[] qualifier,
                                     CompareOperator op, ByteArrayComparable 
comparator, RegionActionResult.Builder builder,
                                     ActivePolicyEnforcement 
spaceQuotaEnforcement) throws IOException {
-    if (!region.getRegionInfo().isMetaRegion()) {
-      regionServer.cacheFlusher.reclaimMemStoreMemory();
-    }
-    RowMutations rm = null;
-    int i = 0;
-    ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
+    int countOfCompleteMutation = 0;
+    try {
+      if (!region.getRegionInfo().isMetaRegion()) {
+        regionServer.cacheFlusher.reclaimMemStoreMemory();
+      }
+      RowMutations rm = null;
+      int i = 0;
+      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
         ClientProtos.ResultOrException.newBuilder();
-    for (ClientProtos.Action action: actions) {
-      if (action.hasGet()) {
-        throw new DoNotRetryIOException("Atomic put and/or delete only, not a 
Get=" +
+      for (ClientProtos.Action action: actions) {
+        if (action.hasGet()) {
+          throw new DoNotRetryIOException("Atomic put and/or delete only, not 
a Get=" +
             action.getGet());
+        }
+        MutationType type = action.getMutation().getMutateType();
+        if (rm == null) {
+          rm = new RowMutations(action.getMutation().getRow().toByteArray(), 
actions.size());
+        }
+        switch (type) {
+          case PUT:
+            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+            ++countOfCompleteMutation;
+            checkCellSizeLimit(region, put);
+            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+            rm.add(put);
+            break;
+          case DELETE:
+            Delete del = ProtobufUtil.toDelete(action.getMutation(), 
cellScanner);
+            ++countOfCompleteMutation;
+            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
+            rm.add(del);
+            break;
+          default:
+            throw new DoNotRetryIOException("Atomic put and/or delete only, 
not " + type.name());
+        }
+        // To unify the response format with doNonAtomicRegionMutation and 
read through client's
+        // AsyncProcess we have to add an empty result instance per operation
+        resultOrExceptionOrBuilder.clear();
+        resultOrExceptionOrBuilder.setIndex(i++);
+        builder.addResultOrException(
+          resultOrExceptionOrBuilder.build());
       }
-      MutationType type = action.getMutation().getMutateType();
-      if (rm == null) {
-        rm = new RowMutations(action.getMutation().getRow().toByteArray(), 
actions.size());
-      }
-      switch (type) {
-        case PUT:
-          Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
-          checkCellSizeLimit(region, put);
-          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
-          rm.add(put);
-          break;
-        case DELETE:
-          Delete del = ProtobufUtil.toDelete(action.getMutation(), 
cellScanner);
-          spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
-          rm.add(del);
-          break;
-        default:
-          throw new DoNotRetryIOException("Atomic put and/or delete only, not 
" + type.name());
+      return region.checkAndRowMutate(row, family, qualifier, op, comparator, 
rm);
+    } finally {
+      // Currently, the checkAndMutate isn't supported by batch so it won't 
mess up the cell scanner
+      // even if the malformed cells are not skipped.
+      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
+        skipCellsForMutation(actions.get(i), cellScanner);
       }
-      // To unify the response format with doNonAtomicRegionMutation and read 
through client's
-      // AsyncProcess we have to add an empty result instance per operation
-      resultOrExceptionOrBuilder.clear();
-      resultOrExceptionOrBuilder.setIndex(i++);
-      builder.addResultOrException(
-          resultOrExceptionOrBuilder.build());
     }
-    return region.checkAndRowMutate(row, family, qualifier, op,
-        comparator, rm);
   }
 
   /**
@@ -787,9 +780,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           context.incrementResponseExceptionSize(pair.getSerializedSize());
           resultOrExceptionBuilder.setIndex(action.getIndex());
           builder.addResultOrException(resultOrExceptionBuilder.build());
-          if (cellScanner != null) {
-            skipCellsForMutation(action, cellScanner);
-          }
+          skipCellsForMutation(action, cellScanner);
           continue;
         }
         if (action.hasGet()) {
@@ -896,6 +887,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       try {
         doBatchOp(builder, region, quota, mutations, cellScanner, 
spaceQuotaEnforcement, false);
       } catch (IOException ioe) {
+        // TODO do the refactor to avoid this catch as it is useless
+        // doBatchOp has handled the IOE for all non-atomic operations.
         rpcServer.getMetrics().exception(ioe);
         NameBytesPair pair = ResponseConverter.buildException(ioe);
         resultOrExceptionBuilder.setException(pair);
@@ -947,6 +940,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
       int i = 0;
       for (ClientProtos.Action action: mutations) {
+        if (action.hasGet()) {
+          throw new DoNotRetryIOException("Atomic put and/or delete only, not 
a Get=" +
+            action.getGet());
+        }
         MutationProto m = action.getMutation();
         Mutation mutation;
         if (m.getMutateType() == MutationType.PUT) {
@@ -969,8 +966,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
 
       // HBASE-17924
-      // Sort to improve lock efficiency for non-atomic batch of operations. 
If atomic (mostly
-      // called from mutateRows()), order is preserved as its expected from 
the client
+      // Sort to improve lock efficiency for non-atomic batch of operations. 
If atomic
+      // order is preserved as its expected from the client
       if (!atomic) {
         Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
       }
@@ -1005,12 +1002,19 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         }
       }
     } catch (IOException ie) {
+      int processedMutationIndex = 0;
+      for (Action mutation : mutations) {
+        // The non-null mArray[i] means the cell scanner has been read.
+        if (mArray[processedMutationIndex++] == null) {
+          skipCellsForMutation(mutation, cells);
+        }
+        if (!atomic) {
+          builder.addResultOrException(getResultOrException(ie, 
mutation.getIndex()));
+        }
+      }
       if (atomic) {
         throw ie;
       }
-      for (Action mutation : mutations) {
-        builder.addResultOrException(getResultOrException(ie, 
mutation.getIndex()));
-      }
     }
     if (regionServer.metricsRegionServer != null) {
       long after = EnvironmentEdgeManager.currentTime();
@@ -2550,9 +2554,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         // All Mutations in this RegionAction not executed as we can not see 
the Region online here
         // in this RS. Will be retried from Client. Skipping all the Cells in 
CellScanner
         // corresponding to these Mutations.
-        if (cellScanner != null) {
-          skipCellsForMutations(regionAction.getActionList(), cellScanner);
-        }
+        skipCellsForMutations(regionAction.getActionList(), cellScanner);
         continue;  // For this region it's a failure.
       }
 
@@ -2573,8 +2575,8 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
                   cellScanner, row, family, qualifier, op,
                   comparator, regionActionResultBuilder, 
spaceQuotaEnforcement);
           } else {
-            mutateRows(region, quota, regionAction.getActionList(), 
cellScanner,
-                regionActionResultBuilder, spaceQuotaEnforcement);
+            doBatchOp(regionActionResultBuilder, region, quota, 
regionAction.getActionList(),
+              cellScanner, spaceQuotaEnforcement, true);
             processed = Boolean.TRUE;
           }
         } catch (IOException e) {
@@ -2621,12 +2623,18 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
   }
 
   private void skipCellsForMutations(List<Action> actions, CellScanner 
cellScanner) {
+    if (cellScanner == null) {
+      return;
+    }
     for (Action action : actions) {
       skipCellsForMutation(action, cellScanner);
     }
   }
 
   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
+    if (cellScanner == null) {
+      return;
+    }
     try {
       if (action.hasMutation()) {
         MutationProto m = action.getMutation();

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f48fdbb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index e44a2e9..6b57b89 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -29,11 +30,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,23 +49,21 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
 /**
- * The purpose of this test is to make sure the region exception won't corrupt 
the results
- * of batch. The prescription is shown below.
- * 1) honor the action result rather than region exception. If the action have 
both of true result
- * and region exception, the action is fine as the exception is caused by 
other actions
- * which are in the same region.
- * 2) honor the action exception rather than region exception. If the action 
have both of action
- * exception and region exception, we deal with the action exception only. If 
we also
- * handle the region exception for the same action, it will introduce the 
negative count of
- * actions in progress. The AsyncRequestFuture#waitUntilDone will block 
forever.
- *
- * The no-cluster test is in TestAsyncProcessWithRegionException.
+ * The purpose of this test is to ensure whether rs deals with the malformed 
cells correctly.
  */
 @Category({ MediumTests.class, ClientTests.class })
 public class TestMalformedCellFromClient {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestMalformedCellFromClient.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
@@ -139,10 +142,16 @@ public class TestMalformedCellFromClient {
   }
 
   /**
-   * The purpose of this ut is to check the consistency between the exception 
and results.
-   * If the RetriesExhaustedWithDetailsException contains the whole batch,
-   * each result should be of IOE. Otherwise, the row operation which is not 
in the exception
-   * should have a true result.
+   * This test verifies region exception doesn't corrupt the results of batch. 
The prescription is
+   * shown below. 1) honor the action result rather than region exception. If 
the action have both
+   * of true result and region exception, the action is fine as the exception 
is caused by other
+   * actions which are in the same region. 2) honor the action exception 
rather than region
+   * exception. If the action have both of action exception and region 
exception, we deal with the
+   * action exception only. If we also handle the region exception for the 
same action, it will
+   * introduce the negative count of actions in progress. The 
AsyncRequestFuture#waitUntilDone will
+   * block forever. If the RetriesExhaustedWithDetailsException contains the 
whole batch, each
+   * result should be of IOE. Otherwise, the row operation which is not in the 
exception should have
+   * a true result. The no-cluster test is in 
TestAsyncProcessWithRegionException.
    */
   @Test
   public void testRegionExceptionByAsync() throws Exception {
@@ -170,4 +179,166 @@ public class TestMalformedCellFromClient {
       assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
     }
   }
+
+  /**
+   * The invalid cells is in rm. The rm should fail but the subsequent 
mutations should succeed.
+   * Currently, we have no client api to submit the request consisting of 
condition-rm and mutation.
+   * Hence, this test build the request manually.
+   */
+  @Test
+  public void testAtomicOperations() throws Exception {
+    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
+    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
+    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new 
byte[10]);
+
+    // build the request
+    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    ClientProtos.MultiRequest request =
+      ClientProtos.MultiRequest.newBuilder(createRequest(rm, 
r.getRegionInfo().getRegionName()))
+        
.addRegionAction(ClientProtos.RegionAction.newBuilder().setRegion(RequestConverter
+          
.buildRegionSpecifier(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
+            
r.getRegionInfo().getRegionName())).addAction(ClientProtos.Action.newBuilder()
+          .setMutation(
+            
ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, 
put))))
+        .build();
+
+    List<Cell> cells = new ArrayList<>();
+    for (Mutation m : rm.getMutations()) {
+      cells.addAll(m.getCellList(FAMILY));
+    }
+    cells.addAll(put.getCellList(FAMILY));
+    assertEquals(3, cells.size());
+    HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
+    
Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells));
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(
+      TEST_UTIL.getMiniHBaseCluster()
+        .getServerHoldingRegion(TABLE_NAME, 
r.getRegionInfo().getRegionName()));
+
+    ClientProtos.MultiResponse response = 
rs.getRSRpcServices().multi(controller, request);
+    assertEquals(2, response.getRegionActionResultCount());
+    assertTrue(response.getRegionActionResultList().get(0).hasException());
+    assertFalse(response.getRegionActionResultList().get(1).hasException());
+    assertEquals(1, 
response.getRegionActionResultList().get(1).getResultOrExceptionCount());
+    assertTrue(
+      
response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      Result result = table.get(new Get(Bytes.toBytes("good")));
+      assertEquals(1, result.size());
+      Cell cell = result.getColumnLatestCell(FAMILY, null);
+      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
+    }
+  }
+
+  private static ClientProtos.MultiRequest createRequest(RowMutations rm, 
byte[] regionName)
+    throws IOException {
+    ClientProtos.RegionAction.Builder builder = RequestConverter
+      
.getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), 
regionName);
+    builder.setAtomic(true);
+    ClientProtos.Action.Builder actionBuilder = 
ClientProtos.Action.newBuilder();
+    ClientProtos.MutationProto.Builder mutationBuilder = 
ClientProtos.MutationProto.newBuilder();
+    ClientProtos.Condition condition = RequestConverter
+      .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new 
byte[10]),
+        HBaseProtos.CompareType.EQUAL);
+    for (Mutation mutation : rm.getMutations()) {
+      ClientProtos.MutationProto.MutationType mutateType = null;
+      if (mutation instanceof Put) {
+        mutateType = ClientProtos.MutationProto.MutationType.PUT;
+      } else if (mutation instanceof Delete) {
+        mutateType = ClientProtos.MutationProto.MutationType.DELETE;
+      } else {
+        throw new DoNotRetryIOException(
+          "RowMutations supports only put and delete, not " + 
mutation.getClass().getName());
+      }
+      mutationBuilder.clear();
+      ClientProtos.MutationProto mp =
+        ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
+      actionBuilder.clear();
+      actionBuilder.setMutation(mp);
+      builder.addAction(actionBuilder.build());
+    }
+    ClientProtos.MultiRequest request =
+      ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
+        .setCondition(condition).build();
+    return request;
+  }
+
+  /**
+   * This test depends on how regionserver process the batch ops.
+   * 1) group the put/delete until meeting the increment
+   * 2) process the batch of put/delete
+   * 3) process the increment
+   * see RSRpcServices#doNonAtomicRegionMutation
+   */
+  @Test
+  public void testNonAtomicOperations() throws InterruptedException, 
IOException {
+    Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, 
null, 100);
+    List<Row> batches = new ArrayList<>();
+    // the first and second puts will be group by regionserver
+    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new 
byte[CELL_SIZE]));
+    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new 
byte[CELL_SIZE]));
+    // this Increment should succeed
+    batches.add(inc);
+    // this put should succeed
+    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new 
byte[1]));
+    Object[] objs = new Object[batches.size()];
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      table.batch(batches, objs);
+      fail("Where is the exception? We put the malformed cells!!!");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      assertEquals(2, e.getNumExceptions());
+      for (int i = 0; i != e.getNumExceptions(); ++i) {
+        assertNotNull(e.getCause(i));
+        assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass());
+        assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
+      }
+    } finally {
+      assertObjects(objs, batches.size());
+      assertTrue(objs[0] instanceof IOException);
+      assertTrue(objs[1] instanceof IOException);
+      assertEquals(Result.class, objs[2].getClass());
+      assertEquals(Result.class, objs[3].getClass());
+    }
+  }
+
+  @Test
+  public void testRowMutations() throws InterruptedException, IOException {
+    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new 
byte[1]);
+    List<Row> batches = new ArrayList<>();
+    RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
+    // the first and second puts will be group by regionserver
+    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new 
byte[CELL_SIZE]));
+    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new 
byte[CELL_SIZE]));
+    batches.add(mutations);
+    // this bm should succeed
+    mutations = new RowMutations(Bytes.toBytes("good"));
+    mutations.add(put);
+    mutations.add(put);
+    batches.add(mutations);
+    Object[] objs = new Object[batches.size()];
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      table.batch(batches, objs);
+      fail("Where is the exception? We put the malformed cells!!!");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      assertEquals(1, e.getNumExceptions());
+      for (int i = 0; i != e.getNumExceptions(); ++i) {
+        assertNotNull(e.getCause(i));
+        assertTrue(e.getCause(i) instanceof IOException);
+        assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
+      }
+    } finally {
+      assertObjects(objs, batches.size());
+      assertTrue(objs[0] instanceof IOException);
+      assertEquals(Result.class, objs[1].getClass());
+    }
+  }
+
+  private static void assertObjects(Object[] objs, int expectedSize) {
+    int count = 0;
+    for (Object obj : objs) {
+      assertNotNull(obj);
+      ++count;
+    }
+    assertEquals(expectedSize, count);
+  }
 }

Reply via email to