This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a02253d671d [multistage] Support Hash Functions Gracefully in V2 
Optimizer (#16296)
a02253d671d is described below

commit a02253d671d7781acc16b81628e51bd11a3005a0
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Jul 8 01:47:01 2025 -0500

    [multistage] Support Hash Functions Gracefully in V2 Optimizer (#16296)
---
 .../org/apache/pinot/query/QueryEnvironment.java   |  2 +-
 .../query/context/PhysicalPlannerContext.java      | 14 ++++++-
 .../planner/physical/v2/DistHashFunction.java      | 43 ++++++++++++++++++++++
 .../planner/physical/v2/HashDistributionDesc.java  |  6 +--
 .../physical/v2/PRelToPlanNodeConverter.java       |  3 +-
 .../v2/PlanFragmentAndMailboxAssignment.java       |  2 +-
 .../physical/v2/nodes/PhysicalExchange.java        | 13 +++++--
 .../v2/opt/rules/AggregatePushdownRule.java        |  2 +-
 .../opt/rules/LeafStageWorkerAssignmentRule.java   |  8 +++-
 .../v2/opt/rules/LiteModeWorkerAssignmentRule.java |  3 +-
 .../v2/opt/rules/RootExchangeInsertRule.java       |  3 +-
 .../physical/v2/opt/rules/SortPushdownRule.java    |  3 +-
 .../v2/opt/rules/WorkerExchangeAssignmentRule.java | 43 ++++++++++++----------
 .../query/planner/plannode/MailboxSendNode.java    |  9 -----
 .../query/planner/logical/StagesTestBase.java      |  5 ++-
 .../physical/v2/PinotDataDistributionTest.java     |  6 ++-
 .../rules/LeafStageWorkerAssignmentRuleTest.java   | 21 +++++++++--
 17 files changed, 133 insertions(+), 53 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 2d30a9cdf38..1ee92b679cd 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -195,7 +195,7 @@ public class QueryEnvironment {
           workerManager.getHostName(), workerManager.getPort(), 
_envConfig.getRequestId(),
           workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(),
           _envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(), 
_envConfig.defaultUseBrokerPruning(),
-          _envConfig.defaultLiteModeServerStageLimit());
+          _envConfig.defaultLiteModeServerStageLimit(), 
_envConfig.defaultHashFunction());
     }
     return new PlannerContext(_config, _catalogReader, _typeFactory, 
optProgram, traitProgram,
         sqlNodeAndOptions.getOptions(), _envConfig, format, 
physicalPlannerContext);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index e8e46b39405..f67004bd2a7 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -28,6 +28,8 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.spi.utils.CommonConstants;
 
