Repository: kudu Updated Branches: refs/heads/master 1a0099f91 -> 2f1a2a06d
[java] Reuse snapshot scan timestamp across tablets KUDU-1189 On reads at a snapshot that touch multiple tablets, without the user setting a timestamp, use the timestamp from the first server for the following scans. For a READ_AT_SNAPSHOT scan operation with no snapshot timestamp specified, store the snapshot timestamp returned from the first tablet server into the scan configuration object. Then reuse it when continuing the scan on other tablet servers. Added a corresponding unit test as well. Change-Id: I7207672f7b0cf1307bfa861bda3291b278618016 Reviewed-on: http://gerrit.cloudera.org:8080/5188 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/16ce5e02 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/16ce5e02 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/16ce5e02 Branch: refs/heads/master Commit: 16ce5e02466a18e591672a84bc9a4bd6db7d00ea Parents: 1a0099f Author: Alexey Serbin <[email protected]> Authored: Tue Nov 22 11:03:50 2016 -0800 Committer: Alexey Serbin <[email protected]> Committed: Tue Nov 29 02:23:50 2016 +0000 ---------------------------------------------------------------------- .../apache/kudu/client/AsyncKuduScanner.java | 39 +++++++++++++++++--- .../kudu/client/TestScannerMultiTablet.java | 31 ++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/16ce5e02/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index dadcb34..2642def 100644 --- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -178,7 +178,7 @@ public final class AsyncKuduScanner { private final Common.OrderMode orderMode; - private final long htTimestamp; + private long htTimestamp; private final ReplicaSelection replicaSelection; @@ -368,11 +368,18 @@ public final class AsyncKuduScanner { if (closed) { // We're already done scanning. return Deferred.fromResult(null); } else if (tablet == null) { - Callback<Deferred<RowResultIterator>, AsyncKuduScanner.Response> cb = new Callback<Deferred<RowResultIterator>, Response>() { @Override public Deferred<RowResultIterator> call(Response resp) throws Exception { + if (htTimestamp == AsyncKuduClient.NO_TIMESTAMP && + resp.scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + // If the server-assigned timestamp is present in the tablet + // server's response, store it in the scanner. The stored value + // is used for read operations at other tablet servers in the + // context of the same scan. + htTimestamp = resp.scanTimestamp; + } if (!resp.more || resp.scannerId == null) { scanFinished(); return Deferred.fromResult(resp.data); // there might be data to return @@ -639,17 +646,35 @@ public final class AsyncKuduScanner { */ private final boolean more; + /** + * Server-assigned timestamp for the scan operation. It's used when + * the scan operates in READ_AT_SNAPSHOT mode and the timestamp is not + * specified explicitly. The field is set with the snapshot timestamp sent + * in the response from the very first tablet server contacted while + * fetching data from corresponding tablets. If the tablet server does not + * send the snapshot timestamp in its response, this field is assigned + * a special value AsyncKuduClient.NO_TIMESTAMP. 
+ */ + private final long scanTimestamp; + Response(final byte[] scannerId, final RowResultIterator data, - final boolean more) { + final boolean more, + final long scanTimestamp) { this.scannerId = scannerId; this.data = data; this.more = more; + this.scanTimestamp = scanTimestamp; } public String toString() { - return "AsyncKuduScanner$Response(scannerId=" + Bytes.pretty(scannerId) + - ", data=" + data + ", more=" + more + ") "; + String ret = "AsyncKuduScanner$Response(scannerId=" + Bytes.pretty(scannerId) + ", data=" + data + + ", more=" + more; + if (scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + ret += ", responseScanTimestamp =" + scanTimestamp; + } + ret += ")"; + return ret; } } @@ -790,7 +815,9 @@ public final class AsyncKuduScanner { Bytes.pretty(scannerId)); throw new NonRecoverableException(statusIllegalState); } - Response response = new Response(id, iterator, hasMore); + Response response = new Response(id, iterator, hasMore, + resp.hasSnapTimestamp() ? resp.getSnapTimestamp() + : AsyncKuduClient.NO_TIMESTAMP); if (LOG.isDebugEnabled()) { LOG.debug(response.toString()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/16ce5e02/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java index 1165ea4..ef62660 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java @@ -18,7 +18,9 @@ package org.apache.kudu.client; import static org.apache.kudu.Type.STRING; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import 
java.util.ArrayList; @@ -192,6 +194,35 @@ public class TestScannerMultiTablet extends BaseKuduTest { assertEquals(9, countRowsInScan(scanner)); } + @Test(timeout = 100000) + public void testReadAtSnapshotNoTimestamp() throws Exception { + // Perform scan in READ_AT_SNAPSHOT mode with no snapshot timestamp + // specified. Verify that the scanner timestamp is set from the tablet + // server response. + AsyncKuduScanner scanner = client.newScannerBuilder(table) + .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) + .build(); + assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp()); + KuduScanner syncScanner = new KuduScanner(scanner); + assertEquals(scanner.getReadMode(), syncScanner.getReadMode()); + + assertTrue(syncScanner.hasMoreRows()); + assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp()); + + int rowCount = syncScanner.nextRows().getNumRows(); + // At this point, the call to the first tablet server should have been + // done already, so check the snapshot timestamp. + final long tsRef = scanner.getSnapshotTimestamp(); + assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, tsRef); + + assertTrue(syncScanner.hasMoreRows()); + while (syncScanner.hasMoreRows()) { + rowCount += syncScanner.nextRows().getNumRows(); + assertEquals(tsRef, scanner.getSnapshotTimestamp()); + } + assertEquals(9, rowCount); + } + private AsyncKuduScanner getScanner(String lowerBoundKeyOne, String lowerBoundKeyTwo, String exclusiveUpperBoundKeyOne,
