Repository: hbase Updated Branches: refs/heads/master bd4007309 -> 096dac2e8
HBASE-18522 Add RowMutations support to Batch Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/096dac2e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/096dac2e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/096dac2e Branch: refs/heads/master Commit: 096dac2e83c675f212bad4f91888d8440ba152ca Parents: bd40073 Author: Jerry He <jerry...@apache.org> Authored: Mon Aug 14 10:39:46 2017 -0700 Committer: Jerry He <jerry...@apache.org> Committed: Mon Aug 14 10:39:46 2017 -0700 ---------------------------------------------------------------------- .../hbase/client/MultiServerCallable.java | 64 +++++++++++++++----- .../org/apache/hadoop/hbase/client/Table.java | 4 +- .../hbase/shaded/protobuf/RequestConverter.java | 6 +- .../shaded/protobuf/ResponseConverter.java | 37 ++++++++++- .../hbase/client/TestFromClientSide3.java | 46 ++++++++++++++ .../hadoop/hbase/client/TestMultiParallel.java | 34 ++++++++++- 6 files changed, 168 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 33c9a0b..7f6052e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -93,30 +94,64 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse> RegionAction.Builder regionActionBuilder = RegionAction.newBuilder(); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); - List<CellScannable> cells = null; - // The multi object is a list of Actions by region. Iterate by region. + + // Pre-size. Presume at least a KV per Action. There are likely more. + List<CellScannable> cells = + (this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null); + long nonceGroup = multiAction.getNonceGroup(); if (nonceGroup != HConstants.NO_NONCE) { multiRequestBuilder.setNonceGroup(nonceGroup); } + // Index to track RegionAction within the MultiRequest + int regionActionIndex = -1; + // Map from a created RegionAction to the original index for a RowMutations within + // its original list of actions + Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>(); + // The multi object is a list of Actions by region. Iterate by region. for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) { final byte [] regionName = e.getKey(); final List<Action> actions = e.getValue(); regionActionBuilder.clear(); regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName)); - if (this.cellBlock) { - // Pre-size. Presume at least a KV per Action. There are likely more. - if (cells == null) cells = new ArrayList<>(countOfActions); - // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations. - // They have already been handled above. Guess at count of cells - regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells, - regionActionBuilder, actionBuilder, mutationBuilder); - } else { - regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions, - regionActionBuilder, actionBuilder, mutationBuilder); + + int rowMutations = 0; + for (Action action : actions) { + Row row = action.getAction(); + // Row Mutations are a set of Puts and/or Deletes all to be applied atomically + // on the one row. We do separate RegionAction for each RowMutations. + // We maintain a map to keep track of this RegionAction and the original Action index. + if (row instanceof RowMutations) { + RowMutations rms = (RowMutations)row; + if (this.cellBlock) { + // Build a multi request absent its Cell payload. Send data in cellblocks. + regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells, + regionActionBuilder, actionBuilder, mutationBuilder); + } else { + regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms); + } + regionActionBuilder.setAtomic(true); + multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + regionActionIndex++; + rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex()); + rowMutations++; + } + } + + if (actions.size() > rowMutations) { + if (this.cellBlock) { + // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations. + // They have already been handled above. Guess at count of cells + regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells, + regionActionBuilder, actionBuilder, mutationBuilder); + } else { + regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions, + regionActionBuilder, actionBuilder, mutationBuilder); + } + multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + regionActionIndex++; } - multiRequestBuilder.addRegionAction(regionActionBuilder.build()); } if (cells != null) { @@ -125,7 +160,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse> ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto); if (responseProto == null) return null; // Occurs on cancel - return ResponseConverter.getResults(requestProto, responseProto, getRpcControllerCellScanner()); + return ResponseConverter.getResults(requestProto, rowMutationsIndexMap, responseProto, + getRpcControllerCellScanner()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index cfe435e..a215903 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -111,12 +111,12 @@ public interface Table extends Closeable { boolean[] existsAll(List<Get> gets) throws IOException; /** - * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. + * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations. * The ordering of execution of the actions is not defined. Meaning if you do a Put and a * Get in the same {@link #batch} call, you will not necessarily be * guaranteed that the Get returns what the Put had put. * - * @param actions list of Get, Put, Delete, Increment, Append objects + * @param actions list of Get, Put, Delete, Increment, Append, RowMutations. * @param results Empty Object[], same size as actions. Provides access to partial * results, in case an exception is thrown. A null in the result array means that * the call for that action failed, even after retries. The order of the objects http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index be46c19..08ed3dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -668,7 +668,8 @@ public final class RequestConverter { .setMethodName(exec.getMethod().getName()) .setRequest(value))); } else if (row instanceof RowMutations) { - throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow"); + // Skip RowMutations, which has been separately converted to RegionAction + continue; } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } @@ -756,7 +757,8 @@ public final class RequestConverter { .setMethodName(exec.getMethod().getName()) .setRequest(value))); } else if (row instanceof RowMutations) { - throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow"); + // Skip RowMutations, which has been separately converted to RegionAction + continue; } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java index c489628..98e6f69 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java @@ -89,7 +89,8 @@ public final class ResponseConverter { /** * Get the results from a protocol buffer MultiResponse * - * @param request the protocol buffer MultiResponse to convert + * @param request the original protocol buffer MultiRequest + * @param response the protocol buffer MultiResponse to convert * @param cells Cells to go with the passed in <code>proto</code>. Can be null. * @return the results that were in the MultiResponse (a Result or an Exception). * @throws IOException @@ -97,6 +98,22 @@ public final class ResponseConverter { public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request, final MultiResponse response, final CellScanner cells) throws IOException { + return getResults(request, null, response, cells); + } + + /** + * Get the results from a protocol buffer MultiResponse + * + * @param request the original protocol buffer MultiRequest + * @param rowMutationsIndexMap Used to support RowMutations in batch + * @param response the protocol buffer MultiResponse to convert + * @param cells Cells to go with the passed in <code>proto</code>. Can be null. + * @return the results that were in the MultiResponse (a Result or an Exception). + * @throws IOException + */ + public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request, + final Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse response, + final CellScanner cells) throws IOException { int requestRegionActionCount = request.getRegionActionCount(); int responseRegionActionResultCount = response.getRegionActionResultCount(); if (requestRegionActionCount != responseRegionActionResultCount) { @@ -130,8 +147,24 @@ public final class ResponseConverter { actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion()); } + Object responseValue; + + // For RowMutations action, if there is an exception, the exception is set + // at the RegionActionResult level and the ResultOrException is null at the original index + Integer rowMutationsIndex = + (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i)); + if (rowMutationsIndex != null) { + // This RegionAction is from a RowMutations in a batch. + // If there is an exception from the server, the exception is set at + // the RegionActionResult level, which has been handled above. + responseValue = response.getProcessed() ? + ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : + ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; + results.add(regionName, rowMutationsIndex, responseValue); + continue; + } + for (ResultOrException roe : actionResult.getResultOrExceptionList()) { - Object responseValue; if (roe.hasException()) { responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 668bfbb..f20c050 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -310,6 +310,52 @@ public class TestFromClientSide3 { } @Test + public void testBatchWithRowMutation() throws Exception { + LOG.info("Starting testBatchWithRowMutation"); + final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation"); + try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) { + byte [][] QUALIFIERS = new byte [][] { + Bytes.toBytes("a"), Bytes.toBytes("b") + }; + RowMutations arm = new RowMutations(ROW); + Put p = new Put(ROW); + p.addColumn(FAMILY, QUALIFIERS[0], VALUE); + arm.add(p); + Object[] batchResult = new Object[1]; + t.batch(Arrays.asList(arm), batchResult); + + Get g = new Get(ROW); + Result r = t.get(g); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); + + arm = new RowMutations(ROW); + p = new Put(ROW); + p.addColumn(FAMILY, QUALIFIERS[1], VALUE); + arm.add(p); + Delete d = new Delete(ROW); + d.addColumns(FAMILY, QUALIFIERS[0]); + arm.add(d); + t.batch(Arrays.asList(arm), batchResult); + r = t.get(g); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); + assertNull(r.getValue(FAMILY, QUALIFIERS[0])); + + // Test that we get the correct remote exception for RowMutations from batch() + try { + arm = new RowMutations(ROW); + p = new Put(ROW); + p.addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE); + arm.add(p); + t.batch(Arrays.asList(arm), batchResult); + fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); + } catch(RetriesExhaustedWithDetailsException e) { + String msg = e.getMessage(); + assertTrue(msg.contains("NoSuchColumnFamilyException")); + } + } + } + + @Test public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { // Test with a single region table. Table table = TEST_UTIL.createTable( http://git-wip-us.apache.org/repos/asf/hbase/blob/096dac2e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index a3c9649..62b6ae5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -642,6 +642,23 @@ public class TestMultiParallel { put.addColumn(BYTES_FAMILY, qual2, val2); actions.add(put); + // 6 RowMutations + RowMutations rm = new RowMutations(KEYS[50]); + put = new Put(KEYS[50]); + put.addColumn(BYTES_FAMILY, qual2, val2); + rm.add(put); + byte[] qual3 = Bytes.toBytes("qual3"); + byte[] val3 = Bytes.toBytes("putvalue3"); + put = new Put(KEYS[50]); + put.addColumn(BYTES_FAMILY, qual3, val3); + rm.add(put); + actions.add(rm); + + // 7 Add another Get to the mixed sequence after RowMutations + get = new Get(KEYS[10]); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + results = new Object[actions.size()]; table.batch(actions, results); @@ -649,10 +666,11 @@ public class TestMultiParallel { validateResult(results[0]); validateResult(results[1]); - validateEmpty(results[2]); validateEmpty(results[3]); validateResult(results[4]); validateEmpty(results[5]); + validateEmpty(results[6]); + validateResult(results[7]); // validate last put, externally from the batch get = new Get(KEYS[40]); @@ -660,6 +678,17 @@ public class TestMultiParallel { Result r = table.get(get); validateResult(r, qual2, val2); + // validate last RowMutations, externally from the batch + get = new Get(KEYS[50]); + get.addColumn(BYTES_FAMILY, qual2); + r = table.get(get); + validateResult(r, qual2, val2); + + get = new Get(KEYS[50]); + get.addColumn(BYTES_FAMILY, qual3); + r = table.get(get); + validateResult(r, qual3, val3); + table.close(); } @@ -736,8 +765,7 @@ public class TestMultiParallel { private void validateEmpty(Object r1) { Result result = (Result)r1; Assert.assertTrue(result != null); - Assert.assertTrue(result.getRow() == null); - Assert.assertEquals(0, result.rawCells().length); + Assert.assertTrue(result.isEmpty()); } private void validateSizeAndEmpty(Object[] results, int expectedSize) {