@@ -66,6 +68,7 @@ public class PhysicalPlannerContext {
   private final boolean _runInBroker;
   private final boolean _useBrokerPruning;
   private final int _liteModeServerStageLimit;
+  private final DistHashFunction _defaultHashFunction;
 
   /**
    * Used by controller when it needs to extract table names from the query.
@@ -82,18 +85,20 @@ public class PhysicalPlannerContext {
     _runInBroker = CommonConstants.Broker.DEFAULT_RUN_IN_BROKER;
     _useBrokerPruning = CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING;
     _liteModeServerStageLimit = 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
+    _defaultHashFunction = 
DistHashFunction.valueOf(KeySelector.DEFAULT_HASH_ALGORITHM.toUpperCase());
   }
 
   public PhysicalPlannerContext(RoutingManager routingManager, String 
hostName, int port, long requestId,
       String instanceId, Map<String, String> queryOptions) {
     this(routingManager, hostName, port, requestId, instanceId, queryOptions,
         CommonConstants.Broker.DEFAULT_USE_LITE_MODE, 
CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
-        CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING, 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT);
+        CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING, 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
+        KeySelector.DEFAULT_HASH_ALGORITHM);
   }
 
   public PhysicalPlannerContext(RoutingManager routingManager, String 
hostName, int port, long requestId,
       String instanceId, Map<String, String> queryOptions, boolean 
defaultUseLiteMode, boolean defaultRunInBroker,
-      boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit) {
+      boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit, 
String defaultHashFunction) {
     _routingManager = routingManager;
     _hostName = hostName;
     _port = port;
@@ -105,6 +110,7 @@ public class PhysicalPlannerContext {
     _useBrokerPruning = QueryOptionsUtils.isUseBrokerPruning(_queryOptions, 
defaultUseBrokerPruning);
     _liteModeServerStageLimit = 
QueryOptionsUtils.getLiteModeServerStageLimit(_queryOptions,
         defaultLiteModeLeafStageLimit);
+    _defaultHashFunction = 
DistHashFunction.valueOf(defaultHashFunction.toUpperCase());
     _instanceIdToQueryServerInstance.put(instanceId, 
getBrokerQueryServerInstance());
   }
 
@@ -175,6 +181,10 @@ public class PhysicalPlannerContext {
         .collect(Collectors.toList()).get(numCandidates == 1 ? 0 : 
random.nextInt(numCandidates - 1));
   }
 
+  public DistHashFunction getDefaultHashFunction() {
+    return _defaultHashFunction;
+  }
+
   private QueryServerInstance getBrokerQueryServerInstance() {
     return new QueryServerInstance(_instanceId, _hostName, _port, _port);
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/DistHashFunction.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/DistHashFunction.java
new file mode 100644
index 00000000000..1f8b3d0aad5
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/DistHashFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+/**
+ * Hash Functions supported by the v2 query optimizer. These are the hash 
functions supported by the Shuffle Exchange
+ * runtime, and the ones which are considered for the table-scan. If there's a 
hash-function in the table-scan that
+ * doesn't belong here, the table-scan won't be considered partitioned.
+ */
+public enum DistHashFunction {
+  MURMUR,
+  MURMUR3,
+  HASHCODE,
+  ABSHASHCODE;
+
+  /**
+   * Whether the given hash-function is considered by the v2 query optimizer 
for partitioning purposes.
+   */
+  public static boolean isSupported(String hashFunction) {
+    try {
+      DistHashFunction.valueOf(hashFunction.toUpperCase());
+      return true;
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
index dc373b4a882..d57073b6eea 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/HashDistributionDesc.java
@@ -39,10 +39,10 @@ import 
org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping;
 public class HashDistributionDesc {
   private final int _cachedHashCode;
   private final List<Integer> _keys;
-  private final String _hashFunction;
+  private final DistHashFunction _hashFunction;
   private final int _numPartitions;
 
-  public HashDistributionDesc(List<Integer> keys, String hashFunction, int 
numPartitions) {
+  public HashDistributionDesc(List<Integer> keys, DistHashFunction 
hashFunction, int numPartitions) {
     _cachedHashCode = Objects.hash(keys, hashFunction, numPartitions);
     _keys = keys;
     _hashFunction = hashFunction;
@@ -53,7 +53,7 @@ public class HashDistributionDesc {
     return _keys;
   }
 
-  public String getHashFunction() {
+  public DistHashFunction getHashFunction() {
     return _hashFunction;
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
index 26b395b4648..8a79f61fe29 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
@@ -49,7 +49,6 @@ import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.logical.RexExpressionUtils;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
@@ -119,7 +118,7 @@ public class PRelToPlanNodeConverter {
         new ArrayList<>(), node.getRelExchangeType(), 
RelDistribution.Type.ANY, node.getDistributionKeys(),
         false, node.getRelCollation().getFieldCollations(), false,
         !node.getRelCollation().getKeys().isEmpty(), Set.of() /* table names 
*/, node.getExchangeStrategy(),
-        KeySelector.DEFAULT_HASH_ALGORITHM);
+        node.getHashFunction().name());
   }
 
   public static SetOpNode convertSetOp(SetOp node) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
index bfc200981d2..d7f45ccacfb 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
@@ -102,7 +102,7 @@ public class PlanFragmentAndMailboxAssignment {
       MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, 
inputFragmentSchema, new ArrayList<>(),
           currentFragmentId, PinotRelExchangeType.getDefaultExchangeType(), 
distributionType,
           physicalExchange.getDistributionKeys(), false, 
physicalExchange.getRelCollation().getFieldCollations(),
-          false /* sort on sender */);
+          false /* sort on sender */, 
physicalExchange.getHashFunction().name());
       MailboxReceiveNode receiveNode = new 
MailboxReceiveNode(currentFragmentId, inputFragmentSchema,
           senderFragmentId, PinotRelExchangeType.getDefaultExchangeType(), 
distributionType,
           physicalExchange.getDistributionKeys(), 
physicalExchange.getRelCollation().getFieldCollations(),
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
index b00d786f00c..5964a086014 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
 import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTraitDef;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
 import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
 import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
@@ -76,10 +77,11 @@ public class PhysicalExchange extends Exchange implements 
PRelNode {
    * When not empty, records in each output stream will be sorted by the 
ordering defined by this collation.
    */
   private final RelCollation _relCollation;
+  private final DistHashFunction _hashFunction;
 
   public PhysicalExchange(int nodeId, PRelNode input, @Nullable 
PinotDataDistribution pinotDataDistribution,
       List<Integer> distributionKeys, ExchangeStrategy exchangeStrategy, 
@Nullable RelCollation relCollation,
-      PinotExecStrategyTrait execStrategyTrait) {
+      PinotExecStrategyTrait execStrategyTrait, DistHashFunction hashFunction) 
{
     super(input.unwrap().getCluster(), 
EMPTY_TRAIT_SET.plus(execStrategyTrait), input.unwrap(),
         ExchangeStrategy.getRelDistribution(exchangeStrategy, 
distributionKeys));
     _nodeId = nodeId;
@@ -88,6 +90,7 @@ public class PhysicalExchange extends Exchange implements 
PRelNode {
     _distributionKeys = distributionKeys;
     _exchangeStrategy = exchangeStrategy;
     _relCollation = relCollation == null ? RelCollations.EMPTY : relCollation;
+    _hashFunction = hashFunction;
   }
 
   @Override
@@ -95,7 +98,7 @@ public class PhysicalExchange extends Exchange implements 
PRelNode {
     Preconditions.checkState(newInput instanceof PRelNode, "Expected input of 
PhysicalExchange to be a PRelNode");
     // TODO(mse-physical): this always uses streaming exec strategy at the 
moment.
     return new PhysicalExchange(_nodeId, (PRelNode) newInput, 
_pinotDataDistribution, _distributionKeys,
-        _exchangeStrategy, _relCollation, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+        _exchangeStrategy, _relCollation, 
PinotExecStrategyTrait.getDefaultExecStrategy(), _hashFunction);
   }
 
   @Override
@@ -152,10 +155,14 @@ public class PhysicalExchange extends Exchange implements 
PRelNode {
     return trait.getType();
   }
 
+  public DistHashFunction getHashFunction() {
+    return _hashFunction;
+  }
+
   @Override
   public PRelNode with(int newNodeId, List<PRelNode> newInputs, 
PinotDataDistribution newDistribution) {
     return new PhysicalExchange(newNodeId, newInputs.get(0), newDistribution, 
_distributionKeys, _exchangeStrategy,
-        _relCollation, PinotExecStrategyTrait.getDefaultExecStrategy());
+        _relCollation, PinotExecStrategyTrait.getDefaultExecStrategy(), 
_hashFunction);
   }
 
   @Override public RelWriter explainTerms(RelWriter pw) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java
index f38dcef4807..05ed48c5fb6 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/AggregatePushdownRule.java
@@ -136,7 +136,7 @@ public class AggregatePushdownRule extends PRelOptRule {
         : PinotDistMapping.apply(o1.getRelCollation(), 
mapFromInputToPartialAgg);
     PhysicalExchange n1 = new PhysicalExchange(o1.getNodeId(), n2,
         o1.getPinotDataDistributionOrThrow().apply(mapFromInputToPartialAgg), 
newDistKeys, o1.getExchangeStrategy(),
-        newCollation, PinotExecStrategyTrait.getDefaultExecStrategy());
+        newCollation, PinotExecStrategyTrait.getDefaultExecStrategy(), 
o1.getHashFunction());
     return convertAggFromIntermediateInput(aggPRelNode, n1, AggType.FINAL, 
leafReturnFinalResult,
         PinotDistMapping.apply(RelCollations.of(o0.getCollations()), 
mapFromInputToPartialAgg).getFieldCollations(),
         aggPRelNode.getLimit(), idGenerator);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index 09008e41448..1ac9d9f1bf7 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -57,6 +57,7 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.context.PhysicalPlannerContext;
 import org.apache.pinot.query.planner.logical.LeafStageToPinotQuery;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
 import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
 import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
@@ -281,7 +282,9 @@ public class LeafStageWorkerAssignmentRule extends 
PRelOptRule {
     int keyIndex = fieldNames.indexOf(tablePartitionInfo.getPartitionColumn());
     String function = tablePartitionInfo.getPartitionFunctionName();
     int numSelectedServers = instanceIdToSegmentsMap.size();
-    if (keyIndex == -1) {
+    if (!DistHashFunction.isSupported(function)) {
+      return null;
+    } else if (keyIndex == -1) {
       LOGGER.warn("Unable to find partition column {} in table scan fields 
{}", tablePartitionInfo.getPartitionColumn(),
           fieldNames);
       return null;
@@ -362,7 +365,8 @@ public class LeafStageWorkerAssignmentRule extends 
PRelOptRule {
       workers.set(workerId, String.format("%s@%s", workerId, 
workers.get(workerId)));
       workerIdToSegmentsMap.put(workerId, ImmutableMap.of(tableType, 
segmentsForWorker));
     }
-    HashDistributionDesc desc = new 
HashDistributionDesc(ImmutableList.of(keyIndex), function, numPartitions);
+    HashDistributionDesc desc = new 
HashDistributionDesc(ImmutableList.of(keyIndex),
+        DistHashFunction.valueOf(function.toUpperCase()), numPartitions);
     PinotDataDistribution dataDistribution = new 
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
         workers, workers.hashCode(), ImmutableSet.of(desc), null);
     return new TableScanWorkerAssignmentResult(dataDistribution, 
workerIdToSegmentsMap);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
index e3cf4379bbb..4cebd16f32e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
@@ -106,7 +106,8 @@ public class LiteModeWorkerAssignmentRule implements 
PRelNodeTransformer {
           RelDistribution.Type.SINGLETON, liteModeWorkers, 
liteModeWorkers.hashCode(), null, null);
     }
     return new PhysicalExchange(nodeId(), leafStageRoot, pdd, 
Collections.emptyList(),
-        ExchangeStrategy.SINGLETON_EXCHANGE, collation, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+        ExchangeStrategy.SINGLETON_EXCHANGE, collation, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+        _context.getDefaultHashFunction());
   }
 
   /**
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
index 714f096a319..c2c53ffd631 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
@@ -53,7 +53,8 @@ public class RootExchangeInsertRule implements 
PRelNodeTransformer {
     PinotDataDistribution pinotDataDistribution = new 
PinotDataDistribution(RelDistribution.Type.SINGLETON,
         workers, workers.hashCode(), null, inferCollation(currentNode));
     return new PhysicalExchange(nodeId(), currentNode, pinotDataDistribution, 
List.of(),
-        ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+        ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+        _context.getDefaultHashFunction());
   }
 
   private String brokerWorkerId() {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java
index f906bcffd90..94481d515fb 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/SortPushdownRule.java
@@ -78,7 +78,8 @@ public class SortPushdownRule extends PRelOptRule {
     PhysicalSort n2 = new PhysicalSort(sort.getCluster(), 
RelTraitSet.createEmpty(), List.of(), sort.collation,
         null /* offset */, newSortFetch, o2, nodeId(), 
o2.getPinotDataDistributionOrThrow(), o2.isLeafStage());
     PhysicalExchange n1 = new PhysicalExchange(nodeId(), n2, 
o1.getPinotDataDistributionOrThrow(),
-        o1.getDistributionKeys(), o1.getExchangeStrategy(), 
o1.getRelCollation(), o1.getExecStrategy());
+        o1.getDistributionKeys(), o1.getExchangeStrategy(), 
o1.getRelCollation(), o1.getExecStrategy(),
+        o1.getHashFunction());
     return new PhysicalSort(sort.getCluster(), sort.getTraitSet(), 
sort.getHints(), sort.getCollation(),
         sort.offset, sort.fetch, n1, sort.getNodeId(), 
sort.getPinotDataDistributionOrThrow(), false);
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
index 8bffea30933..2f338083bd2 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
@@ -43,7 +43,7 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
 import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
 import org.apache.pinot.query.context.PhysicalPlannerContext;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.physical.v2.DistHashFunction;
 import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
 import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
@@ -77,7 +77,6 @@ import org.slf4j.LoggerFactory;
 public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WorkerExchangeAssignmentRule.class);
   private final PhysicalPlannerContext _physicalPlannerContext;
-  private static final String DEFAULT_HASH_FUNCTION = 
KeySelector.DEFAULT_HASH_ALGORITHM;
 
   public WorkerExchangeAssignmentRule(PhysicalPlannerContext context) {
     _physicalPlannerContext = context;
@@ -150,7 +149,7 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
       // Update current node with its distribution, and since this is a leaf 
stage boundary, add an identity exchange.
       return new PhysicalExchange(nodeId(), currentNode,
           currentNode.getPinotDataDistribution(), Collections.emptyList(), 
ExchangeStrategy.IDENTITY_EXCHANGE,
-          null, PinotExecStrategyTrait.getDefaultExecStrategy());
+          null, PinotExecStrategyTrait.getDefaultExecStrategy(), 
_physicalPlannerContext.getDefaultHashFunction());
     }
     // When no exchange, simply update current node with the distribution.
     return currentNode.with(currentNode.getPRelInputs(), 
currentNodeDistribution);
@@ -234,7 +233,7 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
         PinotDataDistribution newDataDistribution = 
derivedDistribution.withCollation(relCollation);
         currentNodeExchange = new PhysicalExchange(nodeId(), currentNode,
             newDataDistribution, Collections.emptyList(), 
ExchangeStrategy.IDENTITY_EXCHANGE, relCollation,
-            PinotExecStrategyTrait.getDefaultExecStrategy());
+            PinotExecStrategyTrait.getDefaultExecStrategy(), 
_physicalPlannerContext.getDefaultHashFunction());
       }
     } else {
       if (!relCollation.getKeys().isEmpty()) {
@@ -244,7 +243,7 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
         currentNodeExchange = new 
PhysicalExchange(_physicalPlannerContext.getNodeIdGenerator().get(),
             oldExchange.getPRelInput(0), 
newDataDistribution.withCollation(relCollation),
             oldExchange.getDistributionKeys(), 
oldExchange.getExchangeStrategy(), relCollation,
-            PinotExecStrategyTrait.getDefaultExecStrategy());
+            PinotExecStrategyTrait.getDefaultExecStrategy(), 
oldExchange.getHashFunction());
       }
     }
     return currentNodeExchange;
@@ -264,23 +263,25 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
           RelDistribution.Type.BROADCAST_DISTRIBUTED, 
currentNodeDistribution.getWorkers(),
           currentNodeDistribution.getWorkerHash(), null, null);
       return new PhysicalExchange(nodeId(), currentNode, 
pinotDataDistribution, List.of(),
-          ExchangeStrategy.BROADCAST_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+          ExchangeStrategy.BROADCAST_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+          _physicalPlannerContext.getDefaultHashFunction());
     }
     if (distributionConstraint.getType() == RelDistribution.Type.SINGLETON) {
       List<String> newWorkers = 
currentNodeDistribution.getWorkers().subList(0, 1);
       PinotDataDistribution pinotDataDistribution = new 
PinotDataDistribution(RelDistribution.Type.SINGLETON,
           newWorkers, newWorkers.hashCode(), null, null);
       return new PhysicalExchange(nodeId(), currentNode, 
pinotDataDistribution, List.of(),
-          ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+          ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+          _physicalPlannerContext.getDefaultHashFunction());
     }
     if (distributionConstraint.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED) {
-      HashDistributionDesc desc = new HashDistributionDesc(
-          distributionConstraint.getKeys(), DEFAULT_HASH_FUNCTION, 
currentNodeDistribution.getWorkers().size());
-      PinotDataDistribution pinotDataDistribution = new PinotDataDistribution(
-          RelDistribution.Type.HASH_DISTRIBUTED, 
currentNodeDistribution.getWorkers(),
-          currentNodeDistribution.getWorkerHash(), ImmutableSet.of(desc), 
null);
+      HashDistributionDesc desc = new 
HashDistributionDesc(distributionConstraint.getKeys(),
+          _physicalPlannerContext.getDefaultHashFunction(), 
currentNodeDistribution.getWorkers().size());
+      PinotDataDistribution pinotDataDistribution = new 
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
+          currentNodeDistribution.getWorkers(), 
currentNodeDistribution.getWorkerHash(), ImmutableSet.of(desc), null);
       return new PhysicalExchange(nodeId(), currentNode, 
pinotDataDistribution, distributionConstraint.getKeys(),
-          ExchangeStrategy.PARTITIONING_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+          ExchangeStrategy.PARTITIONING_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+          _physicalPlannerContext.getDefaultHashFunction());
     }
     throw new IllegalStateException("Distribution constraint not met: " + 
distributionConstraint.getType());
   }
