This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d240975d19 [multistage] fix mailbox visitor mismatch receive/send (#11908)
d240975d19 is described below

commit d240975d190295b720810bbc3eed0fbbf1ff3564
Author: Rong Rong <[email protected]>
AuthorDate: Tue Oct 31 09:26:28 2023 -0700

    [multistage] fix mailbox visitor mismatch receive/send (#11908)
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../planner/physical/MailboxAssignmentVisitor.java |  2 +-
 .../query/runtime/queries/QueryRunnerTestBase.java |  2 +
 .../runtime/queries/ResourceBasedQueriesTest.java  | 22 ++++---
 .../src/test/resources/queries/QueryHints.json     | 76 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 11 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 4d3855b2be..519d4d14b3 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -67,7 +67,7 @@ public class MailboxAssignmentVisitor extends 
DefaultPostOrderTraversalVisitor<V
           senderMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(receiverFragmentId, mailboxMetadata);
           receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(senderFragmentId, mailboxMetadata);
         }
-      } else if (senderMetadata.isPartitionedTableScan()) {
+      } else if (senderMetadata.isPartitionedTableScan() && (numReceivers / 
numSenders > 0)) {
         // For partitioned table scan, send the data to the worker with the 
same worker id (not necessary the same
         // instance). When partition parallelism is configured, send the data 
to the corresponding workers.
         // NOTE: Do not use partitionParallelism from the metadata because it 
might be configured only in the first
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index b9ee6abfbb..8837e97a0e 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -549,6 +549,8 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
       public List<List<Object>> _inputs;
       @JsonProperty("partitionColumns")
       public List<String> _partitionColumns;
+      @JsonProperty("partitionCount")
+      public Integer _partitionCount;
     }
 
     @JsonIgnoreProperties(ignoreUnknown = true)
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 1f48a2827a..1ebf761920 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -72,7 +72,7 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
   private static final Random RANDOM = new Random(42);
   private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
   private static final String IGNORE_FILTER_PROPERTY = "pinot.runIgnored";
-  private static final int NUM_PARTITIONS = 4;
+  private static final int DEFAULT_NUM_PARTITIONS = 4;
 
   private final Map<String, Set<String>> _tableToSegmentMap = new HashMap<>();
   private boolean _isRunIgnored;
@@ -122,41 +122,43 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
 
         // generate segments and dump into server1 and server2
         List<String> partitionColumns = 
tableEntry.getValue()._partitionColumns;
+        int numPartitions = tableEntry.getValue()._partitionCount == null ? 
DEFAULT_NUM_PARTITIONS
+            : tableEntry.getValue()._partitionCount;
         String partitionColumn = null;
         List<List<String>> partitionIdToSegmentsMap = null;
         if (partitionColumns != null && partitionColumns.size() == 1) {
           partitionColumn = partitionColumns.get(0);
           partitionIdToSegmentsMap = new ArrayList<>();
-          for (int i = 0; i < NUM_PARTITIONS; i++) {
+          for (int i = 0; i < numPartitions; i++) {
             partitionIdToSegmentsMap.add(new ArrayList<>());
           }
         }
 
         List<List<GenericRow>> partitionIdToRowsMap = new ArrayList<>();
-        for (int i = 0; i < NUM_PARTITIONS; i++) {
+        for (int i = 0; i < numPartitions; i++) {
           partitionIdToRowsMap.add(new ArrayList<>());
         }
 
         for (GenericRow row : genericRows) {
           if (row == SEGMENT_BREAKER_ROW) {
             addSegments(factory1, factory2, offlineTableName, 
allowEmptySegment, partitionIdToRowsMap,
-                partitionIdToSegmentsMap);
+                partitionIdToSegmentsMap, numPartitions);
           } else {
             int partitionId;
             if (partitionColumns == null) {
-              partitionId = RANDOM.nextInt(NUM_PARTITIONS);
+              partitionId = RANDOM.nextInt(numPartitions);
             } else {
               int hashCode = 0;
               for (String field : partitionColumns) {
                 hashCode += row.getValue(field).hashCode();
               }
-              partitionId = (hashCode & Integer.MAX_VALUE) % NUM_PARTITIONS;
+              partitionId = (hashCode & Integer.MAX_VALUE) % numPartitions;
             }
             partitionIdToRowsMap.get(partitionId).add(row);
           }
         }
         addSegments(factory1, factory2, offlineTableName, allowEmptySegment, 
partitionIdToRowsMap,
-            partitionIdToSegmentsMap);
+            partitionIdToSegmentsMap, numPartitions);
 
         if (partitionColumn != null) {
           partitionedSegmentsMap.put(offlineTableName, 
Pair.of(partitionColumn, partitionIdToSegmentsMap));
@@ -223,9 +225,9 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
 
   private void addSegments(MockInstanceDataManagerFactory factory1, 
MockInstanceDataManagerFactory factory2,
       String offlineTableName, boolean allowEmptySegment, 
List<List<GenericRow>> partitionIdToRowsMap,
-      @Nullable List<List<String>> partitionIdToSegmentsMap) {
-    for (int i = 0; i < NUM_PARTITIONS; i++) {
-      MockInstanceDataManagerFactory factory = i < (NUM_PARTITIONS / 2) ? 
factory1 : factory2;
+      @Nullable List<List<String>> partitionIdToSegmentsMap, int 
numPartitions) {
+    for (int i = 0; i < numPartitions; i++) {
+      MockInstanceDataManagerFactory factory = i < (numPartitions / 2) ? 
factory1 : factory2;
       List<GenericRow> rows = partitionIdToRowsMap.get(i);
       if (allowEmptySegment || !rows.isEmpty()) {
         String segmentName = factory.addSegment(offlineTableName, rows);
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json 
b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index c69e2adb2a..84079a33eb 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -118,5 +118,81 @@
         "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ 
{tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num) FROM {tbl1} WHERE 
{tbl1}.num >= 0 GROUP BY {tbl1}.name"
       }
     ]
+  },
+  "hint_option_queries_unmatched_partition": {
+    "tables": {
+      "tbl1": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "name", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "a"],
+          [2, "b"],
+          [3, "c"],
+          [3, "yyy"],
+          [4, "e"],
+          [4, "e"],
+          [6, "e"],
+          [7, "d"],
+          [7, "f"],
+          [8, "z"]
+        ],
+        "partitionColumns": [
+          "num"
+        ],
+        "partitionCount": 2
+      },
+      "tbl2": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "val", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "xxx"],
+          [1, "xxx"],
+          [3, "yyy"],
+          [3, "zzz"],
+          [5, "zzz"],
+          [6, "e"],
+          [7, "d"],
+          [8, "z"]
+        ],
+        "partitionColumns": [
+          "num"
+        ],
+        "partitionCount": 4
+      },
+      "tbl_empty": {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "strCol2", "type": "STRING"}
+        ],
+        "inputs": []
+      }
+    },
+    "queries": [
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy'))"
+      },
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column and group by partition column",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), 
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, 
COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+      },
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column and group by non-partitioned column",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy')) GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "Dynamic broadcast SEMI-JOIN with empty right table 
result",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 
'non-exist') GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially 
empty right table result for some servers",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 
'z') GROUP BY {tbl1}.name"
+      }
+    ]
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to