Repository: kudu
Updated Branches:
  refs/heads/master 1277f69a1 -> 72c83c571
[java client] update propagated TS for AUTO_FLUSH_SYNC flush mode

Commit be6b81057 updated propagated timestamp for all write modes
except AUTO_FLUSH_SYNC for the java client. This patch adds timestamp
propagation for this mode and updates TestHybridTime to ensure
propagated timestamp is updated for all flush modes.

Change-Id: Ibf0ca58b10842cb15ed5db7bcd4694c4d8cc3a89
Reviewed-on: http://gerrit.cloudera.org:8080/8837
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72c83c57
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72c83c57
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72c83c57

Branch: refs/heads/master
Commit: 72c83c571120b0949c0e576fb8af0089861436e1
Parents: 1277f69
Author: hahao <[email protected]>
Authored: Wed Dec 13 17:19:45 2017 -0800
Committer: Hao Hao <[email protected]>
Committed: Mon Jan 8 20:01:19 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 10 ++++++++
 .../apache/kudu/client/AsyncKuduSession.java | 11 ++++++++
 .../java/org/apache/kudu/client/KuduClient.java | 10 ++++++++
 .../org/apache/kudu/client/TestHybridTime.java | 27 +++++++++++--------
 .../kudu/client/TestScannerMultiTablet.java | 2 ++
 5 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index f23b342..dd282ff 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -333,6 +333,16 @@ public class AsyncKuduClient implements AutoCloseable { } /** + * Checks if the client received any timestamps from a server. Used for + * CLIENT_PROPAGATED external consistency. + * + * @return true if last propagated timestamp has been set + */ + public synchronized boolean hasLastPropagatedTimestamp() { + return lastPropagatedTimestamp != NO_TIMESTAMP; + } + + /** * Returns a synchronous {@link KuduClient} which wraps this asynchronous client. * Calling {@link KuduClient#close} on the returned client will close this client. * If this asynchronous client should outlive the returned synchronous client, http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java index b23cbba..4ee3ba3 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java @@ -530,7 +530,18 @@ public class AsyncKuduSession implements SessionConfiguration { } operation.setExternalConsistencyMode(this.consistencyMode); operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows); + + // Add a callback to update the propagated timestamp returned from the server. 
+ Callback<Deferred<OperationResponse>, OperationResponse> cb = + new Callback<Deferred<OperationResponse>, OperationResponse>() { + @Override + public Deferred<OperationResponse> call(OperationResponse resp) throws Exception { + client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw()); + return Deferred.fromResult(resp); + } + }; return client.sendRpcToTablet(operation) + .addCallbackDeferring(cb) .addErrback(new SingleOperationErrCallback(operation)); } http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java index b4b1d80..0539d21 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java @@ -73,6 +73,16 @@ public class KuduClient implements AutoCloseable { } /** + * Checks if the client received any timestamps from a server. Used for + * CLIENT_PROPAGATED external consistency. + * + * @return true if last propagated timestamp has been set + */ + public boolean hasLastPropagatedTimestamp() { + return asyncClient.hasLastPropagatedTimestamp(); + } + + /** * Create a table on the cluster with the specified name, schema, and table configurations. 
* @param name the table's name * @param schema the table's schema http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java index 2655134..a19861c 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java @@ -87,23 +87,25 @@ public class TestHybridTime extends BaseKuduTest { */ @Test(timeout = 100000) public void test() throws Exception { - AsyncKuduSession session = client.newSession(); - session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC); + KuduSession session = syncClient.newSession(); + + // Test timestamp propagation with AUTO_FLUSH_SYNC flush mode. + session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC); session.setExternalConsistencyMode(CLIENT_PROPAGATED); long[] clockValues; long previousLogicalValue = 0; long previousPhysicalValue = 0; - // Test timestamp propagation with single operations String[] keys = new String[] {"1", "2", "3"}; for (int i = 0; i < keys.length; i++) { Insert insert = table.newInsert(); PartialRow row = insert.getRow(); row.addString(schema.getColumnByIndex(0).getName(), keys[i]); - Deferred<OperationResponse> d = session.apply(insert); - OperationResponse response = d.join(DEFAULT_SLEEP); - assertTrue(response.getWriteTimestampRaw() != 0); - clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw()); + OperationResponse response = session.apply(insert); + assertTrue(client.hasLastPropagatedTimestamp()); + assertEquals(client.getLastPropagatedTimestamp(), + response.getWriteTimestampRaw()); + clockValues = HTTimestampToPhysicalAndLogical(client.getLastPropagatedTimestamp()); 
LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString() + " Logical value: " + clockValues[1]); // on the very first write we update the clock into the future @@ -123,7 +125,7 @@ public class TestHybridTime extends BaseKuduTest { } } - // Test timestamp propagation with Batches + // Test timestamp propagation with MANUAL_FLUSH flush mode. session.setFlushMode(AsyncKuduSession.FlushMode.MANUAL_FLUSH); keys = new String[] {"11", "22", "33"}; for (int i = 0; i < keys.length; i++) { @@ -131,14 +133,15 @@ public class TestHybridTime extends BaseKuduTest { PartialRow row = insert.getRow(); row.addString(schema.getColumnByIndex(0).getName(), keys[i]); session.apply(insert); - Deferred<List<OperationResponse>> d = session.flush(); - List<OperationResponse> responses = d.join(DEFAULT_SLEEP); + List<OperationResponse> responses = session.flush(); assertEquals("Response was not of the expected size: " + responses.size(), 1, responses.size()); OperationResponse response = responses.get(0); - assertTrue(response.getWriteTimestampRaw() != 0); - clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw()); + assertTrue(client.hasLastPropagatedTimestamp()); + assertEquals(client.getLastPropagatedTimestamp(), + response.getWriteTimestampRaw()); + clockValues = HTTimestampToPhysicalAndLogical(client.getLastPropagatedTimestamp()); LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString() + " Logical value: " + clockValues[1]); assertEquals(clockValues[0], previousPhysicalValue); http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java index 036f857..0365387 100644 
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java @@ -283,6 +283,8 @@ public class TestScannerMultiTablet extends BaseKuduTest { @Test(timeout = 100000) public void testScanTokenPropagatesTimestamp() throws Exception { + resetClients(); + // Initially, the client does not have the timestamp set. assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp()); assertEquals(KuduClient.NO_TIMESTAMP, syncClient.getLastPropagatedTimestamp());
