This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 570936fa84f3d652a88830425e29cc486526d72a
Author: xinghuayu007 <[email protected]>
AuthorDate: Mon Feb 20 15:58:07 2023 +0800

    [Client] Add query id to trace the whole query process
    
    This is a follow-up to commit 834de7fccdb5faadb2ca9e1d1e07d4c7882ae0fa.
    
    The query id is used to trace a query from upstream applications into
    Kudu for troubleshooting and debugging.
    
    Change-Id: Iddbd348e766bad1b5648b4091635679319b1e2fd
    Reviewed-on: http://gerrit.cloudera.org:8080/19518
    Reviewed-by: Yingchun Lai <[email protected]>
    Tested-by: Yingchun Lai <[email protected]>
---
 .../kudu/client/AbstractKuduScannerBuilder.java    | 16 +++++++
 .../org/apache/kudu/client/AsyncKuduScanner.java   | 27 ++++++++++-
 .../java/org/apache/kudu/client/KuduScanToken.java |  4 ++
 .../java/org/apache/kudu/client/KuduScanner.java   |  2 +-
 .../org/apache/kudu/client/TestKuduScanner.java    | 39 ++++++++++++++++
 .../java/org/apache/kudu/client/TestScanToken.java | 53 ++++++++++++++++++++++
 6 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index 2c9b4aa36..db843efca 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -58,6 +58,7 @@ public abstract class AbstractKuduScannerBuilder
   long scanRequestTimeout;
   ReplicaSelection replicaSelection = ReplicaSelection.LEADER_ONLY;
   long keepAlivePeriodMs = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS;
+  String queryId = "";
 
   AbstractKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
     this.client = client;
@@ -412,5 +413,20 @@ public abstract class AbstractKuduScannerBuilder
     return (S) this;
   }
 
