KUDU-1704: add java client support for READ_YOUR_WRITES mode Change-Id: I6239521c022147257859e399f55c6f3f945af465 Reviewed-on: http://gerrit.cloudera.org:8080/8847 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c05e837 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c05e837 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c05e837 Branch: refs/heads/master Commit: 0c05e8375d005a91d37acd3102a22ddbe92382d5 Parents: 8aa75d8 Author: hahao <[email protected]> Authored: Thu Dec 14 17:20:23 2017 -0800 Committer: Hao Hao <[email protected]> Committed: Mon Mar 12 21:12:36 2018 +0000 ---------------------------------------------------------------------- .../apache/kudu/client/AsyncKuduScanner.java | 61 +++++++++-- .../org/apache/kudu/client/KuduScanToken.java | 4 + .../org/apache/kudu/client/TestKuduClient.java | 109 +++++++++++++++++++ .../kudu/client/TestScannerMultiTablet.java | 69 ++++++++++++ 4 files changed, 236 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 9f5c137..0863148 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -118,7 +118,21 @@ public final class AsyncKuduScanner { * are sometimes not externally consistent even when action was taken to make them so. * In these cases Isolation may degenerate to mode "Read Committed". See KUDU-430. 
*/ - READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT); + READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT), + + /** + * When {@code READ_YOUR_WRITES} is specified, the client will perform a read + * such that it follows all previously known writes and reads from this client. + * Specifically this mode: + * (1) ensures read-your-writes and read-your-reads session guarantees, + * (2) minimizes latency caused by waiting for outstanding write + * transactions to complete. + * + * Reads in this mode are not repeatable: two READ_YOUR_WRITES reads, even if + * they provide the same propagated timestamp bound, can execute at different + * timestamps and thus may return different results. + */ + READ_YOUR_WRITES(Common.ReadMode.READ_YOUR_WRITES); private Common.ReadMode pbVersion; ReadMode(Common.ReadMode pbVersion) { @@ -183,6 +197,8 @@ public final class AsyncKuduScanner { private long htTimestamp; + private long lowerBoundPropagationTimestamp = AsyncKuduClient.NO_TIMESTAMP; + private final ReplicaSelection replicaSelection; ///////////////////// @@ -293,6 +309,13 @@ public final class AsyncKuduScanner { } this.replicaSelection = replicaSelection; + + // For READ_YOUR_WRITES scan mode, get the latest observed timestamp + // and store it. Always use this one as the propagated timestamp for + // the duration of the scan to avoid unnecessary wait. + if (readMode == ReadMode.READ_YOUR_WRITES) { + this.lowerBoundPropagationTimestamp = this.client.getLastPropagatedTimestamp(); + } } /** @@ -389,8 +412,24 @@ public final class AsyncKuduScanner { // context of the same scan. 
htTimestamp = resp.scanTimestamp; } - if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) { - client.updateLastPropagatedTimestamp(resp.propagatedTimestamp); + + long lastPropagatedTimestamp = AsyncKuduClient.NO_TIMESTAMP; + if (readMode == ReadMode.READ_YOUR_WRITES && + resp.scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + // For READ_YOUR_WRITES mode, update the latest propagated timestamp + // with the chosen snapshot timestamp sent back from the server, to + // avoid unnecessary waits in subsequent reads. As long as + // the chosen snapshot timestamp of the next read is greater than + // the previous one, the scan does not violate READ_YOUR_WRITES + // session guarantees. + lastPropagatedTimestamp = resp.scanTimestamp; + } else if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + // Otherwise we just use the propagated timestamp returned from + // the server as the latest propagated timestamp. + lastPropagatedTimestamp = resp.propagatedTimestamp; + } + if (lastPropagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + client.updateLastPropagatedTimestamp(lastPropagatedTimestamp); } if (isFaultTolerant && resp.lastPrimaryKey != null) { @@ -694,7 +733,7 @@ public final class AsyncKuduScanner { /** * The server timestamp to propagate, if set. 
If the server response does - * not contain propagation timestamp, this field is set to special value + * not contain propagated timestamp, this field is set to special value * AsyncKuduClient.NO_TIMESTAMP */ private final long propagatedTimestamp; @@ -785,9 +824,17 @@ public final class AsyncKuduScanner { newBuilder.setTabletId(UnsafeByteOperations.unsafeWrap(tablet.getTabletIdAsBytes())); newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode()); newBuilder.setCacheBlocks(cacheBlocks); - // if the last propagated timestamp is set send it with the scan - if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) { - newBuilder.setPropagatedTimestamp(table.getAsyncClient().getLastPropagatedTimestamp()); + // If the last propagated timestamp is set, send it with the scan. + // For READ_YOUR_WRITES scan, use the propagated timestamp from + // the scanner. + long timestamp; + if (readMode == ReadMode.READ_YOUR_WRITES) { + timestamp = lowerBoundPropagationTimestamp; + } else { + timestamp = table.getAsyncClient().getLastPropagatedTimestamp(); + } + if (timestamp != AsyncKuduClient.NO_TIMESTAMP) { + newBuilder.setPropagatedTimestamp(timestamp); } newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion()); http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java index 0f520f9..a4ee2ab 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java @@ -219,6 +219,10 @@ public class KuduScanToken implements Comparable<KuduScanToken> { builder.readMode(AsyncKuduScanner.ReadMode.READ_LATEST); break; } + case READ_YOUR_WRITES: { + 
builder.readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES); + break; + } default: throw new IllegalArgumentException("unknown read mode"); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index f655222..52e8fdb 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -25,6 +25,7 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -35,7 +36,10 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableList; @@ -1038,4 +1042,109 @@ public class TestKuduClient extends BaseKuduTest { // If createTable() was disrupted by the alterTable(), this will throw. d.join(); } + + // This is a test that verifies, when multiple clients run + // simultaneously, a client can get read-your-writes and + // read-your-reads session guarantees using READ_YOUR_WRITES + // scan mode, from leader replica. In this test writes are + // performed in AUTO_FLUSH_SYNC (single operation) flush modes. 
+ @Test(timeout = 100000) + public void testReadYourWritesSyncLeaderReplica() throws Exception { + readYourWrites(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC, + ReplicaSelection.LEADER_ONLY); + } + + // Similar test as above but scan from the closest replica. + @Test(timeout = 100000) + public void testReadYourWritesSyncClosestReplica() throws Exception { + readYourWrites(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC, + ReplicaSelection.CLOSEST_REPLICA); + } + + // Similar to testReadYourWritesSyncLeaderReplica, but in this + // test writes are performed in MANUAL_FLUSH (batches) flush modes. + @Test(timeout = 100000) + public void testReadYourWritesBatchLeaderReplica() throws Exception { + readYourWrites(SessionConfiguration.FlushMode.MANUAL_FLUSH, + ReplicaSelection.LEADER_ONLY); + } + + // Similar test as above but scan from the closest replica. + @Test(timeout = 100000) + public void testReadYourWritesBatchClosestReplica() throws Exception { + readYourWrites(SessionConfiguration.FlushMode.MANUAL_FLUSH, + ReplicaSelection.CLOSEST_REPLICA); + } + + private void readYourWrites(final SessionConfiguration.FlushMode flushMode, + final ReplicaSelection replicaSelection) + throws Exception { + Schema schema = createManyStringsSchema(); + syncClient.createTable(tableName, schema, createTableOptions()); + + final int tasksNum = 4; + List<Callable<Void>> callables = new ArrayList<>(); + for (int t = 0; t < tasksNum; t++) { + Callable<Void> callable = new Callable<Void>() { + @Override + public Void call() throws Exception { + // From the same client continuously performs inserts to a tablet + // in the given flush mode. 
+ KuduSession session = syncClient.newSession(); + session.setFlushMode(flushMode); + KuduTable table = syncClient.openTable(tableName); + for (int i = 0; i < 3; i++) { + for (int j = 100 * i; j < 100 * (i + 1); j++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", String.format("key_%02d", j)); + row.addString("c1", "c1_" + j); + row.addString("c2", "c2_" + j); + row.addString("c3", "c3_" + j); + session.apply(insert); + } + session.flush(); + session.close(); + + // Perform a bunch of READ_YOUR_WRITES scans to all the replicas + // that count the rows. Then verify that the count of the rows + // never goes down from what was previously observed, to ensure subsequent + // reads will not "go back in time" regarding writes that other + // clients have done. + for (int k = 0; k < 3; k++) { + AsyncKuduScanner scanner = client.newScannerBuilder(table) + .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES) + .replicaSelection(replicaSelection) + .build(); + KuduScanner syncScanner = new KuduScanner(scanner); + long preTs = client.getLastPropagatedTimestamp(); + assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, + client.getLastPropagatedTimestamp()); + + long row_count = countRowsInScan(syncScanner); + long expected_count = 100 * (i + 1); + assertTrue(expected_count <= row_count); + + // After the scan, verify that the chosen snapshot timestamp is + // returned from the server and it is larger than the previous + // propagated timestamp. + assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp()); + assertTrue(preTs < scanner.getSnapshotTimestamp()); + syncScanner.close(); + } + } + return null; + } + }; + callables.add(callable); + } + ExecutorService executor = Executors.newFixedThreadPool(tasksNum); + List<Future<Void>> futures = executor.invokeAll(callables); + + // Waits for the spawn tasks to complete, and then retrieves the results. 
+ // Any exceptions or assertion errors in the spawn tasks will be thrown here. + for (Future<Void> future : futures) { + future.get(); + } + } } http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java index 0365387..22a5bed 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java @@ -240,6 +240,62 @@ public class TestScannerMultiTablet extends BaseKuduTest { assertEquals(9, rowCount); } + // Test multi tablets scan in READ_YOUR_WRITES mode for both AUTO_FLUSH_SYNC + // (single operation) and MANUAL_FLUSH (batches) flush modes to ensure + // client-local read-your-writes. + @Test(timeout = 100000) + public void testReadYourWrites() throws Exception { + // Perform scan in READ_YOUR_WRITES mode. Before the scan, verify that the + // propagated timestamp is set via previous write session while snapshot + // timestamp is not. + AsyncKuduScanner scanner = client.newScannerBuilder(table) + .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES) + .build(); + KuduScanner syncScanner = new KuduScanner(scanner); + assertEquals(scanner.getReadMode(), syncScanner.getReadMode()); + long preTs = client.getLastPropagatedTimestamp(); + assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp()); + assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp()); + + assertEquals(9, countRowsInScan(syncScanner)); + + // After the scan, verify that the chosen snapshot timestamp is + // returned from the server and it is larger than the previous + // propagated timestamp. 
+ assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp()); + assertTrue(preTs < scanner.getSnapshotTimestamp()); + syncScanner.close(); + + // Perform write in MANUAL_FLUSH (batch) mode. + KuduSession session = syncClient.newSession(); + session.setFlushMode(KuduSession.FlushMode.MANUAL_FLUSH); + String[] keys = new String[] {"11", "22", "33"}; + for (int i = 0; i < keys.length; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString(schema.getColumnByIndex(0).getName(), keys[i]); + row.addString(schema.getColumnByIndex(1).getName(), keys[i]); + session.apply(insert); + } + session.flush(); + session.close(); + + scanner = client.newScannerBuilder(table) + .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES) + .build(); + syncScanner = new KuduScanner(scanner); + assertTrue(preTs < client.getLastPropagatedTimestamp()); + preTs = client.getLastPropagatedTimestamp(); + + assertEquals(12, countRowsInScan(syncScanner)); + + // After the scan, verify that the chosen snapshot timestamp is + // returned from the server and it is larger than the previous + // propagated timestamp. 
+ assertTrue(preTs < scanner.getSnapshotTimestamp()); + syncScanner.close(); + } + @Test(timeout = 100000) public void testScanPropagatesLatestTimestamp() throws Exception { // Reset the clients in order to clear the propagated timestamp, which may @@ -309,6 +365,19 @@ public class TestScannerMultiTablet extends BaseKuduTest { assertEquals(tsPropagated, client.getLastPropagatedTimestamp()); } + @Test(timeout = 100000) + public void testScanTokenReadMode() throws Exception { + ScanTokenPB.Builder pbBuilder = ScanTokenPB.newBuilder(); + pbBuilder.setTableName(table.getName()); + pbBuilder.setReadMode(Common.ReadMode.READ_YOUR_WRITES); + Client.ScanTokenPB scanTokenPB = pbBuilder.build(); + final byte[] serializedToken = KuduScanToken.serialize(scanTokenPB); + + // Deserialize scan tokens and make sure the read mode is updated accordingly. + KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, syncClient); + assertEquals(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES, scanner.getReadMode()); + } + private AsyncKuduScanner getScanner(String lowerBoundKeyOne, String lowerBoundKeyTwo, String exclusiveUpperBoundKeyOne,
