This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 570936fa84f3d652a88830425e29cc486526d72a Author: xinghuayu007 <[email protected]> AuthorDate: Mon Feb 20 15:58:07 2023 +0800 [Client] Add query id to trace the whole query process This is a follow-up patch to 834de7fccdb5faadb2ca9e1d1e07d4c7882ae0fa. Query id is used to trace a query from upstream applications to Kudu for troubleshooting and debugging. Change-Id: Iddbd348e766bad1b5648b4091635679319b1e2fd Reviewed-on: http://gerrit.cloudera.org:8080/19518 Reviewed-by: Yingchun Lai <[email protected]> Tested-by: Yingchun Lai <[email protected]> --- .../kudu/client/AbstractKuduScannerBuilder.java | 16 +++++++ .../org/apache/kudu/client/AsyncKuduScanner.java | 27 ++++++++++- .../java/org/apache/kudu/client/KuduScanToken.java | 4 ++ .../java/org/apache/kudu/client/KuduScanner.java | 2 +- .../org/apache/kudu/client/TestKuduScanner.java | 39 ++++++++++++++++ .../java/org/apache/kudu/client/TestScanToken.java | 53 ++++++++++++++++++++++ 6 files changed, 139 insertions(+), 2 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java index 2c9b4aa36..db843efca 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java @@ -58,6 +58,7 @@ public abstract class AbstractKuduScannerBuilder long scanRequestTimeout; ReplicaSelection replicaSelection = ReplicaSelection.LEADER_ONLY; long keepAlivePeriodMs = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS; + String queryId = ""; AbstractKuduScannerBuilder(AsyncKuduClient client, KuduTable table) { this.client = client; @@ -412,5 +413,20 @@ public abstract class AbstractKuduScannerBuilder return (S) this; } + /** + * Set a query id for the scan to trace the whole scanning process. 
+ * Query id is posted by the user or generated automatically by the + * client library code. It is used to trace the whole query process + * for debugging. + * + * @param queryId query id to trace a query. + * @return this instance + */ + @SuppressWarnings("unchecked") + public S setQueryId(String queryId) { + this.queryId = queryId; + return (S) this; + } + public abstract T build(); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 4929dd4d8..a688224ab 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.ImmutableList; @@ -284,6 +285,8 @@ public final class AsyncKuduScanner { final long scanRequestTimeout; + private String queryId; + /** * The prefetching result is cached in memory. This atomic reference is used to avoid * two concurrent prefetchings occur and the latest one overrides the previous one. 
@@ -399,6 +402,27 @@ public final class AsyncKuduScanner { } } + AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames, + List<Integer> projectedIndexes, ReadMode readMode, boolean isFaultTolerant, + long scanRequestTimeout, + Map<String, KuduPredicate> predicates, long limit, + boolean cacheBlocks, boolean prefetching, + byte[] startPrimaryKey, byte[] endPrimaryKey, + long startTimestamp, long htTimestamp, + int batchSizeBytes, PartitionPruner pruner, + ReplicaSelection replicaSelection, long keepAlivePeriodMs, String queryId) { + this( + client, table, projectedNames, projectedIndexes, readMode, isFaultTolerant, + scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, startPrimaryKey, + endPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes, + pruner, replicaSelection, keepAlivePeriodMs); + if (queryId.isEmpty()) { + this.queryId = UUID.randomUUID().toString().replace("-", ""); + } else { + this.queryId = queryId; + } + } + /** * Generates and returns a ColumnSchema for the virtual IS_DELETED column. * The column name is generated to ensure there is never a collision. 
@@ -1214,6 +1238,7 @@ public final class AsyncKuduScanner { default: throw new RuntimeException("unreachable!"); } + builder.setQueryId(UnsafeByteOperations.unsafeWrap(queryId.getBytes(UTF_8))); return builder.build(); } @@ -1326,7 +1351,7 @@ public final class AsyncKuduScanner { client, table, projectedColumnNames, projectedColumnIndexes, readMode, isFaultTolerant, scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes, - PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs); + PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs, queryId); } } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java index e86408951..3a5878a15 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java @@ -366,6 +366,9 @@ public class KuduScanToken implements Comparable<KuduScanToken> { if (message.hasKeepAlivePeriodMs()) { builder.keepAlivePeriodMs(message.getKeepAlivePeriodMs()); } + if (message.hasQueryId()) { + builder.setQueryId(message.getQueryId()); + } return builder; } @@ -724,6 +727,7 @@ public class KuduScanToken implements Comparable<KuduScanToken> { builder.setTabletMetadata(tabletMetadataPB); } } + builder.setQueryId(queryId); tokens.add(new KuduScanToken(keyRange.getTablet(), builder.build())); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java index 6e16a7be9..02142630b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java @@ -224,7 +224,7 @@ public class KuduScanner implements Iterable<RowResult> { client, table, 
projectedColumnNames, projectedColumnIndexes, readMode, isFaultTolerant, scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes, - PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs)); + PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs, queryId)); } } } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java index 54e7d6577..8ea7a1e08 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java @@ -233,6 +233,45 @@ public class TestKuduScanner { } } + @Test(timeout = 100000) + public void testScanWithQueryId() throws Exception { + KuduTable table = client.createTable(tableName, getBasicSchema(), getBasicCreateTableOptions()); + DataGenerator generator = new DataGenerator.DataGeneratorBuilder() + .random(RandomUtils.getRandom()) + .build(); + KuduSession session = client.newSession(); + int numRows = 10; + for (int i = 0; i < numRows; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + generator.randomizeRow(row); + session.apply(insert); + } + // Scan with specified query id. + { + int rowsScanned = 0; + KuduScanner scanner = client.newScannerBuilder(table) + .batchSizeBytes(100) + .setQueryId("request-id-for-test") + .build(); + while (scanner.hasMoreRows()) { + rowsScanned += scanner.nextRows().getNumRows(); + } + assertEquals(numRows, rowsScanned); + } + // Scan with default query id. 
+ { + int rowsScanned = 0; + KuduScanner scanner = client.newScannerBuilder(table) + .batchSizeBytes(100) + .build(); + while (scanner.hasMoreRows()) { + rowsScanned += scanner.nextRows().getNumRows(); + } + assertEquals(numRows, rowsScanned); + } + } + @Test(timeout = 100000) public void testOpenScanWithDroppedPartition() throws Exception { // Create a table with 2 range partitions. diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java index 4ed897085..22aedbd6c 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java @@ -142,6 +142,59 @@ public class TestScanToken { } } + @Test + public void testScanTokenWithQueryId() throws Exception { + // Prepare the table for testing. + Schema schema = createManyStringsSchema(); + CreateTableOptions createOptions = new CreateTableOptions(); + final int buckets = 8; + createOptions.addHashPartitions(ImmutableList.of("key"), buckets); + client.createTable(testTableName, schema, createOptions); + + KuduSession session = client.newSession(); + KuduTable table = client.openTable(testTableName); + final int totalRows = 100; + for (int i = 0; i < totalRows; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", String.format("key_%02d", i)); + row.addString("c1", "c1_" + i); + row.addString("c2", "c2_" + i); + assertEquals(session.apply(insert).hasRowError(), false); + } + // Scan with specified query id. 
+ { + int rowsScanned = 0; + KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table); + tokenBuilder.setProjectedColumnIndexes(ImmutableList.of()); + tokenBuilder.setQueryId("query-id-for-test"); + List<KuduScanToken> tokens = tokenBuilder.build(); + assertEquals(buckets, tokens.size()); + for (int i = 0; i < tokens.size(); i++) { + KuduScanner scanner = tokens.get(i).intoScanner(client); + while (scanner.hasMoreRows()) { + rowsScanned += scanner.nextRows().getNumRows(); + } + } + assertEquals(totalRows, rowsScanned); + } + // Scan with default query id. + { + int rowsScanned = 0; + KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table); + tokenBuilder.setProjectedColumnIndexes(ImmutableList.of()); + List<KuduScanToken> tokens = tokenBuilder.build(); + assertEquals(buckets, tokens.size()); + for (int i = 0; i < tokens.size(); i++) { + KuduScanner scanner = tokens.get(i).intoScanner(client); + while (scanner.hasMoreRows()) { + rowsScanned += scanner.nextRows().getNumRows(); + } + } + assertEquals(totalRows, rowsScanned); + } + } + /** * Regression test for KUDU-3349 */