+  /**
+   * Set a query id for the scan to trace the whole scanning process.
+   * The query id is either provided by the caller or, when left unset,
+   * empty or null, generated automatically by the client library code.
+   * It is used to trace the whole query process for debugging.
+   *
+   * @param queryId query id to trace a query; null or empty to auto-generate
+   * @return this instance
+   */
+  @SuppressWarnings("unchecked")
+  public S setQueryId(String queryId) {
+    this.queryId = queryId == null ? "" : queryId;
+    return (S) this;
+  }
+
   public abstract T build();
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 4929dd4d8..a688224ab 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableList;
@@ -284,6 +285,8 @@ public final class AsyncKuduScanner {
 
   final long scanRequestTimeout;
 
+  private String queryId;
+
   /**
    * The prefetching result is cached in memory. This atomic reference is used to avoid
    * two concurrent prefetchings occur and the latest one overrides the previous one.
@@ -399,6 +402,27 @@ public final class AsyncKuduScanner {
     }
   }
 
+  AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
+                   List<Integer> projectedIndexes, ReadMode readMode, boolean isFaultTolerant,
+                   long scanRequestTimeout,
+                   Map<String, KuduPredicate> predicates, long limit,
+                   boolean cacheBlocks, boolean prefetching,
+                   byte[] startPrimaryKey, byte[] endPrimaryKey,
+                   long startTimestamp, long htTimestamp,
+                   int batchSizeBytes, PartitionPruner pruner,
+                   ReplicaSelection replicaSelection, long keepAlivePeriodMs, String queryId) {
+    this(
+        client, table, projectedNames, projectedIndexes, readMode, isFaultTolerant,
+        scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, startPrimaryKey,
+        endPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes,
+        pruner, replicaSelection, keepAlivePeriodMs);
+    if (queryId == null || queryId.isEmpty()) {
+      this.queryId = UUID.randomUUID().toString().replace("-", "");
+    } else {
+      this.queryId = queryId;
+    }
+  }
+
   /**
    * Generates and returns a ColumnSchema for the virtual IS_DELETED column.
    * The column name is generated to ensure there is never a collision.
@@ -1214,6 +1238,7 @@ public final class AsyncKuduScanner {
         default:
           throw new RuntimeException("unreachable!");
       }
+      builder.setQueryId(UnsafeByteOperations.unsafeWrap(queryId.getBytes(UTF_8)));
 
       return builder.build();
     }
@@ -1326,7 +1351,7 @@ public final class AsyncKuduScanner {
           client, table, projectedColumnNames, projectedColumnIndexes, readMode, isFaultTolerant,
           scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, lowerBoundPrimaryKey,
           upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes,
-          PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs);
+          PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs, queryId);
     }
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index e86408951..3a5878a15 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -366,6 +366,9 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
     if (message.hasKeepAlivePeriodMs()) {
       builder.keepAlivePeriodMs(message.getKeepAlivePeriodMs());
     }
+    if (message.hasQueryId()) {
+      builder.setQueryId(message.getQueryId());
+    }
 
     return builder;
   }
@@ -724,6 +727,7 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
               builder.setTabletMetadata(tabletMetadataPB);
             }
           }
+          builder.setQueryId(queryId == null ? "" : queryId);
 
           tokens.add(new KuduScanToken(keyRange.getTablet(), builder.build()));
         }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 6e16a7be9..02142630b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -224,7 +224,7 @@ public class KuduScanner implements Iterable<RowResult> {
           client, table, projectedColumnNames, projectedColumnIndexes, readMode, isFaultTolerant,
           scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, lowerBoundPrimaryKey,
           upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes,
-          PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs));
+          PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs, queryId));
     }
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
index 54e7d6577..8ea7a1e08 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
@@ -233,6 +233,45 @@ public class TestKuduScanner {
     }
   }
 
+  @Test(timeout = 100000)
+  public void testScanWithQueryId() throws Exception {
+    KuduTable table = client.createTable(tableName, getBasicSchema(), getBasicCreateTableOptions());
+    DataGenerator generator = new DataGenerator.DataGeneratorBuilder()
+        .random(RandomUtils.getRandom())
+        .build();
+    KuduSession session = client.newSession();
+    int numRows = 10;
+    for (int i = 0; i < numRows; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      generator.randomizeRow(row);
+      assertEquals(false, session.apply(insert).hasRowError());
+    }
+    // Scan with specified query id.
+    {
+      int rowsScanned = 0;
+      KuduScanner scanner = client.newScannerBuilder(table)
+          .batchSizeBytes(100)
+          .setQueryId("request-id-for-test")
+          .build();
+      while (scanner.hasMoreRows()) {
+        rowsScanned += scanner.nextRows().getNumRows();
+      }
+      assertEquals(numRows, rowsScanned);
+    }
+    // Scan with default query id.
+    {
+      int rowsScanned = 0;
+      KuduScanner scanner = client.newScannerBuilder(table)
+          .batchSizeBytes(100)
+          .build();
+      while (scanner.hasMoreRows()) {
+        rowsScanned += scanner.nextRows().getNumRows();
+      }
+      assertEquals(numRows, rowsScanned);
+    }
+  }
+
   @Test(timeout = 100000)
   public void testOpenScanWithDroppedPartition() throws Exception {
     // Create a table with 2 range partitions.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 4ed897085..22aedbd6c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -142,6 +142,59 @@ public class TestScanToken {
     }
   }
 
+  @Test
+  public void testScanTokenWithQueryId() throws Exception {
+    // Prepare the table for testing.
+    Schema schema = createManyStringsSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    final int buckets = 8;
+    createOptions.addHashPartitions(ImmutableList.of("key"), buckets);
+    client.createTable(testTableName, schema, createOptions);
+
+    KuduSession session = client.newSession();
+    KuduTable table = client.openTable(testTableName);
+    final int totalRows = 100;
+    for (int i = 0; i < totalRows; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", String.format("key_%02d", i));
+      row.addString("c1", "c1_" + i);
+      row.addString("c2", "c2_" + i);
+      assertEquals(false, session.apply(insert).hasRowError());
+    }
+    // Scan with specified query id.
+    {
+      int rowsScanned = 0;
+      KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+      tokenBuilder.setProjectedColumnIndexes(ImmutableList.of());
+      tokenBuilder.setQueryId("query-id-for-test");
+      List<KuduScanToken> tokens = tokenBuilder.build();
+      assertEquals(buckets, tokens.size());
+      for (KuduScanToken token : tokens) {
+        KuduScanner scanner = token.intoScanner(client);
+        while (scanner.hasMoreRows()) {
+          rowsScanned += scanner.nextRows().getNumRows();
+        }
+      }
+      assertEquals(totalRows, rowsScanned);
+    }
+    // Scan with default query id.
+    {
+      int rowsScanned = 0;
+      KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+      tokenBuilder.setProjectedColumnIndexes(ImmutableList.of());
+      List<KuduScanToken> tokens = tokenBuilder.build();
+      assertEquals(buckets, tokens.size());
+      for (KuduScanToken token : tokens) {
+        KuduScanner scanner = token.intoScanner(client);
+        while (scanner.hasMoreRows()) {
+          rowsScanned += scanner.nextRows().getNumRows();
+        }
+      }
+      assertEquals(totalRows, rowsScanned);
+    }
+  }
+
   /**
    * Regression test for KUDU-3349
    */

Reply via email to