This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d240975d19 [multistage] fix mailbox visitor mismatch receive/send
(#11908)
d240975d19 is described below
commit d240975d190295b720810bbc3eed0fbbf1ff3564
Author: Rong Rong <[email protected]>
AuthorDate: Tue Oct 31 09:26:28 2023 -0700
[multistage] fix mailbox visitor mismatch receive/send (#11908)
Co-authored-by: Rong Rong <[email protected]>
---
.../planner/physical/MailboxAssignmentVisitor.java | 2 +-
.../query/runtime/queries/QueryRunnerTestBase.java | 2 +
.../runtime/queries/ResourceBasedQueriesTest.java | 22 ++++---
.../src/test/resources/queries/QueryHints.json | 76 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 11 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 4d3855b2be..519d4d14b3 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -67,7 +67,7 @@ public class MailboxAssignmentVisitor extends
DefaultPostOrderTraversalVisitor<V
senderMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>()).put(receiverFragmentId, mailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
- } else if (senderMetadata.isPartitionedTableScan()) {
+ } else if (senderMetadata.isPartitionedTableScan() && (numReceivers /
numSenders > 0)) {
// For partitioned table scan, send the data to the worker with the
same worker id (not necessarily the same
// instance). When partition parallelism is configured, send the data
to the corresponding workers.
// NOTE: Do not use partitionParallelism from the metadata because it
might be configured only in the first
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index b9ee6abfbb..8837e97a0e 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -549,6 +549,8 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
public List<List<Object>> _inputs;
@JsonProperty("partitionColumns")
public List<String> _partitionColumns;
+ @JsonProperty("partitionCount")
+ public Integer _partitionCount;
}
@JsonIgnoreProperties(ignoreUnknown = true)
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 1f48a2827a..1ebf761920 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -72,7 +72,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
private static final Random RANDOM = new Random(42);
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
private static final String IGNORE_FILTER_PROPERTY = "pinot.runIgnored";
- private static final int NUM_PARTITIONS = 4;
+ private static final int DEFAULT_NUM_PARTITIONS = 4;
private final Map<String, Set<String>> _tableToSegmentMap = new HashMap<>();
private boolean _isRunIgnored;
@@ -122,41 +122,43 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
// generate segments and dump into server1 and server2
List<String> partitionColumns =
tableEntry.getValue()._partitionColumns;
+ int numPartitions = tableEntry.getValue()._partitionCount == null ?
DEFAULT_NUM_PARTITIONS
+ : tableEntry.getValue()._partitionCount;
String partitionColumn = null;
List<List<String>> partitionIdToSegmentsMap = null;
if (partitionColumns != null && partitionColumns.size() == 1) {
partitionColumn = partitionColumns.get(0);
partitionIdToSegmentsMap = new ArrayList<>();
- for (int i = 0; i < NUM_PARTITIONS; i++) {
+ for (int i = 0; i < numPartitions; i++) {
partitionIdToSegmentsMap.add(new ArrayList<>());
}
}
List<List<GenericRow>> partitionIdToRowsMap = new ArrayList<>();
- for (int i = 0; i < NUM_PARTITIONS; i++) {
+ for (int i = 0; i < numPartitions; i++) {
partitionIdToRowsMap.add(new ArrayList<>());
}
for (GenericRow row : genericRows) {
if (row == SEGMENT_BREAKER_ROW) {
addSegments(factory1, factory2, offlineTableName,
allowEmptySegment, partitionIdToRowsMap,
- partitionIdToSegmentsMap);
+ partitionIdToSegmentsMap, numPartitions);
} else {
int partitionId;
if (partitionColumns == null) {
- partitionId = RANDOM.nextInt(NUM_PARTITIONS);
+ partitionId = RANDOM.nextInt(numPartitions);
} else {
int hashCode = 0;
for (String field : partitionColumns) {
hashCode += row.getValue(field).hashCode();
}
- partitionId = (hashCode & Integer.MAX_VALUE) % NUM_PARTITIONS;
+ partitionId = (hashCode & Integer.MAX_VALUE) % numPartitions;
}
partitionIdToRowsMap.get(partitionId).add(row);
}
}
addSegments(factory1, factory2, offlineTableName, allowEmptySegment,
partitionIdToRowsMap,
- partitionIdToSegmentsMap);
+ partitionIdToSegmentsMap, numPartitions);
if (partitionColumn != null) {
partitionedSegmentsMap.put(offlineTableName,
Pair.of(partitionColumn, partitionIdToSegmentsMap));
@@ -223,9 +225,9 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
private void addSegments(MockInstanceDataManagerFactory factory1,
MockInstanceDataManagerFactory factory2,
String offlineTableName, boolean allowEmptySegment,
List<List<GenericRow>> partitionIdToRowsMap,
- @Nullable List<List<String>> partitionIdToSegmentsMap) {
- for (int i = 0; i < NUM_PARTITIONS; i++) {
- MockInstanceDataManagerFactory factory = i < (NUM_PARTITIONS / 2) ?
factory1 : factory2;
+ @Nullable List<List<String>> partitionIdToSegmentsMap, int
numPartitions) {
+ for (int i = 0; i < numPartitions; i++) {
+ MockInstanceDataManagerFactory factory = i < (numPartitions / 2) ?
factory1 : factory2;
List<GenericRow> rows = partitionIdToRowsMap.get(i);
if (allowEmptySegment || !rows.isEmpty()) {
String segmentName = factory.addSegment(offlineTableName, rows);
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index c69e2adb2a..84079a33eb 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -118,5 +118,81 @@
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */
{tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num) FROM {tbl1} WHERE
{tbl1}.num >= 0 GROUP BY {tbl1}.name"
}
]
+ },
+ "hint_option_queries_unmatched_partition": {
+ "tables": {
+ "tbl1": {
+ "schema": [
+ {"name": "num", "type": "INT"},
+ {"name": "name", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, "a"],
+ [2, "b"],
+ [3, "c"],
+ [3, "yyy"],
+ [4, "e"],
+ [4, "e"],
+ [6, "e"],
+ [7, "d"],
+ [7, "f"],
+ [8, "z"]
+ ],
+ "partitionColumns": [
+ "num"
+ ],
+ "partitionCount": 2
+ },
+ "tbl2": {
+ "schema": [
+ {"name": "num", "type": "INT"},
+ {"name": "val", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, "xxx"],
+ [1, "xxx"],
+ [3, "yyy"],
+ [3, "zzz"],
+ [5, "zzz"],
+ [6, "e"],
+ [7, "d"],
+ [8, "z"]
+ ],
+ "partitionColumns": [
+ "num"
+ ],
+ "partitionCount": 4
+ },
+ "tbl_empty": {
+ "schema": [
+ {"name": "strCol1", "type": "STRING"},
+ {"name": "intCol1", "type": "INT"},
+ {"name": "strCol2", "type": "STRING"}
+ ],
+ "inputs": []
+ }
+ },
+ "queries": [
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy'))"
+ },
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column and group by partition column",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'),
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num,
COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+ },
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column and group by non-partitioned column",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy')) GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "Dynamic broadcast SEMI-JOIN with empty right table
result",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val =
'non-exist') GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially
empty right table result for some servers",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val =
'z') GROUP BY {tbl1}.name"
+ }
+ ]
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]