@@ -305,7 +306,7 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
       PinotDataDistribution newDistribution = new 
PinotDataDistribution(RelDistribution.Type.RANDOM_DISTRIBUTED,
           parentDistribution.getWorkers(), parentDistribution.getWorkerHash(), 
null, null);
       return new PhysicalExchange(nodeId(), currentNode, newDistribution, 
List.of(), ExchangeStrategy.RANDOM_EXCHANGE,
-          null, PinotExecStrategyTrait.getDefaultExecStrategy());
+          null, PinotExecStrategyTrait.getDefaultExecStrategy(), 
_physicalPlannerContext.getDefaultHashFunction());
     } else if (relDistribution.getType() == 
RelDistribution.Type.BROADCAST_DISTRIBUTED) {
       if (assumedDistribution.getType() == 
RelDistribution.Type.BROADCAST_DISTRIBUTED) {
         if (parentHasSameWorkers) {
@@ -317,7 +318,8 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
       PinotDataDistribution newDistribution = new 
PinotDataDistribution(RelDistribution.Type.BROADCAST_DISTRIBUTED,
           parentDistribution.getWorkers(), parentDistribution.getWorkerHash(), 
null, null);
       return new PhysicalExchange(nodeId(), currentNode, newDistribution, 
List.of(),
-          ExchangeStrategy.BROADCAST_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+          ExchangeStrategy.BROADCAST_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+          _physicalPlannerContext.getDefaultHashFunction());
     } else if (relDistribution.getType() == RelDistribution.Type.SINGLETON) {
       if (parentHasSameWorkers) {
         return null;
@@ -327,7 +329,8 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
       PinotDataDistribution newDistribution = new 
PinotDataDistribution(RelDistribution.Type.SINGLETON,
           parentDistribution.getWorkers(), parentDistribution.getWorkerHash(), 
null, null);
       return new PhysicalExchange(nodeId(), currentNode, newDistribution, 
List.of(),
-          ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+          ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+          _physicalPlannerContext.getDefaultHashFunction());
     }
     Preconditions.checkState(relDistribution.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED,
         "Unexpected distribution constraint: %s", relDistribution.getType());
@@ -355,7 +358,8 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
                 parentDistribution.getWorkers(), 
parentDistribution.getWorkerHash(),
                 assumedDistribution.getHashDistributionDesc(), 
assumedDistribution.getCollation());
             return new PhysicalExchange(nodeId(), currentNode, 
newDistribution, List.of(),
-                ExchangeStrategy.IDENTITY_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+                ExchangeStrategy.IDENTITY_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+                _physicalPlannerContext.getDefaultHashFunction());
           }
         }
       }
