Repository: kudu Updated Branches: refs/heads/master 3f194c3a1 -> a95441846
KUDU-16: implement Java client limits This patch implements the scanner limits for the Java client. Each AsyncKuduScanner now keeps a count of the rows already returned over the whole table scan; it uses that count to set each per-tablet scan request's limit to the number of rows still remaining, and stops scanning once the limit has been reached. Change-Id: I83635fbbc7714318f8b95d91b7f178e9ca7ebff7 Reviewed-on: http://gerrit.cloudera.org:8080/9926 Reviewed-by: Grant Henke <granthe...@apache.org> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a9544184 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a9544184 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a9544184 Branch: refs/heads/master Commit: a95441846bb1381ff2543ffcddf3854184a111ea Parents: 3f194c3 Author: Andrew Wong <aw...@cloudera.com> Authored: Wed Apr 4 12:48:53 2018 -0700 Committer: Andrew Wong <aw...@cloudera.com> Committed: Sat Apr 7 00:15:23 2018 +0000 ---------------------------------------------------------------------- .../kudu/client/AbstractKuduScannerBuilder.java | 4 -- .../apache/kudu/client/AsyncKuduScanner.java | 12 +++-- .../org/apache/kudu/client/TestKuduClient.java | 48 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/a9544184/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java index f27c62b..4485eec 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java @@ -197,10 +197,6 @@ public abstract class 
AbstractKuduScannerBuilder * Sets a limit on the number of rows that will be returned by the scanner. There's no limit * by default. * - * WARNING: Currently setting the limit has no effect. - * See <a href="https://issues.apache.org/jira/browse/KUDU-16">KUDU-16</a> for more - * information. - * * @param limit a positive long * @return this instance */ http://git-wip-us.apache.org/repos/asf/kudu/blob/a9544184/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 24581af..a2dd160 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -209,6 +209,8 @@ public final class AsyncKuduScanner { private boolean hasMore = true; + private long numRowsReturned = 0; + /** * The tabletSlice currently being scanned. * If null, we haven't started scanning. @@ -436,6 +438,8 @@ public final class AsyncKuduScanner { lastPrimaryKey = resp.lastPrimaryKey; } + numRowsReturned += resp.data.getNumRows(); + if (!resp.more || resp.scannerId == null) { scanFinished(); return Deferred.fromResult(resp.data); // there might be data to return @@ -519,6 +523,7 @@ public final class AsyncKuduScanner { private final Callback<RowResultIterator, Response> gotNextRow = new Callback<RowResultIterator, Response>() { public RowResultIterator call(final Response resp) { + numRowsReturned += resp.data.getNumRows(); if (!resp.more) { // We're done scanning this tablet. 
scanFinished(); return resp.data; @@ -566,8 +571,9 @@ public final class AsyncKuduScanner { void scanFinished() { Partition partition = tablet.getPartition(); pruner.removePartitionKeyRange(partition.getPartitionKeyEnd()); - // Stop scanning if we have scanned until or past the end partition key. - if (!pruner.hasMorePartitionKeyRanges()) { + // Stop scanning if we have scanned until or past the end partition key, or + // if we have fulfilled the limit. + if (!pruner.hasMorePartitionKeyRanges() || numRowsReturned >= limit) { hasMore = false; closed = true; // the scanner is closed on the other side at this point return; @@ -819,7 +825,7 @@ public final class AsyncKuduScanner { // is the easiest way. AsyncKuduScanner.this.tablet = super.getTablet(); NewScanRequestPB.Builder newBuilder = NewScanRequestPB.newBuilder(); - newBuilder.setLimit(limit); // currently ignored + newBuilder.setLimit(limit - AsyncKuduScanner.this.numRowsReturned); newBuilder.addAllProjectedColumns(ProtobufHelper.schemaToListPb(schema)); newBuilder.setTabletId(UnsafeByteOperations.unsafeWrap(tablet.getTabletIdAsBytes())); newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode()); http://git-wip-us.apache.org/repos/asf/kudu/blob/a9544184/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index d527bd0..0d61e58 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -490,6 +490,54 @@ public class TestKuduClient extends BaseKuduTest { } /** + * Test scanning with limits. 
+ */ + @Test + public void testScanWithLimit() throws Exception { + syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange()); + KuduTable table = syncClient.openTable(tableName); + KuduSession session = syncClient.newSession(); + int num_rows = 100; + for (int key = 0; key < num_rows; key++) { + session.apply(createBasicSchemaInsert(table, key)); + } + + // Test with some non-positive limits, expecting to raise an exception. + int non_positives[] = { -1, 0 }; + for (int limit : non_positives) { + try { + KuduScanner scanner = syncClient.newScannerBuilder(table) + .limit(limit) + .build(); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Need a strictly positive number")); + } + } + + // Test with a limit and ensure we get the expected number of rows. + int limits[] = { num_rows - 1, num_rows, num_rows + 1 }; + for (int limit : limits) { + KuduScanner scanner = syncClient.newScannerBuilder(table) + .limit(limit) + .build(); + int count = 0; + while (scanner.hasMoreRows()) { + count += scanner.nextRows().getNumRows(); + } + assertEquals(Math.min(num_rows, limit), count); + } + + // Now test with limits for async scanners. + for (int limit : limits) { + AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, table) + .limit(limit) + .build(); + assertEquals(Math.min(limit, num_rows), countRowsInScan(scanner)); + } + } + + /** * Test scanning with predicates. */ @Test