This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c1a19ba43b2 [multistage] Add configurable hash function support for 
KeySelector in query planning (#16242)
c1a19ba43b2 is described below

commit c1a19ba43b29bd49a5d91e60cf8b8dd3ac7703ca
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Sun Jul 6 14:28:26 2025 -0700

    [multistage] Add configurable hash function support for KeySelector in 
query planning (#16242)
---
 .../MultiStageBrokerRequestHandler.java            |   4 +
 pinot-common/src/main/proto/plan.proto             |   1 +
 .../org/apache/pinot/query/QueryEnvironment.java   |  14 ++-
 .../planner/logical/PinotLogicalQueryPlanner.java  |  12 +-
 .../query/planner/logical/PlanFragmenter.java      |   2 +-
 .../planner/logical/RelToPlanNodeConverter.java    |   8 +-
 .../planner/partitioning/EmptyKeySelector.java     |  19 +++
 .../planner/partitioning/HashFunctionSelector.java | 130 +++++++++++++++++++++
 .../planner/partitioning/KeySelectorFactory.java   |  10 +-
 .../partitioning/MultiColumnKeySelector.java       |  21 ++--
 .../partitioning/SingleColumnKeySelector.java      |  13 ++-
 .../physical/v2/PRelToPlanNodeConverter.java       |   4 +-
 .../pinot/query/planner/plannode/ExchangeNode.java |  17 ++-
 .../query/planner/plannode/MailboxSendNode.java    |  37 +++---
 .../query/planner/serde/PlanNodeDeserializer.java  |   7 +-
 .../query/planner/serde/PlanNodeSerializer.java    |   1 +
 .../query/planner/logical/StagesTestBase.java      |   5 +-
 .../partitioning/HashFunctionSelectorTest.java     | 125 ++++++++++++++++++++
 .../partitioning/KeySelectorHashFunctionTest.java  | 109 +++++++++++++++++
 .../runtime/operator/MailboxSendOperator.java      |  14 ++-
 .../runtime/operator/exchange/BlockExchange.java   |  12 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 22 files changed, 513 insertions(+), 56 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 81bfe277d8e..7a3c4055a5a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -402,6 +402,9 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     int defaultLiteModeServerStageLimit = _config.getProperty(
         CommonConstants.Broker.CONFIG_OF_LITE_MODE_LEAF_STAGE_LIMIT,
         CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT);
+    String defaultHashFunction = _config.getProperty(
+        CommonConstants.Broker.CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION,
+        CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION);
     boolean caseSensitive = !_config.getProperty(
         CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY,
         CommonConstants.Helix.DEFAULT_ENABLE_CASE_INSENSITIVE
@@ -422,6 +425,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         .defaultRunInBroker(defaultRunInBroker)
         .defaultUseBrokerPruning(defaultUseBrokerPruning)
         .defaultLiteModeServerStageLimit(defaultLiteModeServerStageLimit)
+        .defaultHashFunction(defaultHashFunction)
         .build();
   }
 
diff --git a/pinot-common/src/main/proto/plan.proto 
b/pinot-common/src/main/proto/plan.proto
index 4e4fbb16843..703abb96075 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -159,6 +159,7 @@ message MailboxSendNode {
   repeated Collation collations = 6;
   bool sort = 7;
   repeated int32 receiverStageIds = 8;
+  string hashFunction = 9;
 }
 
 message ProjectNode {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 68a73472d28..05be6cd8a9e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -475,7 +475,8 @@ public class QueryEnvironment {
           _envConfig.getWorkerManager(), requestId, 
_envConfig.getTableCache());
       return pinotDispatchPlanner.createDispatchableSubPlanV2(plan.getLeft(), 
plan.getRight());
     }
-    SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, 
useSpools(plannerContext.getOptions()));
+    SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, 
useSpools(plannerContext.getOptions()),
+        _envConfig.defaultHashFunction());
     PinotDispatchPlanner pinotDispatchPlanner =
         new PinotDispatchPlanner(plannerContext, 
_envConfig.getWorkerManager(), _envConfig.getRequestId(),
             _envConfig.getTableCache());
@@ -757,6 +758,17 @@ public class QueryEnvironment {
       return CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
     }
 
+    /**
+     * Default hash function to use for KeySelector data shuffling.
+     *
+     * This is treated as the default value for the broker and it is expected 
to be obtained from a Pinot configuration.
+     * This default value can be always overridden at query level by the query 
option.
+     */
+    @Value.Default
+    default String defaultHashFunction() {
+      return CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION;
+    }
+
     /**
      * Returns the worker manager.
      *
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 9bca603e3f4..1e2ddca9491 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -60,10 +60,11 @@ public class PinotLogicalQueryPlanner {
    * Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}.
    */
   public static SubPlan makePlan(RelRoot relRoot,
-      @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, 
boolean useSpools) {
-    PlanNode rootNode = new 
RelToPlanNodeConverter(tracker).toPlanNode(relRoot.rel);
+      @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, 
boolean useSpools,
+      String hashFunction) {
+    PlanNode rootNode = new RelToPlanNodeConverter(tracker, 
hashFunction).toPlanNode(relRoot.rel);
 
-    PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, 
useSpools);
+    PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, 
useSpools, hashFunction);
     return new SubPlan(rootFragment,
         new 
SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel), 
relRoot.fields), List.of());
 
