Repository: hbase
Updated Branches:
  refs/heads/master 0ec96a5ff -> 5bb2a2498


HBASE-18962 Support atomic (all or none) BatchOperations through batchMutate()

Signed-off-by: Apekshit Sharma <a...@apache.org>
Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5bb2a249
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5bb2a249
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5bb2a249

Branch: refs/heads/master
Commit: 5bb2a249847d391c9008590f2f92b95ff327cd0d
Parents: 0ec96a5
Author: Umesh Agashe <uaga...@cloudera.com>
Authored: Thu Oct 19 11:05:01 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Thu Nov 9 12:14:33 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  55 +++++--
 .../hbase/regionserver/RSRpcServices.java       |  34 +++--
 .../hadoop/hbase/regionserver/TestHRegion.java  | 147 +++++++++++++------
 3 files changed, 166 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5bb2a249/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f6890af..492325e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2952,6 +2952,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     protected final ObservedExceptionsInBatch observedExceptions;
     //Durability of the batch (highest durability of all operations)
     protected Durability durability;
+    protected boolean atomic = false;
 
     public BatchOperation(final HRegion region, T[] operations) {
       this.operations = operations;
@@ -3067,6 +3068,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       return getMutation(0).getClusterIds();
     }
 
+    boolean isAtomic() {
+      return atomic;
+    }
+
     /**
      * Helper method that checks and prepares only one mutation. This can be 
used to implement
      * {@link #checkAndPrepare()} for entire Batch.
@@ -3097,16 +3102,19 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         if (tmpDur.ordinal() > durability.ordinal()) {
           durability = tmpDur;
         }
-      } catch (NoSuchColumnFamilyException nscf) {
+      } catch (NoSuchColumnFamilyException nscfe) {
         final String msg = "No such column family in batch mutation. ";
         if (observedExceptions.hasSeenNoSuchFamily()) {
-          LOG.warn(msg + nscf.getMessage());
+          LOG.warn(msg + nscfe.getMessage());
         } else {
-          LOG.warn(msg, nscf);
+          LOG.warn(msg, nscfe);
           observedExceptions.sawNoSuchFamily();
         }
         retCodeDetails[index] = new OperationStatus(
-            OperationStatusCode.BAD_FAMILY, nscf.getMessage());
+            OperationStatusCode.BAD_FAMILY, nscfe.getMessage());
+        if (isAtomic()) { // fail, atomic means all or none
+          throw nscfe;
+        }
       } catch (FailedSanityCheckException fsce) {
         final String msg = "Batch Mutation did not pass sanity check. ";
         if (observedExceptions.hasSeenFailedSanityCheck()) {
@@ -3117,6 +3125,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         }
         retCodeDetails[index] = new OperationStatus(
             OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
+        if (isAtomic()) {
+          throw fsce;
+        }
       } catch (WrongRegionException we) {
         final String msg = "Batch mutation had a row that does not belong to 
this region. ";
         if (observedExceptions.hasSeenWrongRegion()) {
@@ -3127,6 +3138,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         }
         retCodeDetails[index] = new OperationStatus(
             OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+        if (isAtomic()) {
+          throw we;
+        }
       }
     }
 
@@ -3150,15 +3164,22 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         // If we haven't got any rows in our batch, we should block to get the 
next one.
         RowLock rowLock = null;
         try {
-          rowLock = region.getRowLockInternal(mutation.getRow(), true);
+          // if atomic then get exclusive lock, else shared lock
+          rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic());
         } catch (TimeoutIOException e) {
           // We will retry when other exceptions, but we should stop if we 
timeout .
           throw e;
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock, row=" + 
Bytes.toStringBinary(mutation.getRow()), ioe);
+          if (isAtomic()) { // fail, atomic means all or none
+            throw ioe;
+          }
         }
         if (rowLock == null) {
           // We failed to grab another lock
+          if (isAtomic()) {
+            throw new IOException("Can't apply all operations atomically!");
+          }
           break; // Stop acquiring more rows for this batch
         } else {
           acquiredRowLocks.add(rowLock);
@@ -3279,12 +3300,13 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * Batch of mutation operations. Base class is shared with {@link 
ReplayBatchOperation} as most
    * of the logic is same.
    */
