Repository: kudu Updated Branches: refs/heads/master 31a720fc3 -> 6626e109d
KUDU-2261: The order of the responses after flush should match the order we call apply The response list of flush() should have the same order of we apply operations, so it's easier to know which operation failed and which succeeded. Change-Id: Ib37c9e85ad03731bb7d5b83be77d40fcd95e803a Reviewed-on: http://gerrit.cloudera.org:8080/9029 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b6889d7a Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b6889d7a Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b6889d7a Branch: refs/heads/master Commit: b6889d7aced7e4b6ad40b63fbe2da117294f3803 Parents: 31a720f Author: zhangzhen <[email protected]> Authored: Tue Jan 16 18:49:53 2018 +0800 Committer: Todd Lipcon <[email protected]> Committed: Fri Jan 19 00:24:43 2018 +0000 ---------------------------------------------------------------------- .../apache/kudu/client/AsyncKuduSession.java | 25 ++++++++++++---- .../main/java/org/apache/kudu/client/Batch.java | 8 +++-- .../org/apache/kudu/client/BatchResponse.java | 17 +++++++++-- .../org/apache/kudu/client/TestKuduSession.java | 31 ++++++++++++++++++++ 4 files changed, 71 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java index 4ee3ba3..66b58b4 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java @@ -20,6 +20,7 @@ package org.apache.kudu.client; import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -342,7 +343,9 @@ public class AsyncKuduSession implements SessionConfiguration { // Group the operations by tablet. Map<Slice, Batch> batches = new HashMap<>(); List<OperationResponse> opsFailedInLookup = new ArrayList<>(); + List<Integer> opsFailedIndexesList = new ArrayList<>(); + int currentIndex = 0; for (BufferedOperation bufferedOp : buffer.getOperations()) { Operation operation = bufferedOp.getOperation(); if (bufferedOp.tabletLookupFailed()) { @@ -366,6 +369,7 @@ public class AsyncKuduSession implements SessionConfiguration { } operation.callback(response); opsFailedInLookup.add(response); + opsFailedIndexesList.add(currentIndex++); continue; } LocatedTablet tablet = bufferedOp.getTablet(); @@ -376,12 +380,13 @@ public class AsyncKuduSession implements SessionConfiguration { batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows); batches.put(tabletId, batch); } - batch.add(operation); + batch.add(operation, currentIndex++); } List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batches.size() + 1); if (!opsFailedInLookup.isEmpty()) { - batchResponses.add(Deferred.fromResult(new BatchResponse(opsFailedInLookup))); + batchResponses.add( + Deferred.fromResult(new BatchResponse(opsFailedInLookup, opsFailedIndexesList))); } for (Batch batch : batches.values()) { @@ -479,12 +484,18 @@ public class AsyncKuduSession implements SessionConfiguration { size += batchResponse.getIndividualResponses().size(); } - ArrayList<OperationResponse> responses = new ArrayList<>(size); + OperationResponse[] responses = new OperationResponse[size]; for (BatchResponse batchResponse : batchResponses) { - responses.addAll(batchResponse.getIndividualResponses()); + List<OperationResponse> responseList = batchResponse.getIndividualResponses(); + List<Integer> indexList = batchResponse.getResponseIndexes(); + for (int i = 0; i < indexList.size(); i++) { + int index = indexList.get(i); + assert responses[index] == null; + responses[index] = responseList.get(i); + } } - return responses; + return Arrays.asList(responses); } @Override @@ -690,6 +701,7 @@ public class AsyncKuduSession implements SessionConfiguration { */ private void addBatchCallbacks(final Batch request) { final class BatchCallback implements Callback<BatchResponse, BatchResponse> { + @Override public BatchResponse call(final BatchResponse response) { LOG.trace("Got a Batch response for {} rows", request.operations.size()); if (response.getWriteTimestamp() != 0) { @@ -746,7 +758,7 @@ public class AsyncKuduSession implements SessionConfiguration { // Note that returning an object that's not an exception will make us leave the // errback chain. Effectively, the BatchResponse below will end up as part of the list // passed to ConvertBatchToListOfResponsesCB. - return handleKuduException ? new BatchResponse(responses) : e; + return handleKuduException ? new BatchResponse(responses, request.operationIndexes) : e; } @Override @@ -786,6 +798,7 @@ public class AsyncKuduSession implements SessionConfiguration { * {@link FlushMode#AUTO_FLUSH_BACKGROUND}. */ private final class FlusherTask implements TimerTask { + @Override public void run(final Timeout timeout) { Buffer buffer = null; synchronized (monitor) { http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java index ff49137..f5ebd24 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java @@ -43,6 +43,8 @@ class Batch extends KuduRpc<BatchResponse> { /** Holds batched operations. */ final List<Operation> operations = new ArrayList<>(); + /** Holds indexes of operations in the original user's batch. */ + final List<Integer> operationIndexes = new ArrayList<>(); /** The tablet this batch will be routed to. */ private final LocatedTablet tablet; @@ -75,7 +77,7 @@ class Batch extends KuduRpc<BatchResponse> { return this.rowOperationsSizeBytes; } - public void add(Operation operation) { + public void add(Operation operation, int index) { assert Bytes.memcmp(operation.partitionKey(), tablet.getPartition().getPartitionKeyStart()) >= 0 && (tablet.getPartition().getPartitionKeyEnd().length == 0 || @@ -83,6 +85,7 @@ class Batch extends KuduRpc<BatchResponse> { tablet.getPartition().getPartitionKeyEnd()) < 0); operations.add(operation); + operationIndexes.add(index); } @Override @@ -127,7 +130,8 @@ class Batch extends KuduRpc<BatchResponse> { } BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID, - builder.getTimestamp(), errorsPB, operations); + builder.getTimestamp(), errorsPB, operations, + operationIndexes); if (injectedError != null) { if (injectedlatencyMs > 0) { http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java index d32793f..a426ac4 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java @@ -36,6 +36,7 @@ public class BatchResponse extends KuduRpcResponse { private final long writeTimestamp; private final List<RowError> rowErrors; private final List<OperationResponse> individualResponses; + private final List<Integer> responsesIndexes; /** * Package-private constructor to be used by the RPCs. @@ -43,13 +44,15 @@ public class BatchResponse extends KuduRpcResponse { * @param writeTimestamp HT's write timestamp * @param errorsPB a list of row errors, can be empty * @param operations the list of operations which created this response + * @param indexes the list of operations' order index */ BatchResponse(long elapsedMillis, String tsUUID, long writeTimestamp, List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB, - List<Operation> operations) { + List<Operation> operations, List<Integer> indexes) { super(elapsedMillis, tsUUID); this.writeTimestamp = writeTimestamp; individualResponses = new ArrayList<>(operations.size()); + this.responsesIndexes = indexes; if (errorsPB.isEmpty()) { rowErrors = Collections.emptyList(); } else { @@ -77,13 +80,15 @@ public class BatchResponse extends KuduRpcResponse { } assert (rowErrors.size() == errorsPB.size()); assert (individualResponses.size() == operations.size()); + assert (individualResponses.size() == responsesIndexes.size()); } - BatchResponse(List<OperationResponse> individualResponses) { + BatchResponse(List<OperationResponse> individualResponses, List<Integer> indexes) { super(0, null); writeTimestamp = 0; rowErrors = ImmutableList.of(); this.individualResponses = individualResponses; + this.responsesIndexes = indexes; } /** @@ -103,4 +108,12 @@ public class BatchResponse extends KuduRpcResponse { return individualResponses; } + /** + * Package-private method to get the responses' order index. + * @return a list of indexes + */ + List<Integer> getResponseIndexes() { + return responsesIndexes; + } + } http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java index 122ed21..582cda3 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java @@ -275,6 +275,37 @@ public class TestKuduSession extends BaseKuduTest { } @Test(timeout = 10000) + public void testInsertManualFlushResponseOrder() throws Exception { + String tableName = name.getMethodName(); + CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange(); + createOptions.setNumReplicas(1); + syncClient.createTable(tableName, basicSchema, createOptions); + KuduTable table = syncClient.openTable(tableName); + + KuduSession session = syncClient.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); + + // Insert a batch of some valid and some invalid. + for (int i = 0; i < 10; i++) { + assertNull(session.apply(createBasicSchemaInsert(table, 100 + i * 10))); + assertNull(session.apply(createBasicSchemaInsert(table, 200 + i * 10))); + } + List<OperationResponse> results = session.flush(); + + assertEquals(20, results.size()); + + for (int i = 0; i < 20; i++) { + OperationResponse result = results.get(i); + if (i % 2 == 0) { + assertTrue(result.hasRowError()); + assertTrue(result.getRowError().getErrorStatus().isNotFound()); + } else { + assertTrue(!result.hasRowError()); + } + } + } + + @Test(timeout = 10000) public void testInsertAutoFlushSyncNonCoveredRange() throws Exception { String tableName = name.getMethodName(); CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
