This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a02253d671d [multistage] Support Hash Functions Gracefully in V2 Optimizer (#16296)
a02253d671d is described below
commit a02253d671d7781acc16b81628e51bd11a3005a0
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Jul 8 01:47:01 2025 -0500
[multistage] Support Hash Functions Gracefully in V2 Optimizer (#16296)
---
.../org/apache/pinot/query/QueryEnvironment.java | 2 +-
.../query/context/PhysicalPlannerContext.java | 14 ++++++-
.../planner/physical/v2/DistHashFunction.java | 43 ++++++++++++++++++++++
.../planner/physical/v2/HashDistributionDesc.java | 6 +--
.../physical/v2/PRelToPlanNodeConverter.java | 3 +-
.../v2/PlanFragmentAndMailboxAssignment.java | 2 +-
.../physical/v2/nodes/PhysicalExchange.java | 13 +++++--
.../v2/opt/rules/AggregatePushdownRule.java | 2 +-
.../opt/rules/LeafStageWorkerAssignmentRule.java | 8 +++-
.../v2/opt/rules/LiteModeWorkerAssignmentRule.java | 3 +-
.../v2/opt/rules/RootExchangeInsertRule.java | 3 +-
.../physical/v2/opt/rules/SortPushdownRule.java | 3 +-
.../v2/opt/rules/WorkerExchangeAssignmentRule.java | 43 ++++++++++++----------
.../query/planner/plannode/MailboxSendNode.java | 9 -----
.../query/planner/logical/StagesTestBase.java | 5 ++-
.../physical/v2/PinotDataDistributionTest.java | 6 ++-
.../rules/LeafStageWorkerAssignmentRuleTest.java | 21 +++++++++--
17 files changed, 133 insertions(+), 53 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 2d30a9cdf38..1ee92b679cd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -195,7 +195,7 @@ public class QueryEnvironment {
workerManager.getHostName(), workerManager.getPort(),
_envConfig.getRequestId(),
workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(),
_envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(),
_envConfig.defaultUseBrokerPruning(),
- _envConfig.defaultLiteModeServerStageLimit());
+ _envConfig.defaultLiteModeServerStageLimit(),
_envConfig.defaultHashFunction());
}
return new PlannerContext(_config, _catalogReader, _typeFactory,
optProgram, traitProgram,
sqlNodeAndOptions.getOptions(), _envConfig, format,
physicalPlannerContext);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index e8e46b39405..f67004bd2a7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -28,6 +28,8 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -66,6 +68,7 @@ public class PhysicalPlannerContext {
private final boolean _runInBroker;
private final boolean _useBrokerPruning;
private final int _liteModeServerStageLimit;
+ private final DistHashFunction _defaultHashFunction;
/**
* Used by controller when it needs to extract table names from the query.
@@ -82,18 +85,20 @@ public class PhysicalPlannerContext {
_runInBroker = CommonConstants.Broker.DEFAULT_RUN_IN_BROKER;
_useBrokerPruning = CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING;
_liteModeServerStageLimit =
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
+ _defaultHashFunction =
DistHashFunction.valueOf(KeySelector.DEFAULT_HASH_ALGORITHM.toUpperCase());
}
public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions) {
this(routingManager, hostName, port, requestId, instanceId, queryOptions,
CommonConstants.Broker.DEFAULT_USE_LITE_MODE,
CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
- CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT);
+ CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
+ KeySelector.DEFAULT_HASH_ALGORITHM);
}
public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions, boolean
defaultUseLiteMode, boolean defaultRunInBroker,
- boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit) {
+ boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit,
String defaultHashFunction) {
_routingManager = routingManager;
_hostName = hostName;
_port = port;
@@ -105,6 +110,7 @@ public class PhysicalPlannerContext {
_useBrokerPruning = QueryOptionsUtils.isUseBrokerPruning(_queryOptions,
defaultUseBrokerPruning);
_liteModeServerStageLimit =
QueryOptionsUtils.getLiteModeServerStageLimit(_queryOptions,
defaultLiteModeLeafStageLimit);
+ _defaultHashFunction =
DistHashFunction.valueOf(defaultHashFunction.toUpperCase());
_instanceIdToQueryServerInstance.put(instanceId,
getBrokerQueryServerInstance());
}
@@ -175,6 +181,10 @@ public class PhysicalPlannerContext {
.collect(Collectors.toList()).get(numCandidates == 1 ? 0 :
random.nextInt(numCandidates - 1));
}
+ public DistHashFunction getDefaultHashFunction() {
+ return _defaultHashFunction;
+ }
+
private QueryServerInstance getBrokerQueryServerInstance() {
return new QueryServerInstance(_instanceId, _hostName, _port, _port);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/DistHashFunction.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/DistHashFunction.java
new file mode 100644
index 00000000000..1f8b3d0aad5
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/DistHashFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+/**
+ * Hash Functions supported by the v2 query optimizer. These are the hash
functions supported by the Shuffle Exchange
+ * runtime, and the ones which are considered for the table-scan. If there's a
hash-function in the table-scan that
+ * doesn't belong here, the table-scan won't be considered partitioned.
+ */
+public enum DistHashFunction {
+ MURMUR,
+ MURMUR3,
+ HASHCODE,
+ ABSHASHCODE;
+
+ /**
+ * Whether the given hash-function is considered by the v2 query optimizer
for partitioning purposes.
+ */
+ public static boolean isSupported(String hashFunction) {
+ try {
+ DistHashFunction.valueOf(hashFunction.toUpperCase());
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
index dc373b4a882..d57073b6eea 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
@@ -39,10 +39,10 @@ import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
public class HashDistributionDesc {
private final int _cachedHashCode;
private final List<Integer> _keys;
- private final String _hashFunction;
+ private final DistHashFunction _hashFunction;
private final int _numPartitions;
- public HashDistributionDesc(List<Integer> keys, String hashFunction, int
numPartitions) {
+ public HashDistributionDesc(List<Integer> keys, DistHashFunction
hashFunction, int numPartitions) {
_cachedHashCode = Objects.hash(keys, hashFunction, numPartitions);
_keys = keys;
_hashFunction = hashFunction;
@@ -53,7 +53,7 @@ public class HashDistributionDesc {
return _keys;
}
- public String getHashFunction() {
+ public DistHashFunction getHashFunction() {
return _hashFunction;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
index 26b395b4648..8a79f61fe29 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
@@ -49,7 +49,6 @@ import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.logical.RexExpressionUtils;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
@@ -119,7 +118,7 @@ public class PRelToPlanNodeConverter {
new ArrayList<>(), node.getRelExchangeType(),
RelDistribution.Type.ANY, node.getDistributionKeys(),
false, node.getRelCollation().getFieldCollations(), false,
!node.getRelCollation().getKeys().isEmpty(), Set.of() /* table names
*/, node.getExchangeStrategy(),
- KeySelector.DEFAULT_HASH_ALGORITHM);
+ node.getHashFunction().name());
}
public static SetOpNode convertSetOp(SetOp node) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
index bfc200981d2..d7f45ccacfb 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
@@ -102,7 +102,7 @@ public class PlanFragmentAndMailboxAssignment {
MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId,
inputFragmentSchema, new ArrayList<>(),
currentFragmentId, PinotRelExchangeType.getDefaultExchangeType(),
distributionType,
physicalExchange.getDistributionKeys(), false,
physicalExchange.getRelCollation().getFieldCollations(),
- false /* sort on sender */);
+ false /* sort on sender */,
physicalExchange.getHashFunction().name());
MailboxReceiveNode receiveNode = new
MailboxReceiveNode(currentFragmentId, inputFragmentSchema,
senderFragmentId, PinotRelExchangeType.getDefaultExchangeType(),
distributionType,
physicalExchange.getDistributionKeys(),
physicalExchange.getRelCollation().getFieldCollations(),
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
index b00d786f00c..5964a086014 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTraitDef;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
@@ -76,10 +77,11 @@ public class PhysicalExchange extends Exchange implements PRelNode {
* When not empty, records in each output stream will be sorted by the
ordering defined by this collation.
*/
private final RelCollation _relCollation;
+ private final DistHashFunction _hashFunction;
public PhysicalExchange(int nodeId, PRelNode input, @Nullable
PinotDataDistribution pinotDataDistribution,
List<Integer> distributionKeys, ExchangeStrategy exchangeStrategy,
@Nullable RelCollation relCollation,
- PinotExecStrategyTrait execStrategyTrait) {
+ PinotExecStrategyTrait execStrategyTrait, DistHashFunction hashFunction)
{
super(input.unwrap().getCluster(),
EMPTY_TRAIT_SET.plus(execStrategyTrait), input.unwrap(),
ExchangeStrategy.getRelDistribution(exchangeStrategy,
distributionKeys));
_nodeId = nodeId;
@@ -88,6 +90,7 @@ public class PhysicalExchange extends Exchange implements PRelNode {
_distributionKeys = distributionKeys;
_exchangeStrategy = exchangeStrategy;
_relCollation = relCollation == null ? RelCollations.EMPTY : relCollation;
+ _hashFunction = hashFunction;
}
@Override
@@ -95,7 +98,7 @@ public class PhysicalExchange extends Exchange implements PRelNode {
Preconditions.checkState(newInput instanceof PRelNode, "Expected input of
PhysicalExchange to be a PRelNode");
// TODO(mse-physical): this always uses streaming exec strategy at the
moment.
return new PhysicalExchange(_nodeId, (PRelNode) newInput,
_pinotDataDistribution, _distributionKeys,
- _exchangeStrategy, _relCollation,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ _exchangeStrategy, _relCollation,
PinotExecStrategyTrait.getDefaultExecStrategy(), _hashFunction);
}
@Override
@@ -152,10 +155,14 @@ public class PhysicalExchange extends Exchange implements PRelNode {
return trait.getType();
}
+ public DistHashFunction getHashFunction() {
+ return _hashFunction;
+ }
+
@Override
public PRelNode with(int newNodeId, List<PRelNode> newInputs,
PinotDataDistribution newDistribution) {
return new PhysicalExchange(newNodeId, newInputs.get(0), newDistribution,
_distributionKeys, _exchangeStrategy,
- _relCollation, PinotExecStrategyTrait.getDefaultExecStrategy());
+ _relCollation, PinotExecStrategyTrait.getDefaultExecStrategy(),
_hashFunction);
}
@Override public RelWriter explainTerms(RelWriter pw) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java
index f38dcef4807..05ed48c5fb6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java
@@ -136,7 +136,7 @@ public class AggregatePushdownRule extends PRelOptRule {
: PinotDistMapping.apply(o1.getRelCollation(),
mapFromInputToPartialAgg);
PhysicalExchange n1 = new PhysicalExchange(o1.getNodeId(), n2,
o1.getPinotDataDistributionOrThrow().apply(mapFromInputToPartialAgg),
newDistKeys, o1.getExchangeStrategy(),
- newCollation, PinotExecStrategyTrait.getDefaultExecStrategy());
+ newCollation, PinotExecStrategyTrait.getDefaultExecStrategy(),
o1.getHashFunction());
return convertAggFromIntermediateInput(aggPRelNode, n1, AggType.FINAL,
leafReturnFinalResult,
PinotDistMapping.apply(RelCollations.of(o0.getCollations()),
mapFromInputToPartialAgg).getFieldCollations(),
aggPRelNode.getLimit(), idGenerator);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index 09008e41448..1ac9d9f1bf7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -57,6 +57,7 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.logical.LeafStageToPinotQuery;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
@@ -281,7 +282,9 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
int keyIndex = fieldNames.indexOf(tablePartitionInfo.getPartitionColumn());
String function = tablePartitionInfo.getPartitionFunctionName();
int numSelectedServers = instanceIdToSegmentsMap.size();
- if (keyIndex == -1) {
+ if (!DistHashFunction.isSupported(function)) {
+ return null;
+ } else if (keyIndex == -1) {
LOGGER.warn("Unable to find partition column {} in table scan fields
{}", tablePartitionInfo.getPartitionColumn(),
fieldNames);
return null;
@@ -362,7 +365,8 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
workers.set(workerId, String.format("%s@%s", workerId,
workers.get(workerId)));
workerIdToSegmentsMap.put(workerId, ImmutableMap.of(tableType,
segmentsForWorker));
}
- HashDistributionDesc desc = new
HashDistributionDesc(ImmutableList.of(keyIndex), function, numPartitions);
+ HashDistributionDesc desc = new
HashDistributionDesc(ImmutableList.of(keyIndex),
+ DistHashFunction.valueOf(function.toUpperCase()), numPartitions);
PinotDataDistribution dataDistribution = new
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
workers, workers.hashCode(), ImmutableSet.of(desc), null);
return new TableScanWorkerAssignmentResult(dataDistribution,
workerIdToSegmentsMap);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
index e3cf4379bbb..4cebd16f32e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
@@ -106,7 +106,8 @@ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
RelDistribution.Type.SINGLETON, liteModeWorkers,
liteModeWorkers.hashCode(), null, null);
}
return new PhysicalExchange(nodeId(), leafStageRoot, pdd,
Collections.emptyList(),
- ExchangeStrategy.SINGLETON_EXCHANGE, collation,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.SINGLETON_EXCHANGE, collation,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _context.getDefaultHashFunction());
}
/**
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
index 714f096a319..c2c53ffd631 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
@@ -53,7 +53,8 @@ public class RootExchangeInsertRule implements PRelNodeTransformer {
PinotDataDistribution pinotDataDistribution = new
PinotDataDistribution(RelDistribution.Type.SINGLETON,
workers, workers.hashCode(), null, inferCollation(currentNode));
return new PhysicalExchange(nodeId(), currentNode, pinotDataDistribution,
List.of(),
- ExchangeStrategy.SINGLETON_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.SINGLETON_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _context.getDefaultHashFunction());
}
private String brokerWorkerId() {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java
index f906bcffd90..94481d515fb 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java
@@ -78,7 +78,8 @@ public class SortPushdownRule extends PRelOptRule {
PhysicalSort n2 = new PhysicalSort(sort.getCluster(),
RelTraitSet.createEmpty(), List.of(), sort.collation,
null /* offset */, newSortFetch, o2, nodeId(),
o2.getPinotDataDistributionOrThrow(), o2.isLeafStage());
PhysicalExchange n1 = new PhysicalExchange(nodeId(), n2,
o1.getPinotDataDistributionOrThrow(),
- o1.getDistributionKeys(), o1.getExchangeStrategy(),
o1.getRelCollation(), o1.getExecStrategy());
+ o1.getDistributionKeys(), o1.getExchangeStrategy(),
o1.getRelCollation(), o1.getExecStrategy(),
+ o1.getHashFunction());
return new PhysicalSort(sort.getCluster(), sort.getTraitSet(),
sort.getHints(), sort.getCollation(),
sort.offset, sort.fetch, n1, sort.getNodeId(),
sort.getPinotDataDistributionOrThrow(), false);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
index 8bffea30933..2f338083bd2 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
@@ -43,7 +43,7 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
import org.apache.pinot.query.context.PhysicalPlannerContext;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
@@ -77,7 +77,6 @@ import org.slf4j.LoggerFactory;
public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
private static final Logger LOGGER =
LoggerFactory.getLogger(WorkerExchangeAssignmentRule.class);
private final PhysicalPlannerContext _physicalPlannerContext;
- private static final String DEFAULT_HASH_FUNCTION =
KeySelector.DEFAULT_HASH_ALGORITHM;
public WorkerExchangeAssignmentRule(PhysicalPlannerContext context) {
_physicalPlannerContext = context;
@@ -150,7 +149,7 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
// Update current node with its distribution, and since this is a leaf
stage boundary, add an identity exchange.
return new PhysicalExchange(nodeId(), currentNode,
currentNode.getPinotDataDistribution(), Collections.emptyList(),
ExchangeStrategy.IDENTITY_EXCHANGE,
- null, PinotExecStrategyTrait.getDefaultExecStrategy());
+ null, PinotExecStrategyTrait.getDefaultExecStrategy(),
_physicalPlannerContext.getDefaultHashFunction());
}
// When no exchange, simply update current node with the distribution.
return currentNode.with(currentNode.getPRelInputs(),
currentNodeDistribution);
@@ -234,7 +233,7 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
PinotDataDistribution newDataDistribution =
derivedDistribution.withCollation(relCollation);
currentNodeExchange = new PhysicalExchange(nodeId(), currentNode,
newDataDistribution, Collections.emptyList(),
ExchangeStrategy.IDENTITY_EXCHANGE, relCollation,
- PinotExecStrategyTrait.getDefaultExecStrategy());
+ PinotExecStrategyTrait.getDefaultExecStrategy(),
_physicalPlannerContext.getDefaultHashFunction());
}
} else {
if (!relCollation.getKeys().isEmpty()) {
@@ -244,7 +243,7 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
currentNodeExchange = new
PhysicalExchange(_physicalPlannerContext.getNodeIdGenerator().get(),
oldExchange.getPRelInput(0),
newDataDistribution.withCollation(relCollation),
oldExchange.getDistributionKeys(),
oldExchange.getExchangeStrategy(), relCollation,
- PinotExecStrategyTrait.getDefaultExecStrategy());
+ PinotExecStrategyTrait.getDefaultExecStrategy(),
oldExchange.getHashFunction());
}
}
return currentNodeExchange;
@@ -264,23 +263,25 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
RelDistribution.Type.BROADCAST_DISTRIBUTED,
currentNodeDistribution.getWorkers(),
currentNodeDistribution.getWorkerHash(), null, null);
return new PhysicalExchange(nodeId(), currentNode,
pinotDataDistribution, List.of(),
- ExchangeStrategy.BROADCAST_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.BROADCAST_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _physicalPlannerContext.getDefaultHashFunction());
}
if (distributionConstraint.getType() == RelDistribution.Type.SINGLETON) {
List<String> newWorkers =
currentNodeDistribution.getWorkers().subList(0, 1);
PinotDataDistribution pinotDataDistribution = new
PinotDataDistribution(RelDistribution.Type.SINGLETON,
newWorkers, newWorkers.hashCode(), null, null);
return new PhysicalExchange(nodeId(), currentNode,
pinotDataDistribution, List.of(),
- ExchangeStrategy.SINGLETON_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.SINGLETON_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _physicalPlannerContext.getDefaultHashFunction());
}
if (distributionConstraint.getType() ==
RelDistribution.Type.HASH_DISTRIBUTED) {
- HashDistributionDesc desc = new HashDistributionDesc(
- distributionConstraint.getKeys(), DEFAULT_HASH_FUNCTION,
currentNodeDistribution.getWorkers().size());
- PinotDataDistribution pinotDataDistribution = new PinotDataDistribution(
- RelDistribution.Type.HASH_DISTRIBUTED,
currentNodeDistribution.getWorkers(),
- currentNodeDistribution.getWorkerHash(), ImmutableSet.of(desc),
null);
+ HashDistributionDesc desc = new
HashDistributionDesc(distributionConstraint.getKeys(),
+ _physicalPlannerContext.getDefaultHashFunction(),
currentNodeDistribution.getWorkers().size());
+ PinotDataDistribution pinotDataDistribution = new
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
+ currentNodeDistribution.getWorkers(),
currentNodeDistribution.getWorkerHash(), ImmutableSet.of(desc), null);
return new PhysicalExchange(nodeId(), currentNode,
pinotDataDistribution, distributionConstraint.getKeys(),
- ExchangeStrategy.PARTITIONING_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.PARTITIONING_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _physicalPlannerContext.getDefaultHashFunction());
}
throw new IllegalStateException("Distribution constraint not met: " +
distributionConstraint.getType());
}
@@ -305,7 +306,7 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
PinotDataDistribution newDistribution = new
PinotDataDistribution(RelDistribution.Type.RANDOM_DISTRIBUTED,
parentDistribution.getWorkers(), parentDistribution.getWorkerHash(),
null, null);
return new PhysicalExchange(nodeId(), currentNode, newDistribution,
List.of(), ExchangeStrategy.RANDOM_EXCHANGE,
- null, PinotExecStrategyTrait.getDefaultExecStrategy());
+ null, PinotExecStrategyTrait.getDefaultExecStrategy(),
_physicalPlannerContext.getDefaultHashFunction());
} else if (relDistribution.getType() ==
RelDistribution.Type.BROADCAST_DISTRIBUTED) {
if (assumedDistribution.getType() ==
RelDistribution.Type.BROADCAST_DISTRIBUTED) {
if (parentHasSameWorkers) {
@@ -317,7 +318,8 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
PinotDataDistribution newDistribution = new
PinotDataDistribution(RelDistribution.Type.BROADCAST_DISTRIBUTED,
parentDistribution.getWorkers(), parentDistribution.getWorkerHash(),
null, null);
return new PhysicalExchange(nodeId(), currentNode, newDistribution,
List.of(),
- ExchangeStrategy.BROADCAST_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.BROADCAST_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _physicalPlannerContext.getDefaultHashFunction());
} else if (relDistribution.getType() == RelDistribution.Type.SINGLETON) {
if (parentHasSameWorkers) {
return null;
@@ -327,7 +329,8 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
PinotDataDistribution newDistribution = new
PinotDataDistribution(RelDistribution.Type.SINGLETON,
parentDistribution.getWorkers(), parentDistribution.getWorkerHash(),
null, null);
return new PhysicalExchange(nodeId(), currentNode, newDistribution,
List.of(),
- ExchangeStrategy.SINGLETON_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.SINGLETON_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _physicalPlannerContext.getDefaultHashFunction());
}
Preconditions.checkState(relDistribution.getType() ==
RelDistribution.Type.HASH_DISTRIBUTED,
"Unexpected distribution constraint: %s", relDistribution.getType());
@@ -355,7 +358,8 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
parentDistribution.getWorkers(),
parentDistribution.getWorkerHash(),
assumedDistribution.getHashDistributionDesc(),
assumedDistribution.getCollation());
return new PhysicalExchange(nodeId(), currentNode,
newDistribution, List.of(),
- ExchangeStrategy.IDENTITY_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.IDENTITY_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ _physicalPlannerContext.getDefaultHashFunction());
}
}
}
@@ -363,14 +367,15 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
}
// Re-partition.
int numberOfPartitions = hashDistToMatch.getNumPartitions();
- String hashFunction = hashDistToMatch.getHashFunction();
+ DistHashFunction hashFunction = hashDistToMatch.getHashFunction();
HashDistributionDesc newDesc = new
HashDistributionDesc(relDistribution.getKeys(), hashFunction,
numberOfPartitions);
PinotDataDistribution newDistribution = new
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
parentDistribution.getWorkers(), parentDistribution.getWorkerHash(),
ImmutableSet.of(newDesc),
null);
return new PhysicalExchange(nodeId(), currentNode, newDistribution,
relDistribution.getKeys(),
- ExchangeStrategy.PARTITIONING_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.PARTITIONING_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy(),
+ hashFunction);
}
private boolean complicatedButColocated(int partitionOne, int partitionTwo,
int numStreams) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index ca481f3923a..940cc4eedb1 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -28,7 +28,6 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
public class MailboxSendNode extends BasePlanNode {
@@ -65,14 +64,6 @@ public class MailboxSendNode extends BasePlanNode {
distributionType, keys, prePartitioned, collations, sort,
hashFunction);
}
- public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs,
- int receiverStage, PinotRelExchangeType exchangeType,
- RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
- @Nullable List<RelFieldCollation> collations, boolean sort) {
- this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType,
distributionType, keys, prePartitioned,
- collations, sort, KeySelector.DEFAULT_HASH_ALGORITHM);
- }
-
public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs,
int receiverStage, PinotRelExchangeType exchangeType,
RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
index 021e65fa765..c20b4b161ea 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
@@ -135,7 +135,8 @@ public class StagesTestBase {
boolean prePartitioned, List<RelFieldCollation> collations, boolean
sort, boolean sortedOnSender) {
PlanNode input = childBuilder.build(nextStageId);
MailboxSendNode mailboxSendNode = new MailboxSendNode(nextStageId,
input.getDataSchema(), List.of(input),
- stageId, exchangeType, distribution, keys, prePartitioned,
collations, sort);
+ stageId, exchangeType, distribution, keys, prePartitioned,
collations, sort,
+ KeySelector.DEFAULT_HASH_ALGORITHM);
MailboxSendNode old = _stageRoots.put(nextStageId, mailboxSendNode);
Preconditions.checkState(old == null, "Mailbox already exists for
stageId: %s", nextStageId);
return new MailboxReceiveNode(stageId, input.getDataSchema(),
nextStageId, exchangeType, distribution, keys,
@@ -192,7 +193,7 @@ public class StagesTestBase {
return (stageId, mySchema, myHints) -> {
PlanNode input = childBuilder.build(stageId);
MailboxSendNode mailboxSendNode = new MailboxSendNode(newStageId,
mySchema, List.of(input), stageId, null,
- null, null, false, null, false);
+ null, null, false, null, false, KeySelector.DEFAULT_HASH_ALGORITHM);
MailboxSendNode old = _stageRoots.put(stageId, mailboxSendNode);
Preconditions.checkState(old == null, "Mailbox already exists for
stageId: %s", stageId);
return mailboxSendNode;
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
index c0939a267b6..d173f8fb6ae 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
@@ -97,7 +97,8 @@ public class PinotDataDistributionTest {
final int numPartitions = 8;
PinotDataDistribution distribution = new
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
ImmutableList.of("0@0", "1@0"), 0L, Collections.singleton(
- new HashDistributionDesc(keys, MURMUR_HASH_FUNCTION,
numPartitions)), null);
+ new HashDistributionDesc(keys,
+
DistHashFunction.valueOf(MURMUR_HASH_FUNCTION.toUpperCase()), numPartitions)),
null);
assertTrue(distribution.satisfies(RelDistributions.hash(keys)));
}
{
@@ -120,7 +121,8 @@ public class PinotDataDistributionTest {
public void testSatisfiesHashDistributionDesc() {
PinotDataDistribution distribution = new
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
ImmutableList.of("0@0", "1@0"), 0L, Collections.singleton(
- new HashDistributionDesc(ImmutableList.of(1, 3),
MURMUR_HASH_FUNCTION, 8)), null);
+ new HashDistributionDesc(ImmutableList.of(1, 3),
+ DistHashFunction.valueOf(MURMUR_HASH_FUNCTION.toUpperCase()),
8)), null);
{
// Case-1: Hash distribution desc with different keys.
assertNull(distribution.satisfiesHashDistributionDesc(ImmutableList.of(1, 2)));
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
index a5103e08ead..c6b6895119a 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
@@ -46,6 +46,7 @@ import static org.testng.Assert.*;
public class LeafStageWorkerAssignmentRuleTest {
private static final String TABLE_NAME = "testTable";
+ private static final String TABLE_NAME_RT = "testTable_REALTIME";
private static final String INVALID_SEGMENT_PARTITION =
"testTable__1__35__20250509T1444Z";
private static final List<String> FIELDS_IN_SCAN = List.of("userId",
"orderId", "orderAmount", "cityId", "cityName");
private static final String PARTITION_COLUMN = "userId";
@@ -119,7 +120,7 @@ public class LeafStageWorkerAssignmentRuleTest {
HashDistributionDesc desc =
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
assertEquals(desc.getNumPartitions(), OFFLINE_NUM_PARTITIONS);
assertEquals(desc.getKeys(),
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
- assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+ assertEquals(desc.getHashFunction().name(),
PARTITION_FUNCTION.toUpperCase());
validateTableScanAssignment(result,
OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap, "OFFLINE");
}
@@ -135,7 +136,7 @@ public class LeafStageWorkerAssignmentRuleTest {
HashDistributionDesc desc =
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
assertEquals(desc.getNumPartitions(), REALTIME_NUM_PARTITIONS);
assertEquals(desc.getKeys(),
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
- assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+ assertEquals(desc.getHashFunction().name(),
PARTITION_FUNCTION.toUpperCase());
validateTableScanAssignment(result,
REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
}
@@ -157,7 +158,7 @@ public class LeafStageWorkerAssignmentRuleTest {
HashDistributionDesc desc =
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
assertEquals(desc.getNumPartitions(), REALTIME_NUM_PARTITIONS);
assertEquals(desc.getKeys(),
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
- assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+ assertEquals(desc.getHashFunction().name(),
PARTITION_FUNCTION.toUpperCase());
validateTableScanAssignment(result,
REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap,
"REALTIME");
}
@@ -270,6 +271,20 @@ public class LeafStageWorkerAssignmentRuleTest {
inferPartitions, tableNameWithType, 8));
}
+ @Test
+ public void testAttemptPartitionedDistributionWhenInvalidHashFunction() {
+ TablePartitionInfo tablePartitionInfo = createRealtimeTablePartitionInfo();
+ // Test with this tablePartitionInfo to confirm partitioned distribution
is generated.
+
assertNotNull(LeafStageWorkerAssignmentRule.attemptPartitionedDistribution(TABLE_NAME_RT,
FIELDS_IN_SCAN, Map.of(),
+ tablePartitionInfo, false, false));
+ // Change TPI to set an invalid function name.
+ tablePartitionInfo = new
TablePartitionInfo(tablePartitionInfo.getTableNameWithType(),
+ tablePartitionInfo.getPartitionColumn(), "foobar",
tablePartitionInfo.getNumPartitions(),
+ tablePartitionInfo.getSegmentsByPartition(),
tablePartitionInfo.getSegmentsWithInvalidPartition());
+
assertNull(LeafStageWorkerAssignmentRule.attemptPartitionedDistribution(TABLE_NAME_RT,
FIELDS_IN_SCAN, Map.of(),
+ tablePartitionInfo, false, false));
+ }
+
@Test
public void testInferPartitionId() {
// Valid name cases. When numPartitions is less than the stream partition
number, then we expect modulus to be used.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]