@@ -108,7 +109,8 @@ public class PinotLogicalQueryPlanner {
   }
 
   private static PlanFragment planNodeToPlanFragment(
-      PlanNode node, @Nullable TransformationTracker.Builder<PlanNode, 
RelNode> tracker, boolean useSpools) {
+      PlanNode node, @Nullable TransformationTracker.Builder<PlanNode, 
RelNode> tracker, boolean useSpools,
+      String hashFunction) {
     PlanFragmenter fragmenter = new PlanFragmenter();
     PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
     node = node.visit(fragmenter, fragmenterContext);
@@ -126,7 +128,7 @@ public class PinotLogicalQueryPlanner {
     MailboxSendNode subPlanRootSenderNode =
         new MailboxSendNode(node.getStageId(), node.getDataSchema(), 
List.of(node), 0,
             PinotRelExchangeType.getDefaultExchangeType(), 
RelDistribution.Type.BROADCAST_DISTRIBUTED, null, false,
-            null, false);
+            null, false, hashFunction);
     PlanFragment planFragment1 = new PlanFragment(1, subPlanRootSenderNode, 
new ArrayList<>());
     planFragmentMap.put(1, planFragment1);
     for (Int2ObjectMap.Entry<IntList> entry : 
childPlanFragmentIdsMap.int2ObjectEntrySet()) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index 8ffc4c78f99..420cf505bf2 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -186,7 +186,7 @@ public class PlanFragmenter implements 
PlanNodeVisitor<PlanNode, PlanFragmenter.
     MailboxSendNode mailboxSendNode =
         new MailboxSendNode(senderPlanFragmentId, 
nextPlanFragmentRoot.getDataSchema(), List.of(nextPlanFragmentRoot),
             receiverPlanFragmentId, exchangeType, distributionType, keys, 
node.isPrePartitioned(), node.getCollations(),
-            node.isSortOnSender());
+            node.isSortOnSender(), node.getHashFunction());
     _planFragmentMap.put(senderPlanFragmentId,
         new PlanFragment(senderPlanFragmentId, mailboxSendNode, new 
ArrayList<>()));
     _mailboxSendToExchangeNodeMap.put(mailboxSendNode, node);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index c9526fa4884..abaf4704287 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -94,9 +94,12 @@ public final class RelToPlanNodeConverter {
   private boolean _windowFunctionFound;
   @Nullable
   private final TransformationTracker.Builder<PlanNode, RelNode> _tracker;
+  private final String _hashFunction;
 
-  public RelToPlanNodeConverter(@Nullable 
TransformationTracker.Builder<PlanNode, RelNode> tracker) {
+  public RelToPlanNodeConverter(@Nullable 
TransformationTracker.Builder<PlanNode, RelNode> tracker,
+      String hashFunction) {
     _tracker = tracker;
+    _hashFunction = hashFunction;
   }
 
   /**
@@ -190,7 +193,8 @@ public final class RelToPlanNodeConverter {
       }
     }
     return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
convertInputs(node.getInputs()),
-        exchangeType, distributionType, keys, prePartitioned, collations, 
sortOnSender, sortOnReceiver, null, null);
+        exchangeType, distributionType, keys, prePartitioned, collations, 
sortOnSender, sortOnReceiver, null, null,
+        _hashFunction);
   }
 
   private SetOpNode convertLogicalSetOp(SetOp node) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
index fc02ff257b4..087d61ff11e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
@@ -22,11 +22,25 @@ import javax.annotation.Nullable;
 
 
 public class EmptyKeySelector implements KeySelector<Integer> {
+  private final String _hashFunction;
+
   private EmptyKeySelector() {
+    this(KeySelector.DEFAULT_HASH_ALGORITHM);
+  }
+
+  private EmptyKeySelector(String hashFunction) {
+    _hashFunction = hashFunction;
   }
 
   public static final EmptyKeySelector INSTANCE = new EmptyKeySelector();
 
+  public static EmptyKeySelector getInstance(String hashFunction) {
+    if (KeySelector.DEFAULT_HASH_ALGORITHM.equals(hashFunction)) {
+      return INSTANCE;
+    }
+    return new EmptyKeySelector(hashFunction);
+  }
+
   @Nullable
   @Override
   public Integer getKey(Object[] row) {
@@ -37,4 +51,9 @@ public class EmptyKeySelector implements KeySelector<Integer> 
{
   public int computeHash(Object[] input) {
     return 0;
   }
+
+  @Override
+  public String hashAlgorithm() {
+    return _hashFunction;
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelector.java
new file mode 100644
index 00000000000..6ab6ee7ccde
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelector.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
+
+
+/**
+ * Utility class to compute hash values using different hash functions.
+ * This class provides consistent hash computation for KeySelector 
implementations.
+ */
+public class HashFunctionSelector {
+  public static final String MURMUR2 = "murmur";
+  public static final String MURMUR3 = "murmur3";
+  public static final String HASH_CODE = "hashcode";
+
+  private HashFunctionSelector() {
+  }
+
+  /**
+   * Computes a hash code for a single value using the specified hash function.
+   * @param value The value to hash.
+   * @param hashFunction The hash function to use (e.g., "murmur", "murmur3", 
"cityhash", "absHashCode").
+   * @return The computed hash code.
+   */
+  public static int computeHash(Object value, String hashFunction) {
+    if (value == null) {
+      return 0;
+    }
+
+    switch (hashFunction.toLowerCase()) {
+      case MURMUR2: return murmur2(value);
+      case MURMUR3: return murmur3(value);
+      // hashCode and absHashCode are treated the same for single hash.
+      case HASH_CODE:
+      // Default hash is absHashCode.
+      default: return absHashCode(value);
+    }
+  }
+
+  /**
+   * Computes a hash code for multiple values based on specified key IDs using 
the specified hash function.
+   * This is useful for partitioning where only certain keys are relevant.
+   * @param values The array of values to hash.
+   * @param keyIds The array of key IDs indicating which values to include in 
the hash computation.
+   * @param hashFunction The hash function to use (e.g., "murmur2", "murmur3", 
"cityhash", "absHashCode").
+   * @return The computed hash code.
+   */
+  public static int computeMultiHash(Object[] values, int[] keyIds, String 
hashFunction) {
+    if (values == null || values.length == 0) {
+      return 0;
+    }
+
+    switch (hashFunction.toLowerCase()) {
+      case MURMUR2: return murmur2(values, keyIds);
+      case MURMUR3: return murmur3(values, keyIds);
+      // hashCode and absHashCode are treated the same for multi hash.
+      case HASH_CODE:
+        // We should use hashCode instead of absHashCode for multi hash to 
maintain consistency with legacy behavior.
+      default: return hashCode(values, keyIds);
+    }
+  }
+
+  private static int absHashCode(Object value) {
+    return value.hashCode() & Integer.MAX_VALUE;
+  }
+
+  private static int hashCode(Object value) {
+    return value.hashCode();
+  }
+
+  private static int murmur2(Object value) {
+    return MurmurHashFunctions.murmurHash2(toBytes(value)) & Integer.MAX_VALUE;
+  }
+
+  private static int murmur3(Object value) {
+    return MurmurHashFunctions.murmurHash3X64Bit32(toBytes(value), 0) & 
Integer.MAX_VALUE;
+  }
+
+  private static int murmur2(Object[] values, int[] keyIds) {
+    int hash = 0;
+    for (int keyId : keyIds) {
+      if (keyId < values.length && values[keyId] != null) {
+        hash += murmur2(values[keyId]);
+      }
+    }
+    return hash & Integer.MAX_VALUE;
+  }
+
+  private static int murmur3(Object[] values, int[] keyIds) {
+    int hash = 0;
+    for (int keyId : keyIds) {
+      if (keyId < values.length && values[keyId] != null) {
+        hash += murmur3(values[keyId]);
+      }
+    }
+    return hash & Integer.MAX_VALUE;
+  }
+
+  private static int hashCode(Object[] values, int[] keyIds) {
+    int hash = 0;
+    for (int keyId : keyIds) {
+      if (keyId < values.length && values[keyId] != null) {
+        hash += hashCode(values[keyId]);
+      }
+    }
+    return hash & Integer.MAX_VALUE;
+  }
+
+  private static byte[] toBytes(Object value) {
+    return value.toString().getBytes(StandardCharsets.UTF_8);
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
index b70615f68d0..6e0a0159b6e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
@@ -26,17 +26,21 @@ public class KeySelectorFactory {
   }
 
   public static KeySelector<?> getKeySelector(List<Integer> keyIds) {
+    return getKeySelector(keyIds, KeySelector.DEFAULT_HASH_ALGORITHM);
+  }
+
+  public static KeySelector<?> getKeySelector(List<Integer> keyIds, String 
hashFunction) {
     int numKeys = keyIds.size();
     if (numKeys == 0) {
-      return EmptyKeySelector.INSTANCE;
+      return EmptyKeySelector.getInstance(hashFunction);
     } else if (numKeys == 1) {
-      return new SingleColumnKeySelector(keyIds.get(0));
+      return new SingleColumnKeySelector(keyIds.get(0), hashFunction);
     } else {
       int[] ids = new int[numKeys];
       for (int i = 0; i < numKeys; i++) {
         ids[i] = keyIds.get(i);
       }
-      return new MultiColumnKeySelector(ids);
+      return new MultiColumnKeySelector(ids, hashFunction);
     }
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
index c510e5b7014..51fe01668f3 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
@@ -23,9 +23,15 @@ import org.apache.pinot.core.data.table.Key;
 
 public class MultiColumnKeySelector implements KeySelector<Key> {
   private final int[] _keyIds;
+  private final String _hashFunction;
 
   public MultiColumnKeySelector(int[] keyIds) {
+    this(keyIds, KeySelector.DEFAULT_HASH_ALGORITHM);
+  }
+
+  public MultiColumnKeySelector(int[] keyIds, String hashFunction) {
     _keyIds = keyIds;
+    _hashFunction = hashFunction;
   }
 
   @Override
@@ -56,15 +62,12 @@ public class MultiColumnKeySelector implements 
KeySelector<Key> {
     // also see: https://github.com/apache/pinot/issues/9998
     //
     // TODO: consider better hashing algorithms than hashCode sum, such as 
XOR'ing
-    int hashCode = 0;
-    for (int keyId : _keyIds) {
-      Object value = input[keyId];
-      if (value != null) {
-        hashCode += value.hashCode();
-      }
-    }
-
     // return a positive number because this is used directly to modulo-index
-    return hashCode & Integer.MAX_VALUE;
+    return HashFunctionSelector.computeMultiHash(input, _keyIds, 
_hashFunction);
+  }
+
+  @Override
+  public String hashAlgorithm() {
+    return _hashFunction;
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
index 071078d3b60..3d19523941c 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
@@ -23,9 +23,15 @@ import javax.annotation.Nullable;
 
 public class SingleColumnKeySelector implements KeySelector<Object> {
   private final int _keyId;
+  private final String _hashFunction;
 
   public SingleColumnKeySelector(int keyId) {
+    this(keyId, KeySelector.DEFAULT_HASH_ALGORITHM);
+  }
+
+  public SingleColumnKeySelector(int keyId, String hashFunction) {
     _keyId = keyId;
+    _hashFunction = hashFunction;
   }
 
   @Nullable
@@ -37,6 +43,11 @@ public class SingleColumnKeySelector implements 
KeySelector<Object> {
   @Override
   public int computeHash(Object[] input) {
     Object key = input[_keyId];
-    return key != null ? key.hashCode() & Integer.MAX_VALUE : 0;
+    return HashFunctionSelector.computeHash(key, _hashFunction);
+  }
+
+  @Override
+  public String hashAlgorithm() {
+    return _hashFunction;
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
index 8dc8c81dc9d..26b395b4648 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
@@ -49,6 +49,7 @@ import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.logical.RexExpressionUtils;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
@@ -117,7 +118,8 @@ public class PRelToPlanNodeConverter {
     return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()),
         new ArrayList<>(), node.getRelExchangeType(), 
RelDistribution.Type.ANY, node.getDistributionKeys(),
         false, node.getRelCollation().getFieldCollations(), false,
-        !node.getRelCollation().getKeys().isEmpty(), Set.of() /* table names 
*/, node.getExchangeStrategy());
+        !node.getRelCollation().getKeys().isEmpty(), Set.of() /* table names 
*/, node.getExchangeStrategy(),
+        KeySelector.DEFAULT_HASH_ALGORITHM);
   }
 
   public static SetOpNode convertSetOp(SetOp node) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index ea02ca6b7ef..a9e8bf0b4a9 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -48,11 +48,12 @@ public class ExchangeNode extends BasePlanNode {
   private final Set<String> _tableNames;
   @Nullable
   private final ExchangeStrategy _exchangeStrategy;
+  private final String _hashFunction;
 
   public ExchangeNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs, PinotRelExchangeType exchangeType,
       RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
       @Nullable List<RelFieldCollation> collations, boolean sortOnSender, 
boolean sortOnReceiver,
-      @Nullable Set<String> tableNames, ExchangeStrategy exchangeStrategy) {
+      @Nullable Set<String> tableNames, ExchangeStrategy exchangeStrategy, 
String hashFunction) {
     super(stageId, dataSchema, null, inputs);
     _exchangeType = exchangeType;
     _distributionType = distributionType;
@@ -63,6 +64,7 @@ public class ExchangeNode extends BasePlanNode {
     _sortOnReceiver = sortOnReceiver;
     _tableNames = tableNames;
     _exchangeStrategy = exchangeStrategy;
+    _hashFunction = hashFunction;
   }
 
   public PinotRelExchangeType getExchangeType() {
@@ -105,6 +107,10 @@ public class ExchangeNode extends BasePlanNode {
     return _exchangeStrategy;
   }
 
+  public String getHashFunction() {
+    return _hashFunction;
+  }
+
   @Override
   public String explain() {
     return "EXCHANGE";
@@ -118,7 +124,7 @@ public class ExchangeNode extends BasePlanNode {
   @Override
   public PlanNode withInputs(List<PlanNode> inputs) {
     return new ExchangeNode(_stageId, _dataSchema, inputs, _exchangeType, 
_distributionType, _keys, _prePartitioned,
-        _collations, _sortOnSender, _sortOnReceiver, _tableNames, null);
+        _collations, _sortOnSender, _sortOnReceiver, _tableNames, null, 
_hashFunction);
   }
 
   @Override
@@ -135,13 +141,14 @@ public class ExchangeNode extends BasePlanNode {
     ExchangeNode that = (ExchangeNode) o;
     return _sortOnSender == that._sortOnSender && _sortOnReceiver == 
that._sortOnReceiver
         && _prePartitioned == that._prePartitioned && _exchangeType == 
that._exchangeType
-        && _distributionType == that._distributionType && 
Objects.equals(_keys, that._keys) && Objects.equals(
-        _collations, that._collations) && Objects.equals(_tableNames, 
that._tableNames);
+        && _distributionType == that._distributionType && 
Objects.equals(_keys, that._keys)
+        && Objects.equals(_collations, that._collations) && 
Objects.equals(_tableNames, that._tableNames)
+        && Objects.equals(_hashFunction, that._hashFunction);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(super.hashCode(), _exchangeType, _distributionType, 
_keys, _sortOnSender, _sortOnReceiver,
-        _prePartitioned, _collations, _tableNames);
+        _prePartitioned, _collations, _tableNames, _hashFunction);
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index c40fa50b000..ca481f3923a 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 public class MailboxSendNode extends BasePlanNode {
@@ -38,12 +39,13 @@ public class MailboxSendNode extends BasePlanNode {
   private final boolean _prePartitioned;
   private final List<RelFieldCollation> _collations;
   private final boolean _sort;
+  private final String _hashFunction;
 
   // NOTE: null List is converted to empty List because there is no way to 
differentiate them in proto during ser/de.
   private MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs,
       BitSet receiverStages, PinotRelExchangeType exchangeType,
       RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
-      @Nullable List<RelFieldCollation> collations, boolean sort) {
+      @Nullable List<RelFieldCollation> collations, boolean sort, String 
hashFunction) {
     super(stageId, dataSchema, null, inputs);
     _receiverStages = receiverStages;
     _exchangeType = exchangeType;
@@ -52,14 +54,15 @@ public class MailboxSendNode extends BasePlanNode {
     _prePartitioned = prePartitioned;
     _collations = collations != null ? collations : List.of();
     _sort = sort;
+    _hashFunction = hashFunction;
   }
 
   public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs,
       @Nullable List<Integer> receiverStages, PinotRelExchangeType 
exchangeType,
       RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
-      @Nullable List<RelFieldCollation> collations, boolean sort) {
+      @Nullable List<RelFieldCollation> collations, boolean sort, String 
hashFunction) {
     this(stageId, dataSchema, inputs, toBitSet(receiverStages), exchangeType,
-        distributionType, keys, prePartitioned, collations, sort);
+        distributionType, keys, prePartitioned, collations, sort, 
hashFunction);
   }
 
   public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs,
@@ -67,7 +70,15 @@ public class MailboxSendNode extends BasePlanNode {
       RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
       @Nullable List<RelFieldCollation> collations, boolean sort) {
     this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType, 
distributionType, keys, prePartitioned,
-        collations, sort);
+        collations, sort, KeySelector.DEFAULT_HASH_ALGORITHM);
+  }
+
+  public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs,
+      int receiverStage, PinotRelExchangeType exchangeType,
+      RelDistribution.Type distributionType, @Nullable List<Integer> keys, 
boolean prePartitioned,
+      @Nullable List<RelFieldCollation> collations, boolean sort, String 
hashFunction) {
+    this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType, 
distributionType, keys, prePartitioned,
+        collations, sort, hashFunction);
   }
 
   private static BitSet toBitSet(int receiverStage) {
@@ -87,13 +98,6 @@ public class MailboxSendNode extends BasePlanNode {
     return bitSet;
   }
 
-  public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> 
inputs,
-      PinotRelExchangeType exchangeType, RelDistribution.Type 
distributionType, @Nullable List<Integer> keys,
-      boolean prePartitioned, @Nullable List<RelFieldCollation> collations, 
boolean sort) {
-    this(stageId, dataSchema, inputs, new BitSet(), exchangeType, 
distributionType, keys, prePartitioned, collations,
-        sort);
-  }
-
   public boolean sharesReceiverStages(MailboxSendNode other) {
     return _receiverStages.intersects(other._receiverStages);
   }
@@ -167,6 +171,10 @@ public class MailboxSendNode extends BasePlanNode {
     return _sort;
   }
 
+  public String getHashFunction() {
+    return _hashFunction;
+  }
+
   @Override
   public String explain() {
     StringBuilder sb = new StringBuilder();
@@ -190,7 +198,7 @@ public class MailboxSendNode extends BasePlanNode {
   @Override
   public PlanNode withInputs(List<PlanNode> inputs) {
     return new MailboxSendNode(_stageId, _dataSchema, inputs, _receiverStages, 
_exchangeType, _distributionType, _keys,
-        _prePartitioned, _collations, _sort);
+        _prePartitioned, _collations, _sort, _hashFunction);
   }
 
   @Override
@@ -207,13 +215,14 @@ public class MailboxSendNode extends BasePlanNode {
     MailboxSendNode that = (MailboxSendNode) o;
     return Objects.equals(_receiverStages, that._receiverStages) && 
_prePartitioned == that._prePartitioned
         && _sort == that._sort && _exchangeType == that._exchangeType && 
_distributionType == that._distributionType
-        && Objects.equals(_keys, that._keys) && Objects.equals(_collations, 
that._collations);
+        && Objects.equals(_keys, that._keys) && Objects.equals(_collations, 
that._collations)
+        && Objects.equals(_hashFunction, that._hashFunction);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(super.hashCode(), _receiverStages, _exchangeType, 
_distributionType, _keys, _prePartitioned,
-        _collations, _sort);
+        _collations, _sort, _hashFunction);
   }
 
   @Override
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
index 9cf5cd80000..b9701323471 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.ExplainedNode;
 import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -129,12 +130,16 @@ public class PlanNodeDeserializer {
     } else {
       receiverIds = protoReceiverIds;
     }
+    String hashFunction = protoMailboxSendNode.getHashFunction();
+    if (hashFunction == null || hashFunction.isEmpty()) {
+      hashFunction = KeySelector.DEFAULT_HASH_ALGORITHM;
+    }
 
     return new MailboxSendNode(protoNode.getStageId(), 
extractDataSchema(protoNode), extractInputs(protoNode),
         receiverIds, 
convertExchangeType(protoMailboxSendNode.getExchangeType()),
         convertDistributionType(protoMailboxSendNode.getDistributionType()), 
protoMailboxSendNode.getKeysList(),
         protoMailboxSendNode.getPrePartitioned(), 
convertCollations(protoMailboxSendNode.getCollationsList()),
-        protoMailboxSendNode.getSort());
+        protoMailboxSendNode.getSort(), hashFunction);
   }
 
   private static ProjectNode deserializeProjectNode(Plan.PlanNode protoNode) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
index 359b3895eef..c23f3bfbd4d 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
@@ -161,6 +161,7 @@ public class PlanNodeSerializer {
           
.setDistributionType(convertDistributionType(node.getDistributionType()))
           .addAllKeys(node.getKeys())
           .setPrePartitioned(node.isPrePartitioned())
+          .setHashFunction(node.getHashFunction())
           .addAllCollations(convertCollations(node.getCollations()))
           .setSort(node.isSort())
           .build();
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
index 3735a829766..021e65fa765 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
@@ -380,8 +381,8 @@ public class StagesTestBase {
 
       PlanNode input = _childBuilder.build(_senderStageId);
       DataSchema mySchema = input.getDataSchema();
-      _sender = new MailboxSendNode(_senderStageId, mySchema, List.of(input), 
null,
-          null, null, false, null, false);
+      _sender = new MailboxSendNode(_senderStageId, mySchema, List.of(input), 
0, null,
+          null, null, false, null, false, KeySelector.DEFAULT_HASH_ALGORITHM);
     }
 
     /**
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelectorTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelectorTest.java
new file mode 100644
index 00000000000..d6d6a2209cf
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelectorTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test for {@link HashFunctionSelector}
+ */
+public class HashFunctionSelectorTest {
+
+  @Test
+  public void testAbsHashCode() {
+    String value = "test";
+    int hash1 = HashFunctionSelector.computeHash(value, "abshashcode");
+    int hash2 = HashFunctionSelector.computeHash(value, "abshashcode");
+
+    // Same input should produce same hash
+    Assert.assertEquals(hash1, hash2);
+
+    // Should be positive
+    Assert.assertTrue(hash1 >= 0);
+  }
+
+  @Test
+  public void testMurmur2() {
+    String value = "test";
+    int hash1 = HashFunctionSelector.computeHash(value, "murmur");
+    int hash2 = HashFunctionSelector.computeHash(value, "murmur");
+
+    // Same input should produce same hash
+    Assert.assertEquals(hash1, hash2);
+
+    // Should be positive
+    Assert.assertTrue(hash1 >= 0);
+
+    // Should be different from absHashCode
+    int absHash = HashFunctionSelector.computeHash(value, "abshashcode");
+    Assert.assertNotEquals(hash1, absHash);
+  }
+
+  @Test
+  public void testMurmur3() {
+    String value = "test";
+    int hash1 = HashFunctionSelector.computeHash(value, "murmur3");
+    int hash2 = HashFunctionSelector.computeHash(value, "murmur3");
+
+    // Same input should produce same hash
+    Assert.assertEquals(hash1, hash2);
+
+    // Should be positive
+    Assert.assertTrue(hash1 >= 0);
+
+    // Should be different from other hash functions
+    int absHash = HashFunctionSelector.computeHash(value, "abshashcode");
+    int murmur2Hash = HashFunctionSelector.computeHash(value, "murmur2");
+    Assert.assertNotEquals(hash1, absHash);
+    Assert.assertNotEquals(hash1, murmur2Hash);
+  }
+
+  @Test
+  public void testHashCode() {
+    String value = "test";
+    int hash1 = HashFunctionSelector.computeHash(value, "hashcode");
+    int hash2 = HashFunctionSelector.computeHash(value, "hashcode");
+
+    // Same input should produce same hash
+    Assert.assertEquals(hash1, hash2);
+
+    // Should be positive
+    Assert.assertTrue(hash1 >= 0);
+
+    // Should be different from murmur and murmur3 but same as absHashCode
+    int absHash = HashFunctionSelector.computeHash(value, "abshashcode");
+    int murmur2Hash = HashFunctionSelector.computeHash(value, "murmur");
+    int murmur3Hash = HashFunctionSelector.computeHash(value, "murmur3");
+    Assert.assertEquals(hash1, absHash);
+    Assert.assertNotEquals(hash1, murmur2Hash);
+    Assert.assertNotEquals(hash1, murmur3Hash);
+  }
+
+  @Test
+  public void testNullValue() {
+    // Null values should return 0 for all hash functions
+    Assert.assertEquals(HashFunctionSelector.computeHash(null, "abshashcode"), 
0);
+    Assert.assertEquals(HashFunctionSelector.computeHash(null, "murmur"), 0);
+    Assert.assertEquals(HashFunctionSelector.computeHash(null, "murmur3"), 0);
+    Assert.assertEquals(HashFunctionSelector.computeHash(null, "cityhash"), 0);
+  }
+
+  @Test
+  public void testUnknownHashFunction() {
+    String value = "test";
+    // Unknown hash function should default to absHashCode
+    int hash = HashFunctionSelector.computeHash(value, "unknown");
+    int expectedHash = HashFunctionSelector.computeHash(value, "abshashcode");
+    Assert.assertEquals(hash, expectedHash);
+  }
+
+  @Test
+  public void testCaseInsensitive() {
+    String value = "test";
+    int hash1 = HashFunctionSelector.computeHash(value, "MURMUR");
+    int hash2 = HashFunctionSelector.computeHash(value, "murmur");
+    Assert.assertEquals(hash1, hash2);
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/KeySelectorHashFunctionTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/KeySelectorHashFunctionTest.java
new file mode 100644
index 00000000000..83a1eed38ff
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/KeySelectorHashFunctionTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test for KeySelector implementations with custom hash functions
+ */
+public class KeySelectorHashFunctionTest {
+
+  @Test
+  public void testSingleColumnKeySelectorWithCustomHashFunction() {
+    SingleColumnKeySelector selector = new SingleColumnKeySelector(0, 
"murmur");
+
+    Object[] row = {"test"};
+    int hash = selector.computeHash(row);
+
+    // Should be positive
+    Assert.assertTrue(hash >= 0);
+
+    // Should use the specified hash function
+    Assert.assertEquals(selector.hashAlgorithm(), "murmur");
+
+    // Same input should produce same hash
+    int hash2 = selector.computeHash(row);
+    Assert.assertEquals(hash, hash2);
+  }
+
+  @Test
+  public void testMultiColumnKeySelectorWithCustomHashFunction() {
+    MultiColumnKeySelector selector = new MultiColumnKeySelector(new int[]{0, 
1}, "murmur3");
+
+    Object[] row = {"test1", "test2"};
+    int hash = selector.computeHash(row);
+
+    // Should be positive
+    Assert.assertTrue(hash >= 0);
+
+    // Should use the specified hash function
+    Assert.assertEquals(selector.hashAlgorithm(), "murmur3");
+
+    // Same input should produce same hash
+    int hash2 = selector.computeHash(row);
+    Assert.assertEquals(hash, hash2);
+  }
+
+  @Test
+  public void testEmptyKeySelectorWithCustomHashFunction() {
+    EmptyKeySelector selector = EmptyKeySelector.getInstance("hashcode");
+
+    Object[] row = {"test"};
+    int hash = selector.computeHash(row);
+
+    // Should always return 0
+    Assert.assertEquals(hash, 0);
+
+    // Should use the specified hash function
+    Assert.assertEquals(selector.hashAlgorithm(), "hashcode");
+  }
+
+  @Test
+  public void testKeySelectorFactoryWithCustomHashFunction() {
+    // Test single column
+    KeySelector<?> singleSelector = 
KeySelectorFactory.getKeySelector(java.util.List.of(0), "murmur");
+    Assert.assertEquals(singleSelector.hashAlgorithm(), "murmur");
+
+    // Test multi column
+    KeySelector<?> multiSelector = 
KeySelectorFactory.getKeySelector(java.util.List.of(0, 1), "murmur3");
+    Assert.assertEquals(multiSelector.hashAlgorithm(), "murmur3");
+
+    // Test empty
+    KeySelector<?> emptySelector = 
KeySelectorFactory.getKeySelector(java.util.List.of(), "hashcode");
+    Assert.assertEquals(emptySelector.hashAlgorithm(), "hashcode");
+  }
+
+  @Test
+  public void testKeySelectorFactoryWithDefaultHashFunction() {
+    // Test single column
+    KeySelector<?> singleSelector = 
KeySelectorFactory.getKeySelector(java.util.List.of(0));
+    Assert.assertEquals(singleSelector.hashAlgorithm(), 
KeySelector.DEFAULT_HASH_ALGORITHM);
+
+    // Test multi column
+    KeySelector<?> multiSelector = 
KeySelectorFactory.getKeySelector(java.util.List.of(0, 1));
+    Assert.assertEquals(multiSelector.hashAlgorithm(), 
KeySelector.DEFAULT_HASH_ALGORITHM);
+
+    // Test empty
+    KeySelector<?> emptySelector = 
KeySelectorFactory.getKeySelector(java.util.List.of());
+    Assert.assertEquals(emptySelector.hashAlgorithm(), 
KeySelector.DEFAULT_HASH_ALGORITHM);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 6c223e861f1..e2a1009d53f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -85,7 +85,7 @@ public class MailboxSendOperator extends MultiStageOperator {
    * Creates a {@link BlockExchange} for the given {@link MailboxSendNode}.
    *
    * In normal cases, where the sender sends data to a single receiver stage, 
this method just delegates on
-   * {@link #getBlockExchange(OpChainExecutionContext, int, 
RelDistribution.Type, List, StatMap, BlockSplitter)}.
+   * {@link #getBlockExchange(OpChainExecutionContext, int, MailboxSendNode, 
StatMap, BlockSplitter)}.
    *
    * In case of a multi-sender node, this method creates a two steps exchange:
    * <ol>
@@ -102,20 +102,20 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     if (!node.isMultiSend()) {
       // it is guaranteed that there is exactly one receiver stage
       int receiverStageId = node.getReceiverStageIds().iterator().next();
-      return getBlockExchange(ctx, receiverStageId, 
node.getDistributionType(), node.getKeys(), statMap, mainSplitter);
+      return getBlockExchange(ctx, receiverStageId, node, statMap, 
mainSplitter);
     }
     List<SendingMailbox> perStageSendingMailboxes = new ArrayList<>();
     // The inner splitter is a NO_OP because the outer splitter will take care 
of splitting the blocks
     BlockSplitter innerSplitter = BlockSplitter.NO_OP;
     for (int receiverStageId : node.getReceiverStageIds()) {
       BlockExchange blockExchange =
-          getBlockExchange(ctx, receiverStageId, node.getDistributionType(), 
node.getKeys(), statMap, innerSplitter);
+          getBlockExchange(ctx, receiverStageId, node, statMap, innerSplitter);
       
perStageSendingMailboxes.add(blockExchange.asSendingMailbox(Integer.toString(receiverStageId)));
     }
 
     Function<List<SendingMailbox>, Integer> statsIndexChooser = 
getStatsIndexChooser(ctx, node);
     return BlockExchange.getExchange(perStageSendingMailboxes, 
RelDistribution.Type.BROADCAST_DISTRIBUTED,
-        Collections.emptyList(), mainSplitter, statsIndexChooser);
+        Collections.emptyList(), mainSplitter, statsIndexChooser, 
node.getHashFunction());
   }
 
   private static Function<List<SendingMailbox>, Integer> 
getStatsIndexChooser(OpChainExecutionContext ctx,
@@ -154,7 +154,8 @@ public class MailboxSendOperator extends MultiStageOperator 
{
    * In case of a multi-sender node, this method will be called for each 
receiver stage.
    */
   private static BlockExchange getBlockExchange(OpChainExecutionContext 
context, int receiverStageId,
-      RelDistribution.Type distributionType, List<Integer> keys, 
StatMap<StatKey> statMap, BlockSplitter splitter) {
+      MailboxSendNode node, StatMap<StatKey> statMap, BlockSplitter splitter) {
+    RelDistribution.Type distributionType = node.getDistributionType();
     
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(distributionType), 
"Unsupported distribution type: %s",
         distributionType);
     MailboxService mailboxService = context.getMailboxService();
@@ -172,7 +173,8 @@ public class MailboxSendOperator extends MultiStageOperator 
{
         .map(v -> mailboxService.getSendingMailbox(v.getHostname(), 
v.getPort(), v.getMailboxId(), deadlineMs, statMap))
         .collect(Collectors.toList());
     statMap.merge(StatKey.FAN_OUT, sendingMailboxes.size());
-    return BlockExchange.getExchange(sendingMailboxes, distributionType, keys, 
splitter);
+    return BlockExchange.getExchange(sendingMailboxes, distributionType, 
node.getKeys(), splitter,
+        node.getHashFunction());
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index db30427525c..2518a31b761 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -63,13 +63,15 @@ public abstract class BlockExchange {
    * mailbox index that receives the stats should be tuned.
    * @param statsIndexChooser a function to choose the mailbox index to send 
stats to.
    */
-  public static BlockExchange getExchange(List<SendingMailbox> 
sendingMailboxes, RelDistribution.Type distributionType,
-      List<Integer> keys, BlockSplitter splitter, 
Function<List<SendingMailbox>, Integer> statsIndexChooser) {
+  public static BlockExchange getExchange(List<SendingMailbox> 
sendingMailboxes,
+      RelDistribution.Type distributionType, List<Integer> keys, BlockSplitter 
splitter,
+      Function<List<SendingMailbox>, Integer> statsIndexChooser, String 
hashFunction) {
     switch (distributionType) {
       case SINGLETON:
         return new SingletonExchange(sendingMailboxes, splitter, 
statsIndexChooser);
       case HASH_DISTRIBUTED:
-        return new HashExchange(sendingMailboxes, 
KeySelectorFactory.getKeySelector(keys), splitter, statsIndexChooser);
+        return new HashExchange(sendingMailboxes, 
KeySelectorFactory.getKeySelector(keys, hashFunction), splitter,
+            statsIndexChooser);
       case RANDOM_DISTRIBUTED:
         return new RandomExchange(sendingMailboxes, splitter, 
statsIndexChooser);
       case BROADCAST_DISTRIBUTED:
@@ -83,8 +85,8 @@ public abstract class BlockExchange {
   }
 
   public static BlockExchange getExchange(List<SendingMailbox> 
sendingMailboxes, RelDistribution.Type distributionType,
-      List<Integer> keys, BlockSplitter splitter) {
-    return getExchange(sendingMailboxes, distributionType, keys, splitter, 
RANDOM_INDEX_CHOOSER);
+      List<Integer> keys, BlockSplitter splitter, String hashFunction) {
+    return getExchange(sendingMailboxes, distributionType, keys, splitter, 
RANDOM_INDEX_CHOOSER, hashFunction);
   }
 
   protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter 
splitter,
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 290cf20c231..d6de4b9ba22 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -533,6 +533,10 @@ public class CommonConstants {
         "pinot.broker.multistage.lite.mode.leaf.stage.limit";
     public static final int DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT = 100_000;
 
+    // Config for default hash function used in KeySelector for data shuffling
+    public static final String CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION = 
"pinot.broker.multistage.default.hash.function";
+    public static final String DEFAULT_BROKER_DEFAULT_HASH_FUNCTION = 
"absHashCode";
+
     // When the server instance's pool field is null or the pool contains 
multi distinguished group value, the broker
     // would set the pool to -1 in the routing table for that server.
     public static final int FALLBACK_POOL_ID = -1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to