This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 5faa792762 Support partition parallelism for partitioned table scan (#11266)
5faa792762 is described below
commit 5faa792762a2a3e7ed5839eefd80b578e0cd3d46
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 25 16:08:40 2023 -0700
Support partition parallelism for partitioned table scan (#11266)
---
.../apache/calcite/rel/hint/PinotHintOptions.java | 1 +
.../planner/physical/DispatchablePlanMetadata.java | 9 ++++
.../planner/physical/MailboxAssignmentVisitor.java | 60 ++++++++++++++++------
.../apache/pinot/query/routing/WorkerManager.java | 55 +++++++++++++-------
.../src/test/resources/queries/QueryHints.json | 12 +++++
5 files changed, 102 insertions(+), 35 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index 7129d832a2..1625fa1e9c 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -83,5 +83,6 @@ public class PinotHintOptions {
public static class TableHintOptions {
public static final String PARTITION_KEY = "partition_key";
public static final String PARTITION_SIZE = "partition_size";
+ public static final String PARTITION_PARALLELISM = "partition_parallelism";
}
}
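For context on how this new key is consumed (see the WorkerManager change below), here is a
minimal standalone sketch of reading the hint out of a tableOptions map; the map contents and
the class name are illustrative, and the default of 1 mirrors the parsing added below.

import java.util.Map;

public class HintReadSketch {
  public static void main(String[] args) {
    // Illustrative stand-in for the tableOptions hint map attached to a table scan.
    Map<String, String> tableOptions =
        Map.of("partition_key", "num", "partition_size", "4", "partition_parallelism", "2");
    // A missing partition_parallelism key means a parallelism of 1.
    String str = tableOptions.get("partition_parallelism");
    int partitionParallelism = str != null ? Integer.parseInt(str) : 1;
    System.out.println("partitionParallelism = " + partitionParallelism);  // prints 2
  }
}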
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index abe4f64a46..48378d4e90 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -70,6 +70,7 @@ public class DispatchablePlanMetadata implements Serializable {
// whether a stage is partitioned table scan
private boolean _isPartitionedTableScan;
+ private int _partitionParallelism;
public DispatchablePlanMetadata() {
_scannedTables = new ArrayList<>();
@@ -143,6 +144,14 @@ public class DispatchablePlanMetadata implements Serializable {
_isPartitionedTableScan = isPartitionedTableScan;
}
+ public int getPartitionParallelism() {
+ return _partitionParallelism;
+ }
+
+ public void setPartitionParallelism(int partitionParallelism) {
+ _partitionParallelism = partitionParallelism;
+ }
+
public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
return _tableToUnavailableSegmentsMap;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index a6d758335d..4d3855b2be 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -69,23 +69,49 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
}
} else if (senderMetadata.isPartitionedTableScan()) {
      // For partitioned table scan, send the data to the worker with the same worker id (not necessarily the same
- // instance)
-      // TODO: Support further split the single partition into multiple workers
-      Preconditions.checkState(numSenders == numReceivers,
-          "Got different number of workers for partitioned table scan, sender: %s, receiver: %s", numSenders,
-          numReceivers);
-      for (int workerId = 0; workerId < numSenders; workerId++) {
-        String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
-        MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-            Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
-            Collections.emptyMap());
-        MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-            Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
-            Collections.emptyMap());
-        senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-            .put(receiverFragmentId, serderMailboxMetadata);
-        receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-            .put(senderFragmentId, receiverMailboxMetadata);
+      // instance). When partition parallelism is configured, send the data to the corresponding workers.
+      // NOTE: Do not use partitionParallelism from the metadata because it might be configured only in the first
+      // child. Re-compute it based on the number of receivers.
+      int partitionParallelism = numReceivers / numSenders;
+      if (partitionParallelism == 1) {
+        for (int workerId = 0; workerId < numSenders; workerId++) {
+          String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
+          MailboxMetadata senderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+              Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
+              Collections.emptyMap());
+          MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+              Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
+              Collections.emptyMap());
+          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+              .put(receiverFragmentId, senderMailboxMetadata);
+          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+              .put(senderFragmentId, receiverMailboxMetadata);
+        }
+ } else {
+ int receiverWorkerId = 0;
+        for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
+          VirtualServerAddress senderAddress =
+              new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
+          MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+          senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
+              .put(receiverFragmentId, senderMailboxMetadata);
+          for (int i = 0; i < partitionParallelism; i++) {
+            VirtualServerAddress receiverAddress =
+                new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
+            String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
+                receiverWorkerId);
+            senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
+            senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+
+            MailboxMetadata receiverMailboxMetadata =
+                receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+                    .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
+            receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
+            receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
+
+            receiverWorkerId++;
+          }
+        }
}
} else {
      // For other exchange types, send the data to all the instances in the receiver fragment
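To make the new fan-out concrete: parallelism is recomputed as numReceivers / numSenders, and
sender worker s is wired to receiver workers s*p through s*p + p - 1. A minimal standalone
sketch of that arithmetic (not part of the commit; the counts are illustrative):

public class FanOutSketch {
  public static void main(String[] args) {
    int numSenders = 2;
    int numReceivers = 4;  // e.g. 2 leaf workers with partition_parallelism='2'
    int partitionParallelism = numReceivers / numSenders;  // p = 2
    int receiverWorkerId = 0;
    for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
      for (int i = 0; i < partitionParallelism; i++) {
        // One mailbox per (sender, receiver) pair; receivers are assigned in order.
        System.out.println("sender " + senderWorkerId + " -> receiver " + receiverWorkerId++);
      }
    }
  }
}

This prints sender 0 -> receivers 0 and 1, and sender 1 -> receivers 2 and 3, matching the
mailbox wiring in the else branch above.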
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index da3821d07a..2befb33306 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -102,21 +102,22 @@ public class WorkerManager {
    DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
Map<String, String> tableOptions = metadata.getTableOptions();
-    String partitionKey = null;
-    int numPartitions = 0;
-    if (tableOptions != null) {
-      partitionKey = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
-      String partitionSize = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
-      if (partitionSize != null) {
-        numPartitions = Integer.parseInt(partitionSize);
-      }
-    }
+    String partitionKey =
+        tableOptions != null ? tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY) : null;
if (partitionKey == null) {
assignWorkersToNonPartitionedLeafFragment(metadata, context);
} else {
-      Preconditions.checkState(numPartitions > 0, "'%s' must be provided for partition key: %s",
+      String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+      Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
           PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
-      assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, numPartitions);
+ int numPartitions = Integer.parseInt(numPartitionsStr);
+      Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
+          PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+      String partitionParallelismStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM);
+      int partitionParallelism = partitionParallelismStr != null ? Integer.parseInt(partitionParallelismStr) : 1;
+      Preconditions.checkState(partitionParallelism > 0, "'%s' must be positive, got: %s",
+          PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM, partitionParallelism);
+      assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, numPartitions, partitionParallelism);
}
}
@@ -207,7 +208,7 @@ public class WorkerManager {
}
  private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions) {
+      DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
String tableName = metadata.getScannedTables().get(0);
    ColocatedTableInfo colocatedTableInfo = getColocatedTableInfo(tableName, partitionKey, numPartitions);
@@ -238,6 +239,7 @@ public class WorkerManager {
metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
metadata.setPartitionedTableScan(true);
+ metadata.setPartitionParallelism(partitionParallelism);
}
  private void assignWorkersToIntermediateFragment(PlanFragment fragment, DispatchablePlanContext context) {
@@ -249,12 +251,29 @@ public class WorkerManager {
    Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
    DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId());
-    // If the first child is partitioned table scan, use the same worker assignment to avoid shuffling data
- // TODO: Introduce a hint to control this
- if (children.size() > 0) {
+    // If the first child is partitioned table scan, use the same worker assignment to avoid shuffling data. When
+    // partition parallelism is configured, create multiple intermediate stage workers on the same instance for each
+ // worker in the first child.
+ if (!children.isEmpty()) {
      DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId());
if (firstChildMetadata.isPartitionedTableScan()) {
-        metadata.setWorkerIdToServerInstanceMap(firstChildMetadata.getWorkerIdToServerInstanceMap());
+        int partitionParallelism = firstChildMetadata.getPartitionParallelism();
+        Map<Integer, QueryServerInstance> childWorkerIdToServerInstanceMap =
+            firstChildMetadata.getWorkerIdToServerInstanceMap();
+        if (partitionParallelism == 1) {
+          metadata.setWorkerIdToServerInstanceMap(childWorkerIdToServerInstanceMap);
+ } else {
+ int numChildWorkers = childWorkerIdToServerInstanceMap.size();
+          Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
+ int workerId = 0;
+ for (int i = 0; i < numChildWorkers; i++) {
+            QueryServerInstance serverInstance = childWorkerIdToServerInstanceMap.get(i);
+ for (int j = 0; j < partitionParallelism; j++) {
+ workerIdToServerInstanceMap.put(workerId++, serverInstance);
+ }
+ }
+ metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
+ }
return;
}
}
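The worker assignment mirrors the mailbox fan-out: each leaf worker's server instance is
repeated partitionParallelism times, so every intermediate worker is colocated with the
partition it reads. A standalone sketch, with plain strings standing in for
QueryServerInstance and illustrative server names:

import java.util.HashMap;
import java.util.Map;

public class WorkerReplicationSketch {
  public static void main(String[] args) {
    // Assumed setup: two leaf workers on servers s0 and s1, partition parallelism 3.
    Map<Integer, String> childWorkerIdToServer = Map.of(0, "s0", 1, "s1");
    int partitionParallelism = 3;
    Map<Integer, String> workerIdToServer = new HashMap<>();
    int workerId = 0;
    for (int i = 0; i < childWorkerIdToServer.size(); i++) {
      String server = childWorkerIdToServer.get(i);
      for (int j = 0; j < partitionParallelism; j++) {
        workerIdToServer.put(workerId++, server);
      }
    }
    // Intermediate workers 0-2 land on s0 and workers 3-5 land on s1.
    for (int id = 0; id < workerId; id++) {
      System.out.println("intermediate worker " + id + " -> " + workerIdToServer.get(id));
    }
  }
}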
@@ -308,13 +327,13 @@ public class WorkerManager {
throw new IllegalStateException(
"No server instance found for intermediate stage for tables: " +
Arrays.toString(tableNames.toArray()));
}
- Map<String, String> options = context.getPlannerContext().getOptions();
-    int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
if (metadata.isRequiresSingletonInstance()) {
// require singleton should return a single global worker ID with 0;
      metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
          new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
} else {
+ Map<String, String> options = context.getPlannerContext().getOptions();
+      int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
int workerId = 0;
for (ServerInstance serverInstance : serverInstances) {
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 940b318f1a..c69e2adb2a 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -65,10 +65,22 @@
"description": "Group by partition column",
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
},
+    {
+      "description": "Group by partition column with partition parallelism",
+      "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num"
+    },
{
"description": "Colocated JOIN with partition column",
"sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2}
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num"
},
+    {
+      "description": "Colocated JOIN with partition column with partition parallelism",
+      "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ ON {tbl1}.num = {tbl2}.num"
+    },
+    {
+      "description": "Colocated JOIN with partition column with partition parallelism in first table",
+      "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num"
+    },
{
"description": "Colocated JOIN with partition column and group by
partition column",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true')
*/ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"
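Taken together, for the hinted queries above (partition_size='4', partition_parallelism='2')
and assuming one leaf worker per partition, the expected shape is a 4-worker leaf stage
feeding an 8-worker intermediate stage, with each leaf worker writing to 2 mailboxes. A quick
arithmetic check of that assumption:

public class StageShapeSketch {
  public static void main(String[] args) {
    int partitionSize = 4;            // partition_size='4'
    int partitionParallelism = 2;     // partition_parallelism='2'
    int leafWorkers = partitionSize;  // assumes one worker per partition
    int intermediateWorkers = leafWorkers * partitionParallelism;
    int receiversPerSender = intermediateWorkers / leafWorkers;  // recomputed as in MailboxAssignmentVisitor
    System.out.println("leaf=" + leafWorkers + ", intermediate=" + intermediateWorkers
        + ", receiversPerSender=" + receiversPerSender);  // leaf=4, intermediate=8, receiversPerSender=2
  }
}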
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]