[java] KUDU-1679 Propagate timestamps for scans This is the Java counterpart of commit 06bb52d2acc6d311144aa905101ec5d846096611.
Change-Id: I84d45ba395f2a3fc6b54591f4a45bb4f10435910 Reviewed-on: http://gerrit.cloudera.org:8080/5248 Reviewed-by: David Ribeiro Alves <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2f1a2a06 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2f1a2a06 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2f1a2a06 Branch: refs/heads/master Commit: 2f1a2a06d4fe719ef9f35338bc7403915517718a Parents: 16ce5e0 Author: Alexey Serbin <[email protected]> Authored: Mon Nov 28 12:15:09 2016 -0800 Committer: Alexey Serbin <[email protected]> Committed: Tue Nov 29 03:19:28 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 7 ++--- .../apache/kudu/client/AsyncKuduScanner.java | 18 +++++++++-- .../kudu/client/TestScannerMultiTablet.java | 33 ++++++++++++++++++-- 3 files changed, 49 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/2f1a2a06/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 0de1fb8..5d7f912 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -207,20 +207,17 @@ public class AsyncKuduClient implements AutoCloseable { /** * Updates the last timestamp received from a server. Used for CLIENT_PROPAGATED - * external consistency. This is only publicly visible so that it can be set - * on tests, users should generally disregard this method. + * external consistency. 
* * @param lastPropagatedTimestamp the last timestamp received from a server */ - @VisibleForTesting public synchronized void updateLastPropagatedTimestamp(long lastPropagatedTimestamp) { - if (this.lastPropagatedTimestamp == -1 || + if (this.lastPropagatedTimestamp == NO_TIMESTAMP || this.lastPropagatedTimestamp < lastPropagatedTimestamp) { this.lastPropagatedTimestamp = lastPropagatedTimestamp; } } - @VisibleForTesting public synchronized long getLastPropagatedTimestamp() { return lastPropagatedTimestamp; } http://git-wip-us.apache.org/repos/asf/kudu/blob/2f1a2a06/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 2642def..40906ae 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -380,6 +380,9 @@ public final class AsyncKuduScanner { // context of the same scan. htTimestamp = resp.scanTimestamp; } + if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + client.updateLastPropagatedTimestamp(resp.propagatedTimestamp); + } if (!resp.more || resp.scannerId == null) { scanFinished(); return Deferred.fromResult(resp.data); // there might be data to return @@ -657,14 +660,23 @@ public final class AsyncKuduScanner { */ private final long scanTimestamp; + /** + * The server timestamp to propagate, if set. 
If the server response does + * not contain propagation timestamp, this field is set to special value + * AsyncKuduClient.NO_TIMESTAMP + */ + private final long propagatedTimestamp; + Response(final byte[] scannerId, final RowResultIterator data, final boolean more, - final long scanTimestamp) { + final long scanTimestamp, + final long propagatedTimestamp) { this.scannerId = scannerId; this.data = data; this.more = more; this.scanTimestamp = scanTimestamp; + this.propagatedTimestamp = propagatedTimestamp; } public String toString() { @@ -817,7 +829,9 @@ public final class AsyncKuduScanner { } Response response = new Response(id, iterator, hasMore, resp.hasSnapTimestamp() ? resp.getSnapTimestamp() - : AsyncKuduClient.NO_TIMESTAMP); + : AsyncKuduClient.NO_TIMESTAMP, + resp.hasPropagatedTimestamp() ? resp.getPropagatedTimestamp() + : AsyncKuduClient.NO_TIMESTAMP); if (LOG.isDebugEnabled()) { LOG.debug(response.toString()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/2f1a2a06/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java index ef62660..f8b14aa 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java @@ -200,8 +200,8 @@ public class TestScannerMultiTablet extends BaseKuduTest { // specified. Verify that the scanner timestamp is set from the tablet // server response. 
AsyncKuduScanner scanner = client.newScannerBuilder(table) - .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) - .build(); + .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) + .build(); assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp()); KuduScanner syncScanner = new KuduScanner(scanner); assertEquals(scanner.getReadMode(), syncScanner.getReadMode()); @@ -223,6 +223,35 @@ public class TestScannerMultiTablet extends BaseKuduTest { assertEquals(9, rowCount); } + @Test(timeout = 100000) + public void testScanPropagatesLatestTimestamp() throws Exception { + AsyncKuduScanner scanner = client.newScannerBuilder(table).build(); + // Initially, the client does not have the timestamp set. + assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp()); + KuduScanner syncScanner = new KuduScanner(scanner); + + assertTrue(syncScanner.hasMoreRows()); + assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp()); + + int rowCount = syncScanner.nextRows().getNumRows(); + // At this point, the call to the first tablet server should have been + // done already, so the client should have received the propagated timestamp + // in the scanner response. + long tsRef = client.getLastPropagatedTimestamp(); + assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, tsRef); + + assertTrue(syncScanner.hasMoreRows()); + while (syncScanner.hasMoreRows()) { + rowCount += syncScanner.nextRows().getNumRows(); + final long ts = client.getLastPropagatedTimestamp(); + // Next scan responses from tablet servers should move the propagated + // timestamp further. + assertTrue(ts > tsRef); + tsRef = ts; + } + assertNotEquals(0, rowCount); + } + private AsyncKuduScanner getScanner(String lowerBoundKeyOne, String lowerBoundKeyTwo, String exclusiveUpperBoundKeyOne,
