This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d01145b982517dd7a5aa507543e6ffc1dfa1e584 Author: Li Zhiming <[email protected]> AuthorDate: Wed Oct 14 15:41:44 2020 +0800 [java] add method to deserialize scanner token into scanner builder To support building a scanner with extra dynamic runtime filters. Change-Id: I17b24a8e9dec846583ad0940de19b96b672bd603 Reviewed-on: http://gerrit.cloudera.org:8080/16598 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <[email protected]> --- .../java/org/apache/kudu/client/KuduScanToken.java | 23 +++++++++++----- .../java/org/apache/kudu/client/TestScanToken.java | 31 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java index bdf8fd1..2d39c4b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java @@ -88,8 +88,8 @@ public class KuduScanToken implements Comparable<KuduScanToken> { * @param client a Kudu client for the cluster * @return a scanner for the scan token */ - public KuduScanner intoScanner(KuduClient client) throws Exception { - return pbIntoScanner(message, client); + public KuduScanner intoScanner(KuduClient client) throws IOException { + return pbIntoScannerBuilder(message, client).build(); } /** @@ -123,7 +123,18 @@ public class KuduScanToken implements Comparable<KuduScanToken> { */ public static KuduScanner deserializeIntoScanner(byte[] buf, KuduClient client) throws IOException { - return pbIntoScanner(ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf)), client); + return deserializeIntoScannerBuilder(buf, client).build(); + } + + /** + * Deserializes a {@code KuduScanToken} into a {@link KuduScanner.KuduScannerBuilder}. + * @param buf a byte array containing the serialized scan token. 
+ * @param client a Kudu client for the cluster + * @return a scanner builder for the serialized scan token + */ + public static KuduScanner.KuduScannerBuilder deserializeIntoScannerBuilder( + byte[] buf, KuduClient client) throws IOException { + return pbIntoScannerBuilder(ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf)), client); } /** @@ -195,8 +206,8 @@ public class KuduScanToken implements Comparable<KuduScanToken> { } @SuppressWarnings("deprecation") - private static KuduScanner pbIntoScanner(ScanTokenPB message, - KuduClient client) throws KuduException { + private static KuduScanner.KuduScannerBuilder pbIntoScannerBuilder( + ScanTokenPB message, KuduClient client) throws KuduException { Preconditions.checkArgument( !message.getFeatureFlagsList().contains(ScanTokenPB.Feature.Unknown), "Scan token requires an unsupported feature. This Kudu client must be updated."); @@ -340,7 +351,7 @@ public class KuduScanToken implements Comparable<KuduScanToken> { builder.keepAlivePeriodMs(message.getKeepAlivePeriodMs()); } - return builder.build(); + return builder; } private static KuduTable getKuduTable(ScanTokenPB message, diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java index 1c75992..490b7dd 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java @@ -35,9 +35,12 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -662,4 +665,32 @@ public class TestScanToken { assertTrue(tokenWithTableMetadataBytes.length > 
tokenWithTabletMetadataBytes.length); assertTrue(tokenWithTabletMetadataBytes.length > tokenBytes.length); } + + @Test + public void testScanTokensWithExtraPredicate() throws IOException { + final int NUM_ROWS_DESIRED = 100; + final int PREDICATE_INDEX = 0; + final int PREDICATE_VAL = 1; + KuduTable table = createDefaultTable(client, testTableName); + loadDefaultTable(client, testTableName, NUM_ROWS_DESIRED); + KuduScanToken.KuduScanTokenBuilder builder = + new KuduScanToken.KuduScanTokenBuilder(asyncClient, table); + List<KuduScanToken> tokens = builder.build(); + ColumnSchema cs = table.getSchema().getColumnByIndex(PREDICATE_INDEX); + KuduPredicate predicate = KuduPredicate.newComparisonPredicate( + cs, KuduPredicate.ComparisonOp.EQUAL, PREDICATE_VAL); + Set<Integer> resultKeys = new HashSet<>(); + for (KuduScanToken token : tokens) { + byte[] serialized = token.serialize(); + KuduScanner.KuduScannerBuilder scannerBuilder = KuduScanToken.deserializeIntoScannerBuilder( + serialized, client); + scannerBuilder.addPredicate(predicate); + KuduScanner scanner = scannerBuilder.build(); + for (RowResult rowResult : scanner) { + resultKeys.add(rowResult.getInt(PREDICATE_INDEX)); + } + } + assertEquals(1, resultKeys.size()); + assertEquals(PREDICATE_VAL, Iterables.getOnlyElement(resultKeys).intValue()); + } }
