walterddr commented on code in PR #11234:
URL: https://github.com/apache/pinot/pull/11234#discussion_r1281239345
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -96,19 +97,66 @@ private static boolean isLeafPlan(DispatchablePlanMetadata
metadata) {
}
private void assignWorkersToLeafFragment(PlanFragment fragment,
DispatchablePlanContext context) {
- DispatchablePlanMetadata metadata =
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
- // table scan stage, need to attach server as well as segment info for
each physical table type.
- List<String> scannedTables = metadata.getScannedTables();
- String logicalTableName = scannedTables.get(0);
- Map<String, RoutingTable> routingTableMap =
getRoutingTable(logicalTableName, context.getRequestId());
- if (routingTableMap.size() == 0) {
- throw new IllegalArgumentException("Unable to find routing entries for
table: " + logicalTableName);
+ // NOTE: For pipeline breaker, leaf fragment can also have children
+ for (PlanFragment child : fragment.getChildren()) {
+ assignWorkersToNonRootFragment(child, context);
+ }
+
+ TableScanNode tableScanNode =
findTableScanNode(fragment.getFragmentRoot());
+ Preconditions.checkState(tableScanNode != null, "Failed to find table scan
node under leaf fragment");
+ String tableName = tableScanNode.getTableName();
+
+ // Extract partitionKey and numPartitions from hint if provided
+ Map<String, String> tableHintOptions =
+
tableScanNode.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS);
+ String partitionKey = null;
+ int numPartitions = 0;
+ if (tableHintOptions != null) {
+ partitionKey =
tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
+ String partitionSize =
tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+ if (partitionSize != null) {
+ numPartitions = Integer.parseInt(partitionSize);
+ }
+ }
+
+ if (partitionKey == null) {
+ assignWorkersToNonPartitionedLeafFragment(fragment, context, tableName);
+ } else {
+ Preconditions.checkState(numPartitions > 0, "'%s' must be provided for
partition key: %s",
+ PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+ assignWorkersToPartitionedLeafFragment(fragment, context, tableName,
partitionKey, numPartitions);
}
+ }
+
+ @Nullable
+ private TableScanNode findTableScanNode(PlanNode planNode) {
Review Comment:
nit: this can be set in the fragment metadata during the fragment visitor pass
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java:
##########
@@ -98,51 +101,54 @@ public void setUp()
boolean allowEmptySegment =
!BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps,
"noEmptySegment"));
String tableName = testCaseName + "_" + tableEntry.getKey();
// Testing only OFFLINE table b/c Hybrid table test is a special case
to test separately.
- String tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
- org.apache.pinot.spi.data.Schema pinotSchema =
constructSchema(tableName, tableEntry.getValue()._schema);
+ String offlineTableName =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
+ Schema pinotSchema = constructSchema(tableName,
tableEntry.getValue()._schema);
schemaMap.put(tableName, pinotSchema);
- factory1.registerTable(pinotSchema, tableNameWithType);
- factory2.registerTable(pinotSchema, tableNameWithType);
+ factory1.registerTable(pinotSchema, offlineTableName);
+ factory2.registerTable(pinotSchema, offlineTableName);
List<QueryTestCase.ColumnAndType> columnAndTypes =
tableEntry.getValue()._schema;
List<GenericRow> genericRows = toRow(columnAndTypes,
tableEntry.getValue()._inputs);
// generate segments and dump into server1 and server2
List<String> partitionColumns =
tableEntry.getValue()._partitionColumns;
+ String partitionColumn = null;
+ List<List<String>> partitionIdToSegmentsMap = null;
+ if (partitionColumns != null && partitionColumns.size() == 1) {
+ partitionColumn = partitionColumns.get(0);
+ partitionIdToSegmentsMap = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
Review Comment:
make the `4` a `private static final` constant?
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java:
##########
@@ -59,9 +59,38 @@ public Void process(PlanNode node, DispatchablePlanContext
context) {
receiverMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
});
+ } else if (senderMetadata.isPartitionedTableScan()) {
+ // For partitioned table scan, send the data to the worker with the
same worker id (not necessary the same
+ // instance)
Review Comment:
maybe we can add a TODO: instead of sending to the same worker id, we could do
a 1:2 or 1:4 fan-out
(possibly based on the parallelism setting, such as `set
stageParallelism=xxx`)
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -96,19 +97,66 @@ private static boolean isLeafPlan(DispatchablePlanMetadata
metadata) {
}
private void assignWorkersToLeafFragment(PlanFragment fragment,
DispatchablePlanContext context) {
- DispatchablePlanMetadata metadata =
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
- // table scan stage, need to attach server as well as segment info for
each physical table type.
- List<String> scannedTables = metadata.getScannedTables();
- String logicalTableName = scannedTables.get(0);
- Map<String, RoutingTable> routingTableMap =
getRoutingTable(logicalTableName, context.getRequestId());
- if (routingTableMap.size() == 0) {
- throw new IllegalArgumentException("Unable to find routing entries for
table: " + logicalTableName);
+ // NOTE: For pipeline breaker, leaf fragment can also have children
+ for (PlanFragment child : fragment.getChildren()) {
+ assignWorkersToNonRootFragment(child, context);
+ }
+
+ TableScanNode tableScanNode =
findTableScanNode(fragment.getFragmentRoot());
+ Preconditions.checkState(tableScanNode != null, "Failed to find table scan
node under leaf fragment");
+ String tableName = tableScanNode.getTableName();
+
+ // Extract partitionKey and numPartitions from hint if provided
+ Map<String, String> tableHintOptions =
+
tableScanNode.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS);
Review Comment:
nit: can we simplify all of this by putting the hint on the dispatchable plan
metadata map during the visitor pass?
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -354,14 +349,12 @@ private ColocatedTableInfo getColocatedTableInfo(String
tableName) {
TimeBoundaryInfo timeBoundaryInfo =
_routingManager.getTimeBoundaryInfo(offlineTableName);
// Ignore OFFLINE side when time boundary info is unavailable
if (timeBoundaryInfo == null) {
- return getRealtimeColocatedTableInfo(realtimeTableName);
+ return getRealtimeColocatedTableInfo(realtimeTableName,
partitionKey, numPartitions);
Review Comment:
should we add an integration test for hybrid table colocated join? I am not
sure how we can mock that in a unit test, but an integration test should be
super helpful to cover this.
##########
pinot-query-runtime/src/test/resources/queries/QueryHints.json:
##########
@@ -51,37 +51,41 @@
}
},
"queries": [
+ {
+ "description": "Group by partition column",
+ "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
+ },
{
"description": "Colocated JOIN with partition column",
- "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */
{tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON
{tbl1}.num = {tbl2}.num"
+ "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2}
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num"
Review Comment:
can we add a test showing that an invalid partition key or partition size (e.g.
key is not `num`, or size is not `4`) should throw (or should it fall back?)
##########
pinot-query-runtime/src/test/resources/queries/QueryHints.json:
##########
@@ -51,37 +51,41 @@
}
},
"queries": [
+ {
+ "description": "Group by partition column",
+ "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
+ },
{
"description": "Colocated JOIN with partition column",
- "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */
{tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON
{tbl1}.num = {tbl2}.num"
+ "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2}
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num"
},
{
"description": "Colocated JOIN with partition column and group by
partition column",
- "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'),
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name,
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY
{tbl1}.num, {tbl1}.name"
+ "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true')
*/ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"
},
{
"description": "Colocated JOIN with partition column and group by
non-partitioned column",
- "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'),
aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name,
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY
{tbl1}.name"
+ "sql": "SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num GROUP BY {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx',
'yyy'))"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy'))"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column and group by partition column",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true'),
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num,
COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM
{tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'),
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num,
COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column and group by non-partitioned column",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx',
'yyy')) GROUP BY {tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy')) GROUP BY {tbl1}.name"
},
{
"description": "Dynamic broadcast SEMI-JOIN with empty right table
result",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM
{tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val =
'non-exist') GROUP BY {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partially
empty right table result for some servers",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY
{tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val =
'z') GROUP BY {tbl1}.name"
Review Comment:
can we add a `set stageParallelism = 2;` variant of the hinted tests here? We
have never tested option + hints at the same time, so it would be good to add.
(This can be done as a follow-up; I just tried and they all passed.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]