@@ -363,14 +367,15 @@ public class WorkerExchangeAssignmentRule implements 
PRelNodeTransformer {
     }
     // Re-partition.
     int numberOfPartitions = hashDistToMatch.getNumPartitions();
-    String hashFunction = hashDistToMatch.getHashFunction();
+    DistHashFunction hashFunction = hashDistToMatch.getHashFunction();
     HashDistributionDesc newDesc = new 
HashDistributionDesc(relDistribution.getKeys(), hashFunction,
         numberOfPartitions);
     PinotDataDistribution newDistribution = new 
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
         parentDistribution.getWorkers(), parentDistribution.getWorkerHash(), 
ImmutableSet.of(newDesc),
         null);
     return new PhysicalExchange(nodeId(), currentNode, newDistribution, 
relDistribution.getKeys(),
-        ExchangeStrategy.PARTITIONING_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+        ExchangeStrategy.PARTITIONING_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy(),
+        hashFunction);
   }
 
   private boolean complicatedButColocated(int partitionOne, int partitionTwo, 
int numStreams) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index ca481f3923a..940cc4eedb1 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -28,7 +28,6 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 public class MailboxSendNode extends BasePlanNode {
@@ -65,14 +64,6 @@ public class MailboxSendNode extends BasePlanNode {
         distributionType, keys, prePartitioned, collations, sort, 
hashFunction);
   }
 