-  private static class MutationBatchOperation extends BatchOperation<Mutation> 
{
+  static class MutationBatchOperation extends BatchOperation<Mutation> {
     private long nonceGroup;
     private long nonce;
-    public MutationBatchOperation(final HRegion region, Mutation[] operations, 
long nonceGroup,
-        long nonce) {
+    public MutationBatchOperation(final HRegion region, Mutation[] operations, 
boolean atomic,
+        long nonceGroup, long nonce) {
       super(region, operations);
+      this.atomic = atomic;
       this.nonceGroup = nonceGroup;
       this.nonce = nonce;
     }
@@ -3522,11 +3544,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           retCodeDetails[index] = OperationStatus.SUCCESS;
         }
       } else {
+        String msg = "Put/Delete mutations only supported in a batch";
         // In case of passing Append mutations along with the Puts and Deletes 
in batchMutate
         // mark the operation return code as failure so that it will not be 
considered in
         // the doMiniBatchMutation
-        retCodeDetails[index] = new 
OperationStatus(OperationStatusCode.FAILURE,
-            "Put/Delete mutations only supported in batchMutate() now");
+        retCodeDetails[index] = new 
OperationStatus(OperationStatusCode.FAILURE, msg);
+
+        if (isAtomic()) { // fail, atomic means all or none
+          throw new IOException(msg);
+        }
       }
     }
 
@@ -3582,7 +3608,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * Batch of mutations for replay. Base class is shared with {@link 
MutationBatchOperation} as most
    * of the logic is same.
    */
-  private static class ReplayBatchOperation extends 
BatchOperation<MutationReplay> {
+  static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
     private long origLogSeqNum = 0;
     public ReplayBatchOperation(final HRegion region, MutationReplay[] 
operations,
         long origLogSeqNum) {
@@ -3695,11 +3721,16 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
   public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, 
long nonce)
       throws IOException {
+    return batchMutate(mutations, false, nonceGroup, nonce);
+  }
+
+  public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, 
long nonceGroup,
+      long nonce) throws IOException {
     // As it stands, this is used for 3 things
     //  * batchMutate with single mutation - put/delete, separate or from 
checkAndMutate.
     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
     // So nonces are not really ever used by HBase. They could be by coprocs, 
and checkAnd...
-    return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, 
nonce));
+    return batchMutate(new MutationBatchOperation(this, mutations, atomic, 
nonceGroup, nonce));
   }
 
   public OperationStatus[] batchMutate(Mutation[] mutations) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5bb2a249/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index e104936..4b3fa50 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -604,9 +604,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
   /**
    * Mutate a list of rows atomically.
-   *
    * @param cellScanner if non-null, the mutation data -- the Cell content.
-   * @param comparator @throws IOException
    */
   private boolean checkAndRowMutate(final HRegion region, final 
List<ClientProtos.Action> actions,
                                     final CellScanner cellScanner, byte[] row, 
byte[] family, byte[] qualifier,
@@ -757,10 +755,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   /**
    * Run through the regionMutation <code>rm</code> and per Mutation, do the 
work, and then when
    * done, add an instance of a {@link ResultOrException} that corresponds to 
each Mutation.
-   * @param region
-   * @param actions
-   * @param cellScanner
-   * @param builder
    * @param cellsToReturn  Could be null. May be allocated in this method.  
This is what this
    * method returns as a 'result'.
    * @param closeCallBack the callback to be used with multigets
@@ -864,7 +858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (type != MutationType.PUT && type != MutationType.DELETE && 
mutations != null &&
               !mutations.isEmpty()) {
             // Flush out any Puts or Deletes already collected.
-            doBatchOp(builder, region, quota, mutations, cellScanner, 
spaceQuotaEnforcement);
+            doBatchOp(builder, region, quota, mutations, cellScanner, 
spaceQuotaEnforcement, false);
             mutations.clear();
           }
           switch (type) {
@@ -925,7 +919,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     // Finish up any outstanding mutations
     if (mutations != null && !mutations.isEmpty()) {
-      doBatchOp(builder, region, quota, mutations, cellScanner, 
spaceQuotaEnforcement);
+      try {
+        doBatchOp(builder, region, quota, mutations, cellScanner, 
spaceQuotaEnforcement, false);
+      } catch (IOException ioe) {
+        rpcServer.getMetrics().exception(ioe);
+        NameBytesPair pair = ResponseConverter.buildException(ioe);
+        resultOrExceptionBuilder.setException(pair);
+        context.incrementResponseExceptionSize(pair.getSerializedSize());
+        builder.addResultOrException(resultOrExceptionBuilder.build());
+      }
     }
     return cellsToReturn;
   }
@@ -955,7 +957,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private void doBatchOp(final RegionActionResult.Builder builder, final 
HRegion region,
       final OperationQuota quota, final List<ClientProtos.Action> mutations,
-      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
+      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, 
boolean atomic)
+      throws IOException {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -967,7 +970,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
        * is the mutation belong to. We can't sort ClientProtos.Action array, 
since they
        * are bonded to cellscanners.
        */
-      Map<Mutation, ClientProtos.Action> mutationActionMap = new 
HashMap<Mutation, ClientProtos.Action>();
+      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
       int i = 0;
       for (ClientProtos.Action action: mutations) {
         MutationProto m = action.getMutation();
@@ -995,7 +998,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       // sort to improve lock efficiency
       Arrays.sort(mArray);
 
-      OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
+      OperationStatus[] codes = region.batchMutate(mArray, atomic, 
HConstants.NO_NONCE,
         HConstants.NO_NONCE);
       for (i = 0; i < codes.length; i++) {
         Mutation currentMutation = mArray[i];
@@ -1025,6 +1028,9 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         }
       }
     } catch (IOException ie) {
+      if (atomic) {
+        throw ie;
+      }
       for (int i = 0; i < mutations.size(); i++) {
         builder.addResultOrException(getResultOrException(ie, 
mutations.get(i).getIndex()));
       }
@@ -1130,7 +1136,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
   }
 
   // Exposed for testing
