Repository: hbase
Updated Branches:
  refs/heads/master bd4007309 -> 096dac2e8


HBASE-18522 Add RowMutations support to Batch


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/096dac2e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/096dac2e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/096dac2e

Branch: refs/heads/master
Commit: 096dac2e83c675f212bad4f91888d8440ba152ca
Parents: bd40073
Author: Jerry He <jerry...@apache.org>
Authored: Mon Aug 14 10:39:46 2017 -0700
Committer: Jerry He <jerry...@apache.org>
Committed: Mon Aug 14 10:39:46 2017 -0700

----------------------------------------------------------------------
 .../hbase/client/MultiServerCallable.java       | 64 +++++++++++++++-----
 .../org/apache/hadoop/hbase/client/Table.java   |  4 +-
 .../hbase/shaded/protobuf/RequestConverter.java |  6 +-
 .../shaded/protobuf/ResponseConverter.java      | 37 ++++++++++-
 .../hbase/client/TestFromClientSide3.java       | 46 ++++++++++++++
 .../hadoop/hbase/client/TestMultiParallel.java  | 34 ++++++++++-
 6 files changed, 168 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 33c9a0b..7f6052e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -93,30 +94,64 @@ class MultiServerCallable extends 
CancellableRegionServerCallable<MultiResponse>
     RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
     ClientProtos.Action.Builder actionBuilder = 
ClientProtos.Action.newBuilder();
     MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
-    List<CellScannable> cells = null;
-    // The multi object is a list of Actions by region.  Iterate by region.
+
+    // Pre-size. Presume at least a KV per Action. There are likely more.
+    List<CellScannable> cells =
+        (this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
+
     long nonceGroup = multiAction.getNonceGroup();
     if (nonceGroup != HConstants.NO_NONCE) {
       multiRequestBuilder.setNonceGroup(nonceGroup);
     }
+    // Index to track RegionAction within the MultiRequest
+    int regionActionIndex = -1;
+    // Map from a created RegionAction to the original index for a 
RowMutations within
+    // its original list of actions
+    Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+    // The multi object is a list of Actions by region. Iterate by region.
     for (Map.Entry<byte[], List<Action>> e: 
this.multiAction.actions.entrySet()) {
       final byte [] regionName = e.getKey();
       final List<Action> actions = e.getValue();
       regionActionBuilder.clear();
       regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
           HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 
regionName));
-      if (this.cellBlock) {
-        // Pre-size. Presume at least a KV per Action.  There are likely more.
-        if (cells == null) cells = new ArrayList<>(countOfActions);
-        // Send data in cellblocks. The call to buildNoDataMultiRequest will 
skip RowMutations.
-        // They have already been handled above. Guess at count of cells
-        regionActionBuilder = 
RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
-          regionActionBuilder, actionBuilder, mutationBuilder);
-      } else {
-        regionActionBuilder = RequestConverter.buildRegionAction(regionName, 
actions,
-          regionActionBuilder, actionBuilder, mutationBuilder);
+
+      int rowMutations = 0;
+      for (Action action : actions) {
+        Row row = action.getAction();
+        // Row Mutations are a set of Puts and/or Deletes all to be applied 
atomically
+        // on the one row. We do separate RegionAction for each RowMutations.
+        // We maintain a map to keep track of this RegionAction and the 
original Action index.
+        if (row instanceof RowMutations) {
+          RowMutations rms = (RowMutations)row;
+          if (this.cellBlock) {
+            // Build a multi request absent its Cell payload. Send data in 
cellblocks.
+            regionActionBuilder = 
RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
+              regionActionBuilder, actionBuilder, mutationBuilder);
+          } else {
+            regionActionBuilder = 
RequestConverter.buildRegionAction(regionName, rms);
+          }
+          regionActionBuilder.setAtomic(true);
+          multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+          regionActionIndex++;
+          rowMutationsIndexMap.put(regionActionIndex, 
action.getOriginalIndex());
+          rowMutations++;
+        }
+      }
+
+      if (actions.size() > rowMutations) {
+        if (this.cellBlock) {
+          // Send data in cellblocks. The call to buildNoDataRegionAction will 
skip RowMutations.
+          // They have already been handled above. Guess at count of cells
+          regionActionBuilder = 
RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
+            regionActionBuilder, actionBuilder, mutationBuilder);
+        } else {
+          regionActionBuilder = RequestConverter.buildRegionAction(regionName, 
actions,
+            regionActionBuilder, actionBuilder, mutationBuilder);
+        }
+        multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+        regionActionIndex++;
       }