-  public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs,
-      int receiverStage, PinotRelExchangeType exchangeType,
-      RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
-      @Nullable List<RelFieldCollation> collations, boolean sort) {
-    this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType, 
distributionType, keys, prePartitioned,
-        collations, sort, KeySelector.DEFAULT_HASH_ALGORITHM);
-  }
-
   public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs,
       int receiverStage, PinotRelExchangeType exchangeType,
       RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
index 021e65fa765..c20b4b161ea 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
@@ -135,7 +135,8 @@ public class StagesTestBase {
           boolean prePartitioned, List<RelFieldCollation> collations, boolean 
sort, boolean sortedOnSender) {
         PlanNode input = childBuilder.build(nextStageId);
         MailboxSendNode mailboxSendNode = new MailboxSendNode(nextStageId, 
input.getDataSchema(), List.of(input),
-            stageId, exchangeType, distribution, keys, prePartitioned, 
collations, sort);
+            stageId, exchangeType, distribution, keys, prePartitioned, 
collations, sort,
+            KeySelector.DEFAULT_HASH_ALGORITHM);
         MailboxSendNode old = _stageRoots.put(nextStageId, mailboxSendNode);
         Preconditions.checkState(old == null, "Mailbox already exists for 
stageId: %s", nextStageId);
         return new MailboxReceiveNode(stageId, input.getDataSchema(), 
nextStageId, exchangeType, distribution, keys,
@@ -192,7 +193,7 @@ public class StagesTestBase {
     return (stageId, mySchema, myHints) -> {
       PlanNode input = childBuilder.build(stageId);
       MailboxSendNode mailboxSendNode = new MailboxSendNode(newStageId, 
mySchema, List.of(input), stageId, null,
-          null, null, false, null, false);
+          null, null, false, null, false, KeySelector.DEFAULT_HASH_ALGORITHM);
       MailboxSendNode old = _stageRoots.put(stageId, mailboxSendNode);
       Preconditions.checkState(old == null, "Mailbox already exists for 
stageId: %s", stageId);
       return mailboxSendNode;
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
index c0939a267b6..d173f8fb6ae 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistributionTest.java
@@ -97,7 +97,8 @@ public class PinotDataDistributionTest {
       final int numPartitions = 8;
       PinotDataDistribution distribution = new 
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
           ImmutableList.of("0@0", "1@0"), 0L, Collections.singleton(
-              new HashDistributionDesc(keys, MURMUR_HASH_FUNCTION, 
numPartitions)), null);
+              new HashDistributionDesc(keys,
+                  
DistHashFunction.valueOf(MURMUR_HASH_FUNCTION.toUpperCase()), numPartitions)), 
null);
       assertTrue(distribution.satisfies(RelDistributions.hash(keys)));
     }
     {
@@ -120,7 +121,8 @@ public class PinotDataDistributionTest {
   public void testSatisfiesHashDistributionDesc() {
     PinotDataDistribution distribution = new 
PinotDataDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
         ImmutableList.of("0@0", "1@0"), 0L, Collections.singleton(
-            new HashDistributionDesc(ImmutableList.of(1, 3), 
MURMUR_HASH_FUNCTION, 8)), null);
+            new HashDistributionDesc(ImmutableList.of(1, 3),
+                DistHashFunction.valueOf(MURMUR_HASH_FUNCTION.toUpperCase()), 
8)), null);
     {
       // Case-1: Hash distribution desc with different keys.
       
assertNull(distribution.satisfiesHashDistributionDesc(ImmutableList.of(1, 2)));
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
index a5103e08ead..c6b6895119a 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
@@ -46,6 +46,7 @@ import static org.testng.Assert.*;
 
 public class LeafStageWorkerAssignmentRuleTest {
   private static final String TABLE_NAME = "testTable";
+  private static final String TABLE_NAME_RT = "testTable_REALTIME";
   private static final String INVALID_SEGMENT_PARTITION = 
"testTable__1__35__20250509T1444Z";
   private static final List<String> FIELDS_IN_SCAN = List.of("userId", 
"orderId", "orderAmount", "cityId", "cityName");
   private static final String PARTITION_COLUMN = "userId";
@@ -119,7 +120,7 @@ public class LeafStageWorkerAssignmentRuleTest {
     HashDistributionDesc desc = 
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
     assertEquals(desc.getNumPartitions(), OFFLINE_NUM_PARTITIONS);
     assertEquals(desc.getKeys(), 
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
-    assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+    assertEquals(desc.getHashFunction().name(), 
PARTITION_FUNCTION.toUpperCase());
     validateTableScanAssignment(result, 
OFFLINE_INSTANCE_ID_TO_SEGMENTS._offlineTableSegmentsMap, "OFFLINE");
   }
 
@@ -135,7 +136,7 @@ public class LeafStageWorkerAssignmentRuleTest {
     HashDistributionDesc desc = 
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
     assertEquals(desc.getNumPartitions(), REALTIME_NUM_PARTITIONS);
     assertEquals(desc.getKeys(), 
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
-    assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+    assertEquals(desc.getHashFunction().name(), 
PARTITION_FUNCTION.toUpperCase());
     validateTableScanAssignment(result, 
REALTIME_INSTANCE_ID_TO_SEGMENTS._realtimeTableSegmentsMap, "REALTIME");
   }
 
@@ -157,7 +158,7 @@ public class LeafStageWorkerAssignmentRuleTest {
       HashDistributionDesc desc = 
result._pinotDataDistribution.getHashDistributionDesc().iterator().next();
       assertEquals(desc.getNumPartitions(), REALTIME_NUM_PARTITIONS);
       assertEquals(desc.getKeys(), 
List.of(FIELDS_IN_SCAN.indexOf(PARTITION_COLUMN)));
-      assertEquals(desc.getHashFunction(), PARTITION_FUNCTION);
+      assertEquals(desc.getHashFunction().name(), 
PARTITION_FUNCTION.toUpperCase());
       validateTableScanAssignment(result,
           
REALTIME_INSTANCE_ID_TO_SEGMENTS_WITH_INVALID_PARTITIONS._realtimeTableSegmentsMap,
 "REALTIME");
     }
@@ -270,6 +271,20 @@ public class LeafStageWorkerAssignmentRuleTest {
             inferPartitions, tableNameWithType, 8));
   }
 
+  @Test
+  public void testAttemptPartitionedDistributionWhenInvalidHashFunction() {
+    TablePartitionInfo tablePartitionInfo = createRealtimeTablePartitionInfo();
+    // Test with this tablePartitionInfo to confirm partitioned distribution 
is generated.
+    
assertNotNull(LeafStageWorkerAssignmentRule.attemptPartitionedDistribution(TABLE_NAME_RT,
 FIELDS_IN_SCAN, Map.of(),
+        tablePartitionInfo, false, false));
+    // Change TPI to set an invalid function name.
+    tablePartitionInfo = new 
TablePartitionInfo(tablePartitionInfo.getTableNameWithType(),
+        tablePartitionInfo.getPartitionColumn(), "foobar", 
tablePartitionInfo.getNumPartitions(),
+        tablePartitionInfo.getSegmentsByPartition(), 
tablePartitionInfo.getSegmentsWithInvalidPartition());
+    
assertNull(LeafStageWorkerAssignmentRule.attemptPartitionedDistribution(TABLE_NAME_RT,
 FIELDS_IN_SCAN, Map.of(),
+        tablePartitionInfo, false, false));
+  }
+
   @Test
   public void testInferPartitionId() {
     // Valid name cases. When numPartitions is less than the stream partition 
number, then we expect modulus to be used.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to