-  static interface LogDelegate {
+  interface LogDelegate {
     void logBatchWarning(String firstRegionName, int sum, int 
rowSizeWarnThreshold);
   }
 
@@ -3229,7 +3235,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     } catch (IOException e) {
       addScannerLeaseBack(lease);
       throw new ServiceException(e);
-    };
+    }
     try {
       checkScanNextCallSeq(request, rsh);
     } catch (OutOfOrderScannerNextException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5bb2a249/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 127f949..d538b15 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
@@ -165,6 +166,7 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
 import org.mockito.ArgumentCaptor;
@@ -200,6 +202,7 @@ public class TestHRegion {
   @ClassRule
   public static final TestRule timeout =
       CategoryBasedTimeout.forClass(TestHRegion.class);
+  @Rule public final ExpectedException thrown = ExpectedException.none();
 
   private static final String COLUMN_FAMILY = "MyCF";
   private static final byte [] COLUMN_FAMILY_BYTES = 
Bytes.toBytes(COLUMN_FAMILY);
@@ -215,9 +218,11 @@ public class TestHRegion {
   // Test names
   protected TableName tableName;
   protected String method;
+  protected final byte[] qual = Bytes.toBytes("qual");
   protected final byte[] qual1 = Bytes.toBytes("qual1");
   protected final byte[] qual2 = Bytes.toBytes("qual2");
   protected final byte[] qual3 = Bytes.toBytes("qual3");
+  protected final byte[] value = Bytes.toBytes("value");
   protected final byte[] value1 = Bytes.toBytes("value1");
   protected final byte[] value2 = Bytes.toBytes("value2");
   protected final byte[] row = Bytes.toBytes("rowA");
@@ -1522,21 +1527,10 @@ public class TestHRegion {
 
   @Test
   public void testBatchPut_whileNoRowLocksHeld() throws IOException {
-    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
-    byte[] qual = Bytes.toBytes("qual");
-    byte[] val = Bytes.toBytes("val");
-    this.region = initHRegion(tableName, method, CONF, cf);
+    final Put[] puts = new Put[10];
     MetricsWALSource source = 
CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
     try {
-      long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
-      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
-
-      LOG.info("First a batch put with all valid puts");
-      final Put[] puts = new Put[10];
-      for (int i = 0; i < 10; i++) {
-        puts[i] = new Put(Bytes.toBytes("row_" + i));
-        puts[i].addColumn(cf, qual, val);
-      }
+      long syncs = prepareRegionForBachPut(puts, source, false);
 
       OperationStatus[] codes = this.region.batchMutate(puts);
       assertEquals(10, codes.length);
@@ -1546,7 +1540,7 @@ public class TestHRegion {
       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
 
       LOG.info("Next a batch put with one invalid family");
-      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val);
+      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
       codes = this.region.batchMutate(puts);
       assertEquals(10, codes.length);
       for (int i = 0; i < 10; i++) {
@@ -1563,21 +1557,12 @@ public class TestHRegion {
 
   @Test
   public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
-    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
-    byte[] qual = Bytes.toBytes("qual");
-    byte[] val = Bytes.toBytes("val");
-    this.region = initHRegion(tableName, method, CONF, cf);
+    final Put[] puts = new Put[10];
     MetricsWALSource source = 
CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
     try {
-      long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
-      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
+      long syncs = prepareRegionForBachPut(puts, source, false);
 
-      final Put[] puts = new Put[10];
-      for (int i = 0; i < 10; i++) {
-        puts[i] = new Put(Bytes.toBytes("row_" + i));
-        puts[i].addColumn(cf, qual, val);
-      }
-      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val);
+      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
 
       LOG.info("batchPut will have to break into four batches to avoid row 
locks");
       RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
@@ -1585,7 +1570,6 @@ public class TestHRegion {
       RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
       RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
 
-
       MultithreadedTestUtil.TestContext ctx = new 
MultithreadedTestUtil.TestContext(CONF);
       final AtomicReference<OperationStatus[]> retFromThread = new 
AtomicReference<>();
       final CountDownLatch startingPuts = new CountDownLatch(1);
@@ -1658,31 +1642,89 @@ public class TestHRegion {
       Thread.sleep(100);
       if (System.currentTimeMillis() - startWait > 10000) {
         fail(String.format("Timed out waiting for '%s' >= '%s', 
currentCount=%s", metricName,
-          expectedCount, currentCount));
+            expectedCount, currentCount));
       }
     }
   }
 
   @Test
-  public void testBatchPutWithTsSlop() throws Exception {
-    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
-    byte[] qual = Bytes.toBytes("qual");
-    byte[] val = Bytes.toBytes("val");
+  public void testAtomicBatchPut() throws IOException {
+    final Put[] puts = new Put[10];
+    MetricsWALSource source = 
CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
+    try {
+      long syncs = prepareRegionForBachPut(puts, source, false);
+
+      // 1. Straight forward case, should succeed
+      MutationBatchOperation batchOp = new MutationBatchOperation(region, 
puts, true,
+          HConstants.NO_NONCE, HConstants.NO_NONCE);
+      OperationStatus[] codes = this.region.batchMutate(batchOp);
+      assertEquals(10, codes.length);
+      for (int i = 0; i < 10; i++) {
+        assertEquals(OperationStatusCode.SUCCESS, 
codes[i].getOperationStatusCode());
+      }
+      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
 
+      // 2. Failed to get lock
+      RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
+      // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is 
locked in this
+      // thread, need to run {@link 
HRegion#batchMutate(HRegion.BatchOperation)} in different thread
+      MultithreadedTestUtil.TestContext ctx = new 
MultithreadedTestUtil.TestContext(CONF);
+      final AtomicReference<IOException> retFromThread = new 
AtomicReference<>();
+      final CountDownLatch finishedPuts = new CountDownLatch(1);
+      final MutationBatchOperation finalBatchOp = new 
MutationBatchOperation(region, puts, true,
+          HConstants
+          .NO_NONCE,
+          HConstants.NO_NONCE);
+      TestThread putter = new TestThread(ctx) {
+        @Override
+        public void doWork() throws IOException {
+          try {
+            region.batchMutate(finalBatchOp);
+          } catch (IOException ioe) {
+            LOG.error("test failed!", ioe);
+            retFromThread.set(ioe);
+          }
+          finishedPuts.countDown();
+        }
+      };
+      LOG.info("...starting put thread while holding locks");
+      ctx.addThread(putter);
+      ctx.startThreads();
+      LOG.info("...waiting for batch puts while holding locks");
+      try {
+        finishedPuts.await();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted!", e);
+      } finally {
+        if (lock != null) {
+          lock.release();
+        }
+      }
+      assertNotNull(retFromThread.get());
+      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
+
+      // 3. Exception thrown in validation
+      LOG.info("Next a batch put with one invalid family");
+      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
+      batchOp = new MutationBatchOperation(region, puts, true, 
HConstants.NO_NONCE,
+          HConstants.NO_NONCE);
+      thrown.expect(NoSuchColumnFamilyException.class);
+      this.region.batchMutate(batchOp);
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
+  public void testBatchPutWithTsSlop() throws Exception {
     // add data with a timestamp that is too recent for range. Ensure assert
     CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
-    this.region = initHRegion(tableName, method, CONF, cf);
+    final Put[] puts = new Put[10];
+    MetricsWALSource source = 
CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
 
     try {
-      MetricsWALSource source = 
CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
-      long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
-      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
-
-      final Put[] puts = new Put[10];
-      for (int i = 0; i < 10; i++) {
-        puts[i] = new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100);
-        puts[i].addColumn(cf, qual, val);
-      }
+      long syncs = prepareRegionForBachPut(puts, source, true);
 
       OperationStatus[] codes = this.region.batchMutate(puts);
       assertEquals(10, codes.length);
@@ -1690,12 +1732,29 @@ public class TestHRegion {
         assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, 
codes[i].getOperationStatusCode());
       }
       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
-
     } finally {
       HBaseTestingUtility.closeRegionAndWAL(this.region);
       this.region = null;
     }
+  }
+
+  /**
+   * @return syncs initial syncTimeNumOps
+   */
+  private long prepareRegionForBachPut(final Put[] puts, final 
MetricsWALSource source,
+      boolean slop) throws IOException {
+    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
+
+    LOG.info("First a batch put with all valid puts");
+    for (int i = 0; i < puts.length; i++) {
+      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 
100) :
+          new Put(Bytes.toBytes("row_" + i));
+      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
+    }
 
+    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
+    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
+    return syncs;
   }
 
   // 
////////////////////////////////////////////////////////////////////////////

Reply via email to