-      multiRequestBuilder.addRegionAction(regionActionBuilder.build());
     }
 
     if (cells != null) {
@@ -125,7 +160,8 @@ class MultiServerCallable extends 
CancellableRegionServerCallable<MultiResponse>
     ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
     ClientProtos.MultiResponse responseProto = 
getStub().multi(getRpcController(), requestProto);
     if (responseProto == null) return null; // Occurs on cancel
-    return ResponseConverter.getResults(requestProto, responseProto, 
getRpcControllerCellScanner());
+    return ResponseConverter.getResults(requestProto, rowMutationsIndexMap, 
responseProto,
+      getRpcControllerCellScanner());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index cfe435e..a215903 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -111,12 +111,12 @@ public interface Table extends Closeable {
   boolean[] existsAll(List<Get> gets) throws IOException;
 
   /**
-   * Method that does a batch call on Deletes, Gets, Puts, Increments and 
Appends.
+   * Method that does a batch call on Deletes, Gets, Puts, Increments, 
Appends, RowMutations.
    * The ordering of execution of the actions is not defined. Meaning if you 
do a Put and a
    * Get in the same {@link #batch} call, you will not necessarily be
    * guaranteed that the Get returns what the Put had put.
    *
-   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @param actions list of Get, Put, Delete, Increment, Append, RowMutations.
    * @param results Empty Object[], same size as actions. Provides access to 
partial
    *                results, in case an exception is thrown. A null in the 
result array means that
    *                the call for that action failed, even after retries. The 
order of the objects

http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index be46c19..08ed3dc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -668,7 +668,8 @@ public final class RequestConverter {
               .setMethodName(exec.getMethod().getName())
               .setRequest(value)));
       } else if (row instanceof RowMutations) {
-        throw new UnsupportedOperationException("No RowMutations in multi 
calls; use mutateRow");
+        // Skip RowMutations, which has been separately converted to 
RegionAction
+        continue;
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + 
row.getClass().getName());
       }
@@ -756,7 +757,8 @@ public final class RequestConverter {
               .setMethodName(exec.getMethod().getName())
               .setRequest(value)));
       } else if (row instanceof RowMutations) {
-        throw new UnsupportedOperationException("No RowMutations in multi 
calls; use mutateRow");
+        // Skip RowMutations, which has been separately converted to 
RegionAction
+        continue;
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + 
row.getClass().getName());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index c489628..98e6f69 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -89,7 +89,8 @@ public final class ResponseConverter {
   /**
    * Get the results from a protocol buffer MultiResponse
    *
-   * @param request the protocol buffer MultiResponse to convert
+   * @param request the original protocol buffer MultiRequest
+   * @param response the protocol buffer MultiResponse to convert
    * @param cells Cells to go with the passed in <code>proto</code>.  Can be 
null.
    * @return the results that were in the MultiResponse (a Result or an 
Exception).
    * @throws IOException
@@ -97,6 +98,22 @@ public final class ResponseConverter {
   public static org.apache.hadoop.hbase.client.MultiResponse getResults(final 
MultiRequest request,
       final MultiResponse response, final CellScanner cells)
   throws IOException {
+    return getResults(request, null, response, cells);
+  }
+
+  /**
+   * Get the results from a protocol buffer MultiResponse
+   *
+   * @param request the original protocol buffer MultiRequest
+   * @param rowMutationsIndexMap Used to support RowMutations in batch
+   * @param response the protocol buffer MultiResponse to convert
+   * @param cells Cells to go with the passed in <code>proto</code>.  Can be 
null.
+   * @return the results that were in the MultiResponse (a Result or an 
Exception).
+   * @throws IOException
+   */
+  public static org.apache.hadoop.hbase.client.MultiResponse getResults(final 
MultiRequest request,
+      final Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse 
response,
+      final CellScanner cells) throws IOException {
     int requestRegionActionCount = request.getRegionActionCount();
     int responseRegionActionResultCount = 
response.getRegionActionResultCount();
     if (requestRegionActionCount != responseRegionActionResultCount) {
@@ -130,8 +147,24 @@ public final class ResponseConverter {
             actionResult.getResultOrExceptionCount() + " for region " + 
actions.getRegion());
       }
 
+      Object responseValue;
+
+      // For RowMutations action, if there is an exception, the exception is 
set
+      // at the RegionActionResult level and the ResultOrException is null at 
the original index
+      Integer rowMutationsIndex =
+          (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i));
+      if (rowMutationsIndex != null) {
+        // This RegionAction is from a RowMutations in a batch.
+        // If there is an exception from the server, the exception is set at
+        // the RegionActionResult level, which has been handled above.
+        responseValue = response.getProcessed() ?
+            ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
+            ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
+        results.add(regionName, rowMutationsIndex, responseValue);
+        continue;
+      }
+
       for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
-        Object responseValue;
         if (roe.hasException()) {
           responseValue = ProtobufUtil.toException(roe.getException());
         } else if (roe.hasResult()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 668bfbb..f20c050 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -310,6 +310,52 @@ public class TestFromClientSide3 {
   }
 
   @Test
+  public void testBatchWithRowMutation() throws Exception {
+    LOG.info("Starting testBatchWithRowMutation");
+    final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation");
+    try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) {
+      byte [][] QUALIFIERS = new byte [][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b")
+      };
+      RowMutations arm = new RowMutations(ROW);
+      Put p = new Put(ROW);
+      p.addColumn(FAMILY, QUALIFIERS[0], VALUE);
+      arm.add(p);
+      Object[] batchResult = new Object[1];
+      t.batch(Arrays.asList(arm), batchResult);
+
+      Get g = new Get(ROW);
+      Result r = t.get(g);
+      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, 
QUALIFIERS[0])));
+
+      arm = new RowMutations(ROW);
+      p = new Put(ROW);
+      p.addColumn(FAMILY, QUALIFIERS[1], VALUE);
+      arm.add(p);
+      Delete d = new Delete(ROW);
+      d.addColumns(FAMILY, QUALIFIERS[0]);
+      arm.add(d);
+      t.batch(Arrays.asList(arm), batchResult);
+      r = t.get(g);
+      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, 
QUALIFIERS[1])));
+      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
+
+      // Test that we get the correct remote exception for RowMutations from 
batch()
+      try {
+        arm = new RowMutations(ROW);
+        p = new Put(ROW);
+        p.addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE);
+        arm.add(p);
+        t.batch(Arrays.asList(arm), batchResult);
+        fail("Expected RetriesExhaustedWithDetailsException with 
NoSuchColumnFamilyException");
+      } catch(RetriesExhaustedWithDetailsException e) {
+        String msg = e.getMessage();
+        assertTrue(msg.contains("NoSuchColumnFamilyException"));
+      }
+    }
+  }
+
+  @Test
   public void testHTableExistsMethodSingleRegionSingleGet() throws Exception {
       // Test with a single region table.
       Table table = TEST_UTIL.createTable(

http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index a3c9649..62b6ae5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -642,6 +642,23 @@ public class TestMultiParallel {
     put.addColumn(BYTES_FAMILY, qual2, val2);
     actions.add(put);
 
+    // 6 RowMutations
+    RowMutations rm = new RowMutations(KEYS[50]);
+    put = new Put(KEYS[50]);
+    put.addColumn(BYTES_FAMILY, qual2, val2);
+    rm.add(put);
+    byte[] qual3 = Bytes.toBytes("qual3");
+    byte[] val3 = Bytes.toBytes("putvalue3");
+    put = new Put(KEYS[50]);
+    put.addColumn(BYTES_FAMILY, qual3, val3);
+    rm.add(put);
+    actions.add(rm);
+
+    // 7 Add another Get to the mixed sequence after RowMutations
+    get = new Get(KEYS[10]);
+    get.addColumn(BYTES_FAMILY, QUALIFIER);
+    actions.add(get);
+
     results = new Object[actions.size()];
     table.batch(actions, results);
 
@@ -649,10 +666,11 @@ public class TestMultiParallel {
 
     validateResult(results[0]);
     validateResult(results[1]);
-    validateEmpty(results[2]);
     validateEmpty(results[3]);
     validateResult(results[4]);
     validateEmpty(results[5]);
+    validateEmpty(results[6]);
+    validateResult(results[7]);
 
     // validate last put, externally from the batch
     get = new Get(KEYS[40]);
@@ -660,6 +678,17 @@ public class TestMultiParallel {
     Result r = table.get(get);
     validateResult(r, qual2, val2);
 
+    // validate last RowMutations, externally from the batch
+    get = new Get(KEYS[50]);
+    get.addColumn(BYTES_FAMILY, qual2);
+    r = table.get(get);
+    validateResult(r, qual2, val2);
+
+    get = new Get(KEYS[50]);
+    get.addColumn(BYTES_FAMILY, qual3);
+    r = table.get(get);
+    validateResult(r, qual3, val3);
+
     table.close();
   }
 
@@ -736,8 +765,7 @@ public class TestMultiParallel {
   private void validateEmpty(Object r1) {
     Result result = (Result)r1;
     Assert.assertTrue(result != null);
-    Assert.assertTrue(result.getRow() == null);
-    Assert.assertEquals(0, result.rawCells().length);
+    Assert.assertTrue(result.isEmpty());
   }
 
   private void validateSizeAndEmpty(Object[] results, int expectedSize) {

Reply via email to