This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit d01145b982517dd7a5aa507543e6ffc1dfa1e584
Author: Li Zhiming <[email protected]>
AuthorDate: Wed Oct 14 15:41:44 2020 +0800

    [java] add method to deserialize scanner token into scanner builder
    
    To support building a scanner with extra dynamic runtime filters.
    
    Change-Id: I17b24a8e9dec846583ad0940de19b96b672bd603
    Reviewed-on: http://gerrit.cloudera.org:8080/16598
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
---
 .../java/org/apache/kudu/client/KuduScanToken.java | 23 +++++++++++-----
 .../java/org/apache/kudu/client/TestScanToken.java | 31 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 6 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index bdf8fd1..2d39c4b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -88,8 +88,8 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
    * @param client a Kudu client for the cluster
    * @return a scanner for the scan token
    */
-  public KuduScanner intoScanner(KuduClient client) throws Exception {
-    return pbIntoScanner(message, client);
+  public KuduScanner intoScanner(KuduClient client) throws IOException {
+    return pbIntoScannerBuilder(message, client).build();
   }
 
   /**
@@ -123,7 +123,18 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
    */
   public static KuduScanner deserializeIntoScanner(byte[] buf, KuduClient 
client)
       throws IOException {
-    return 
pbIntoScanner(ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf)), client);
+    return deserializeIntoScannerBuilder(buf, client).build();
+  }
+
+  /**
+   * Deserializes a {@code KuduScanToken} into a {@link 
KuduScanner.KuduScannerBuilder}.
+   * @param buf a byte array containing the serialized scan token.
+   * @param client a Kudu client for the cluster
+   * @return a scanner builder for the serialized scan token
+   */
+  public static KuduScanner.KuduScannerBuilder deserializeIntoScannerBuilder(
+      byte[] buf, KuduClient client) throws IOException {
+    return 
pbIntoScannerBuilder(ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf)), 
client);
   }
 
   /**
@@ -195,8 +206,8 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
   }
 
   @SuppressWarnings("deprecation")
-  private static KuduScanner pbIntoScanner(ScanTokenPB message,
-                                           KuduClient client) throws 
KuduException {
+  private static KuduScanner.KuduScannerBuilder pbIntoScannerBuilder(
+      ScanTokenPB message, KuduClient client) throws KuduException {
     Preconditions.checkArgument(
         !message.getFeatureFlagsList().contains(ScanTokenPB.Feature.Unknown),
         "Scan token requires an unsupported feature. This Kudu client must be 
updated.");
@@ -340,7 +351,7 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
       builder.keepAlivePeriodMs(message.getKeepAlivePeriodMs());
     }
 
-    return builder.build();
+    return builder;
   }
 
   private static KuduTable getKuduTable(ScanTokenPB message,
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 1c75992..490b7dd 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -35,9 +35,12 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -662,4 +665,32 @@ public class TestScanToken {
     assertTrue(tokenWithTableMetadataBytes.length > 
tokenWithTabletMetadataBytes.length);
     assertTrue(tokenWithTabletMetadataBytes.length > tokenBytes.length);
   }
+
+  @Test
+  public void testScanTokensWithExtraPredicate() throws IOException {
+    final int NUM_ROWS_DESIRED = 100;
+    final int PREDICATE_INDEX = 0;
+    final int PREDICATE_VAL = 1;
+    KuduTable table = createDefaultTable(client, testTableName);
+    loadDefaultTable(client, testTableName, NUM_ROWS_DESIRED);
+    KuduScanToken.KuduScanTokenBuilder builder =
+        new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
+    List<KuduScanToken> tokens = builder.build();
+    ColumnSchema cs = table.getSchema().getColumnByIndex(PREDICATE_INDEX);
+    KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
+        cs, KuduPredicate.ComparisonOp.EQUAL, PREDICATE_VAL);
+    Set<Integer> resultKeys = new HashSet<>();
+    for (KuduScanToken token : tokens) {
+      byte[] serialized = token.serialize();
+      KuduScanner.KuduScannerBuilder scannerBuilder = 
KuduScanToken.deserializeIntoScannerBuilder(
+          serialized, client);
+      scannerBuilder.addPredicate(predicate);
+      KuduScanner scanner = scannerBuilder.build();
+      for (RowResult rowResult : scanner) {
+        resultKeys.add(rowResult.getInt(PREDICATE_INDEX));
+      }
+    }
+    assertEquals(1, resultKeys.size());
+    assertEquals(PREDICATE_VAL, 
Iterables.getOnlyElement(resultKeys).intValue());
+  }
 }

Reply via email to