This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a3d7b2ac214 Dynamic PartitionFunction registry; unify int-to-id mapping via PartitionIdNormalizer (#18446)
a3d7b2ac214 is described below

commit a3d7b2ac214331761b1af3735af09bc4393816e1
Author: Xiang Fu <[email protected]>
AuthorDate: Fri May 8 19:22:37 2026 -0700

    Dynamic PartitionFunction registry; unify int-to-id mapping via PartitionIdNormalizer (#18446)
    
    * Make PartitionFunction registry dynamic; move impls to pinot-common
    
    Replaces the closed `PartitionFunctionFactory` enum/switch with an
    annotation-based registry so plug-in partition functions no longer
    require touching segment-spi code.
    
    - Add `@PartitionFunctionType(names = {...})` annotation in pinot-spi.
    - Convert `PartitionFunctionFactory` to a classpath-scanning registry
      (regex `.*\.partition\.function\..*`); the factory keeps the same
      static API, callers are unchanged. Add `init()` mirroring
      `FunctionRegistry.init()` and wire it from broker / server /
      controller starters.
    - Move the seven built-in impls (`Modulo`, `Murmur` / `Murmur2`,
      `Murmur3`, `Fnv`, `HashCode`, `ByteArray`, `BoundedColumnValue`)
      out of `pinot-segment-spi` into
      `pinot-common/.../partition/function/`, standardize their
      constructor on `(int numPartitions, Map<String,String> functionConfig)`,
      and annotate each.
    - Add `PartitionIntNormalizer` enum (`POSITIVE_MODULO` / `ABS` / `MASK`)
      in pinot-segment-spi and a default `getPartitionIdNormalizer()` on
      `PartitionFunction` so impls can declare which normalizer matches
      their internal modulo semantics. Used by the framework only for
      identity / staleness matching between config-side and segment-side
      metadata; legacy impls still compute their own modulo. Javadoc spells
      out the descriptive-only nature of the value for legacy functions.
    - Tests: existing `PartitionFunctionTest` (19 cases) moved to
      pinot-common; new `PartitionFunctionFactoryTest` covers
      registration completeness, alias resolution (`Murmur` /
      `Murmur2`), case-insensitive lookup, idempotent `init()`,
      unknown-name rejection, and per-impl normalizer label;
      new `PartitionIntNormalizerTest` covers per-normalizer math at
      edge cases (`Integer.MIN_VALUE`, `Integer.MAX_VALUE`),
      range invariant across all normalizers, and `fromConfigString`
      round-trip / blank / unknown.
    
    Plug-in path going forward: drop a class on the classpath under
    `*.partition.function.*`, implement `PartitionFunction` with the
    standard ctor, add `@PartitionFunctionType(names = "MyFn")` - the
    registry picks it up at startup.
    
    NOTE - backward-incompat: the seven impl classes' fully-qualified
    names changed (`org.apache.pinot.segment.spi.partition.*` ->
    `org.apache.pinot.common.partition.function.*`) and the no-Map
    constructors on `Modulo` / `HashCode` / `ByteArray` are gone in favor
    of the standard `(int, Map<String,String>)` form. `getName()` strings
    (`Modulo`, `Murmur`, ...) are unchanged, so segment-on-disk metadata
    is unaffected.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Drop FnvPartitionFunction inner enum; use PartitionIntNormalizer directly
    
    Removes the duplicated `NegativePartitionHandling` inner enum in
    `FnvPartitionFunction` and the `_negativePartitionHandling` field in
    favor of a single `_normalizer` of type `PartitionIntNormalizer`. The
    config key (`negativePartitionHandling`) and accepted values stay
    unchanged for `mask` / `abs`; `positive_modulo` is now also
    acceptable (strict superset). The error message for unknown values
    now comes from `PartitionIntNormalizer.fromConfigString`.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Make getPartitionIdNormalizer() non-null on PartitionFunction
    
    The interface default now returns `PartitionIntNormalizer.POSITIVE_MODULO`
    instead of null, and `@Nullable` is dropped from the method. Plug-ins
    that don't map onto a standard normalizer (e.g. `BoundedColumnValue`,
    which already produces ids in `[0, N)`) inherit the safe default
    without needing an explicit override.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Make getPartitionIdNormalizer() abstract; each impl declares its own
    
    Removes the interface default so every PartitionFunction must explicitly
    pick a normalizer. BoundedColumnValuePartitionFunction now overrides
    with POSITIVE_MODULO (no-op label, since its output is already a fixed
    mapping in [0, N)).
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Drive each partition function's int-to-id mapping via _normalizer field
    
    Every built-in PartitionFunction now stores a `_normalizer` of type
    PartitionIntNormalizer and applies it in `getPartition(...)` rather
    than open-coding the modulo logic. Defaults match prior behavior:
    
    | Impl              | Default normalizer |
    |-------------------|--------------------|
    | Modulo            | POSITIVE_MODULO    |
    | Murmur / Murmur2  | MASK               |
    | Murmur3           | MASK               |
    | Fnv               | MASK               |
    | HashCode          | KAFKA_ABS (new)    |
    | ByteArray         | KAFKA_ABS (new)    |
    | BoundedColumnValue| POSITIVE_MODULO    |
    
    Adds `KAFKA_ABS` to PartitionIntNormalizer to cover the Kafka-style
    `abs(hash) % N` (with `Integer.MIN_VALUE -> 0`) used by HashCode and
    ByteArray. With KAFKA_ABS in place the normalizer is now an
    authoritative driver of the partition-id computation, not just a
    descriptive label - the interface Javadoc is updated accordingly.
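
    The four mappings named so far can be sketched as below. This is a
    hypothetical stand-in (`NormalizerSketch`), not the Pinot enum: `MASK`
    and `ABS` follow the inner enum removed from `FnvPartitionFunction`,
    `KAFKA_ABS` follows the description above, and `Math.floorMod` for
    `POSITIVE_MODULO` is an assumption.

```java
// Hypothetical sketch of the int-to-id mappings; not the actual Pinot class.
enum NormalizerSketch {
  // Assumption: floor-style modulo, so the result is always in [0, n).
  POSITIVE_MODULO {
    @Override
    int getPartitionId(int hash, int n) {
      return Math.floorMod(hash, n);
    }
  },
  // Post-modulo abs: take the remainder first, then drop its sign.
  ABS {
    @Override
    int getPartitionId(int hash, int n) {
      int p = hash % n;
      return p < 0 ? -p : p;
    }
  },
  // Clear the sign bit, then take the remainder.
  MASK {
    @Override
    int getPartitionId(int hash, int n) {
      return (hash & Integer.MAX_VALUE) % n;
    }
  },
  // Kafka-style abs before modulo; Integer.MIN_VALUE maps to 0.
  KAFKA_ABS {
    @Override
    int getPartitionId(int hash, int n) {
      int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
      return nonNegative % n;
    }
  };

  abstract int getPartitionId(int hash, int n);
}
```

    For `hash = -7` and `N = 4` the sketch yields 1 under `POSITIVE_MODULO`
    and `MASK` but 3 under `ABS` and `KAFKA_ABS`, so the choice of
    normalizer can move a key to a different partition.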
    
    Unifies the per-impl override path under a single config key
    `partitionIdNormalizer` (case-insensitive), parsed via shared
    `PartitionFunctionConfigs#normalizer`. Drops the FNV-specific
    `negativePartitionHandling` key.
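
    A minimal sketch of how such a shared override could be parsed. The
    class `NormalizerConfigSketch` and its `Mode` enum are hypothetical;
    only the `partitionIdNormalizer` key and the `(config, default)`
    call shape come from this change. Treating blank values as "use the
    default" and matching the value case-insensitively are assumptions.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch; not the actual PartitionFunctionConfigs class.
final class NormalizerConfigSketch {
  // Stand-in for the normalizer enum.
  enum Mode { POSITIVE_MODULO, ABS, MASK, KAFKA_ABS }

  static final String KEY = "partitionIdNormalizer";

  private NormalizerConfigSketch() {
  }

  // Returns defaultMode when the config or key is absent or blank (assumption);
  // otherwise parses the value, ignoring case and surrounding whitespace.
  static Mode normalizer(Map<String, String> functionConfig, Mode defaultMode) {
    if (functionConfig == null) {
      return defaultMode;
    }
    String raw = functionConfig.get(KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return defaultMode;
    }
    try {
      return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unknown " + KEY + ": '" + raw + "'", e);
    }
  }
}
```

    With this shape each impl only passes its own default, for example
    `normalizer(functionConfig, Mode.MASK)` for a hash that previously
    masked the sign bit.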
    
    Tests:
    - New `KAFKA_ABS` cases in PartitionIntNormalizerTest covering the
      MIN_VALUE corner.
    - New `testPartitionIdNormalizerConfigOverridesDefaultAcrossImpls`
      in PartitionFunctionFactoryTest verifying the config rewires the
      computed partition for HashCode, Modulo, and ByteArray.
    - FNV tests updated to use `partitionIdNormalizer` config key.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Convert new Javadoc to JEP 467 markdown style
    
    Switches all new Javadoc blocks added by this PR from `/** */` block
    syntax to `///` markdown syntax (JEP 467), matching the convention
    established in #18165 and the rest of the recently-touched
    pinot-segment-spi files. Replaces `<p>`, `<ul>/<li>`, `<code>`,
    `{@code X}`, `{@link X}` HTML/Javadoc tags with their markdown
    equivalents (paragraph breaks, `-` lists, backticks, `[X]` refs).
    License headers remain as `/** */` block comments.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Address PR review feedback
    
    - Rename `PartitionIntNormalizer` -> `PartitionIdNormalizer` (Jackie #3).
    - Rename `KAFKA_ABS` -> `PRE_MODULO_ABS` to make the pre-vs-post-modulo
      distinction with `ABS` explicit (Jackie #4).
    - Change `getPartitionIdNormalizer()` to return `PartitionIdNormalizer`
      enum directly instead of `String`; drop `@JsonIgnore` so the field is
      visible in serialized form (Jackie #2). Method stays abstract -- every
      PartitionFunction implementation declares its own normalizer.
    - Drop the redundant `_normalizer` field from
      `BoundedColumnValuePartitionFunction`; it now returns
      `PartitionIdNormalizer.POSITIVE_MODULO` directly since its output is
      already a fixed mapping in `[0, numPartitions)` (Jackie #6).
    - Make `@PartitionFunctionType` annotation optional. The factory now
      scans every public, concrete `PartitionFunction` subtype under the
      `org.apache.pinot.*` package tree. When a class lacks the annotation
      (or `names()` is empty), the registry probes
      `PartitionFunction.getName()` by instantiating with `(1, null)` and
      registers under the returned name (Jackie #5). Annotation is now a
      pure aliasing / overriding mechanism.
    - Fix Javadoc on `@PartitionFunctionType` and `PartitionFunctionFactory`
      to drop the incorrect "any plugin package" claim and the unimplemented
      "stripping underscores" note (Copilot #1, #2, #4).
    - Add `UnannotatedTestPartitionFunction` fixture + factory test that
      exercises the no-annotation `getName()` fallback path.
    - Update `PartitionFunctionTest` to expect 4 JSON fields (the new
      `partitionIdNormalizer` field is no longer hidden).
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Trim and reject blank @PartitionFunctionType names; fall back to getName()
    
    Validates each entry in `@PartitionFunctionType.names()` at registry
    build time:
    
    - Each entry is trimmed of surrounding whitespace.
    - Blank entries are dropped silently.
    - When ALL declared entries are blank (or the array is empty after
      filtering), the registry logs a warning and falls back to probing
      `PartitionFunction.getName()`, the same path used for unannotated
      classes.
    
    This prevents a misconfigured annotation (e.g. `names = {"  "}`) from
    silently registering a function under an empty canonical name, making
    it undiscoverable at lookup time.
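
    The trimming and fallback rules above can be sketched as follows.
    `NameResolutionSketch.resolveNames` and its parameters are
    hypothetical; the real registry reads the names from the annotation
    and logs a warning before falling back, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the name validation; not the actual registry code.
final class NameResolutionSketch {
  private NameResolutionSketch() {
  }

  // Trims each declared name, drops blank entries, and falls back to
  // fallbackName (standing in for getName()) when nothing usable remains.
  static List<String> resolveNames(String[] declared, String fallbackName) {
    List<String> names = new ArrayList<>();
    if (declared != null) {
      for (String raw : declared) {
        if (raw == null) {
          continue;
        }
        String trimmed = raw.trim();
        if (!trimmed.isEmpty()) {
          names.add(trimmed);
        }
      }
    }
    if (names.isEmpty()) {
      names.add(fallbackName);
    }
    return names;
  }
}
```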
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Drop @PartitionFunctionType annotation; drive registry from getNames() default
    
    The annotation added no value beyond what `PartitionFunction.getName()`
    already returned for every built-in. Remove it entirely and let the
    interface itself declare the registry contract:
    
    - New default `List<String> getNames()` on `PartitionFunction` returns
      `[getName()]`. The factory's static scan instantiates each subtype
      with `(1, null)` and registers under whatever `getNames()` returns.
    - Only `MurmurPartitionFunction` overrides `getNames()` (returns
      `["Murmur", "Murmur2"]` so both aliases resolve to the same impl);
      the other six built-ins use the default.
    - `getNames()` is `@JsonIgnore`'d so it doesn't pollute the
      serialized form (`testBasicProperties` JSON-shape assertion stays
      at four fields).
    - `BoundedColumnValuePartitionFunction`'s ctor now tolerates `null`
      config (probe path) and defers validation; real-config use still
      throws as before. `getPartition()` rejects probe-built instances.
    - `@PartitionFunctionType` annotation file deleted.
    - `UnannotatedTestPartitionFunction` fixture removed (no longer
      meaningful now that every class is "unannotated").
    - Added two new factory tests covering the default `getNames()`
      behavior and Murmur's override.
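
    A minimal sketch of the `getNames()` contract described above, using
    hypothetical types (`NamedFn`, `MurmurLike`, `ModuloLike`,
    `RegistrySketch`) rather than the Pinot classes. Lower-casing names
    for case-insensitive lookup and throwing `IllegalArgumentException`
    for unknown names are assumptions about how the registry behaves.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the interface: getNames() defaults to [getName()].
interface NamedFn {
  String getName();

  default List<String> getNames() {
    return Collections.singletonList(getName());
  }
}

// Overrides getNames() so both aliases resolve to the same implementation.
final class MurmurLike implements NamedFn {
  @Override
  public String getName() {
    return "Murmur";
  }

  @Override
  public List<String> getNames() {
    return Arrays.asList("Murmur", "Murmur2");
  }
}

// Relies on the default getNames().
final class ModuloLike implements NamedFn {
  @Override
  public String getName() {
    return "Modulo";
  }
}

// Registers every declared name under a case-insensitive key.
final class RegistrySketch {
  private final Map<String, NamedFn> _byName = new HashMap<>();

  void register(NamedFn fn) {
    for (String name : fn.getNames()) {
      _byName.put(canonical(name), fn);
    }
  }

  NamedFn lookup(String name) {
    NamedFn fn = _byName.get(canonical(name));
    if (fn == null) {
      throw new IllegalArgumentException("Unknown partition function: " + name);
    }
    return fn;
  }

  private static String canonical(String name) {
    return name.trim().toLowerCase(Locale.ROOT);
  }
}
```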
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Add NO_OP PartitionIdNormalizer for already-in-range outputs
    
    `BoundedColumnValuePartitionFunction` produces a fixed mapping in
    `[0, numPartitions)` by construction; the previous label was
    `POSITIVE_MODULO` (a no-op for in-range values, but semantically
    overloaded). Add an explicit `NO_OP` value that is the identity on
    its inputs and use it for `BoundedColumnValue`. The framework does
    not validate that callers actually pass in-range values to `NO_OP`;
    out-of-range inputs yield out-of-range partition ids, by design.
    
    `PartitionIdNormalizerTest#testRangeFoldingNormalizersReturnInRange`
    (was `testAllNormalizersReturnInRange`) now skips NO_OP since the
    range invariant doesn't apply. New `testNoOpIsIdentity` locks in the
    identity-with-narrowing semantics, including the explicit
    out-of-range pass-through.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   4 +-
 .../BoundedColumnValuePartitionFunction.java       |  68 +++++---
 .../function}/ByteArrayPartitionFunction.java      |  37 +++--
 .../partition/function}/FnvPartitionFunction.java  |  80 ++-------
 .../function}/HashCodePartitionFunction.java       |  35 ++--
 .../function}/ModuloPartitionFunction.java         |  45 +++--
 .../function}/Murmur3PartitionFunction.java        |  35 ++--
 .../function}/MurmurPartitionFunction.java         |  41 +++--
 .../function/PartitionFunctionConfigs.java         |  49 ++++++
 .../function/PartitionFunctionFactoryTest.java     | 169 +++++++++++++++++++
 .../partition/function}/PartitionFunctionTest.java |  21 ++-
 .../pinot/controller/BaseControllerStarter.java    |   7 +-
 .../controller/utils/SegmentMetadataMockUtils.java |   4 +-
 ...gmentImplDropRecordOnPartitionMismatchTest.java |  12 +-
 .../local/segment/index/ColumnMetadataTest.java    |   2 +-
 .../index/creator/SegmentPartitionTest.java        |   2 +-
 .../segment/spi/partition/PartitionFunction.java   |  60 ++++---
 .../spi/partition/PartitionFunctionFactory.java    | 185 ++++++++++++++-------
 .../spi/partition/PartitionIdNormalizer.java       | 128 ++++++++++++++
 .../spi/partition/PartitionIdNormalizerTest.java   | 142 ++++++++++++++++
 .../pinot/server/starter/ServerInstance.java       |   4 +-
 21 files changed, 845 insertions(+), 285 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 6d8acad99a3..c53501a6a8b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -101,6 +101,7 @@ import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
 import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
 import org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -427,8 +428,9 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     QueryRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES));
     LOGGER.info("Initializing ResultRewriterFactory");
     ResultRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_RESULT_REWRITER_CLASS_NAMES));
-    // Initialize FunctionRegistry before starting the broker request handler
+    // Initialize FunctionRegistry and PartitionFunctionFactory before starting the broker request handler
     FunctionRegistry.init();
+    PartitionFunctionFactory.init();
     boolean caseInsensitive =
         _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
     _tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java
similarity index 59%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java
index a72beb34c82..aae532663a5 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java
@@ -16,29 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.google.common.base.Preconditions;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
 
 
-/**
- * Implementation of {@link PartitionFunction} which partitions based configured column values.
- *
- * "columnPartitionMap": {
- *   "subject": {
- *     "functionName": "BoundedColumnValue",
- *     "functionConfig": {
- *       "columnValues": "Maths|English|Chemistry",
- *       "columnValuesDelimiter": "|"
- *     },
- *     "numPartitions": 4
- *   }
- * }
- * With this partition config on column "subject", partitionId would be 1 for Maths, 2 for English and 3 for Chemistry.
- * partitionId would be "0" for all other values which may occur, therefore 'numPartitions' is set to 4.
- */
+/// Implementation of [PartitionFunction] which partitions based on configured column values.
+///
+/// ```json
+/// "columnPartitionMap": {
+///   "subject": {
+///     "functionName": "BoundedColumnValue",
+///     "functionConfig": {
+///       "columnValues": "Maths|English|Chemistry",
+///       "columnValuesDelimiter": "|"
+///     },
+///     "numPartitions": 4
+///   }
+/// }
+/// ```
+///
+/// With this partition config on column "subject", `partitionId` is `1` for Maths, `2` for English and `3` for
+/// Chemistry. `partitionId` is `0` for all other values which may occur, therefore `numPartitions` is set to `4`.
 public class BoundedColumnValuePartitionFunction implements PartitionFunction {
   private static final int DEFAULT_PARTITION_ID = 0;
   private static final String NAME = "BoundedColumnValue";
@@ -48,9 +52,17 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
   private final Map<String, String> _functionConfig;
   private final String[] _values;
 
-  public BoundedColumnValuePartitionFunction(int numPartitions, Map<String, String> functionConfig) {
-    Preconditions.checkArgument(functionConfig != null && functionConfig.size() > 0,
-        "'functionConfig' should be present, specified", functionConfig);
+  public BoundedColumnValuePartitionFunction(int numPartitions, @Nullable Map<String, String> functionConfig) {
+    _numPartitions = numPartitions;
+    if (functionConfig == null) {
+      // Probe-only path used by PartitionFunctionFactory startup scan; real use supplies a
+      // populated config and reaches the validation below. getPartition() will throw on a
+      // probe-built instance, which is fine because the registry never calls it.
+      _functionConfig = null;
+      _values = null;
+      return;
+    }
+    Preconditions.checkArgument(functionConfig.size() > 0, "'functionConfig' must not be empty");
     Preconditions.checkState(functionConfig.get(COLUMN_VALUES) != null, "columnValues must be configured");
     Preconditions.checkState(functionConfig.get(COLUMN_VALUES_DELIMITER) != null,
         "'columnValuesDelimiter' must be configured");
@@ -58,11 +70,11 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
     _values = StringUtils.split(functionConfig.get(COLUMN_VALUES), functionConfig.get(COLUMN_VALUES_DELIMITER));
     Preconditions.checkState(numPartitions == _values.length + 1,
         "'numPartitions' must just be one greater than number of column values configured");
-    _numPartitions = numPartitions;
   }
 
   @Override
   public int getPartition(String value) {
+    Preconditions.checkState(_values != null, "BoundedColumnValuePartitionFunction is not configured");
     for (int i = 0; i < _numPartitions - 1; i++) {
       if (_values[i].equalsIgnoreCase(value)) {
         return i + 1;
@@ -76,11 +88,9 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
     return NAME;
   }
 
-  /**
-   * Returns number of partitions configured for the column.
-   *
-   * @return Total number of partitions for the function.
-   */
+  /// Returns number of partitions configured for the column.
+  ///
+  /// @return Total number of partitions for the function.
   @Override
   public int getNumPartitions() {
     return _numPartitions;
@@ -91,6 +101,12 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
     return _functionConfig;
   }
 
+  @Override
+  public PartitionIdNormalizer getPartitionIdNormalizer() {
+    // Output is a fixed mapping in [0, numPartitions); no normalization is applied.
+    return PartitionIdNormalizer.NO_OP;
+  }
+
   @Override
   public String toString() {
     return getName();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ByteArrayPartitionFunction.java
similarity index 57%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/ByteArrayPartitionFunction.java
index e8ff8edc245..90b1d030d86 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ByteArrayPartitionFunction.java
@@ -16,34 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
-/**
- * Implementation of {@link Byte array partitioner}
- *
- */
+/// [PartitionFunction] that hashes the input via [Arrays#hashCode(byte\[\])] of the value
+/// bytes and runs the configured [PartitionIdNormalizer] (default
+/// [PartitionIdNormalizer#PRE_MODULO_ABS], the Pre-modulo abs (Kafka-style) `abs(hash) % N` that maps
+/// `Integer.MIN_VALUE -> 0`) to derive the partition id.
 public class ByteArrayPartitionFunction implements PartitionFunction {
   private static final String NAME = "ByteArray";
+  private static final PartitionIdNormalizer DEFAULT_NORMALIZER = PartitionIdNormalizer.PRE_MODULO_ABS;
   private final int _numPartitions;
+  private final PartitionIdNormalizer _normalizer;
 
-  /**
-   * Constructor for the class.
-   * @param numPartitions Number of partitions
-   */
-  public ByteArrayPartitionFunction(int numPartitions) {
-    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, specified", numPartitions);
+  public ByteArrayPartitionFunction(int numPartitions, @Nullable Map<String, String> functionConfig) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, was: %s", numPartitions);
     _numPartitions = numPartitions;
+    _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, DEFAULT_NORMALIZER);
   }
 
   @Override
   public int getPartition(String value) {
-    return abs(Arrays.hashCode(value.getBytes(UTF_8))) % _numPartitions;
+    return _normalizer.getPartitionId(Arrays.hashCode(value.getBytes(UTF_8)), _numPartitions);
   }
 
   @Override
@@ -56,14 +59,14 @@ public class ByteArrayPartitionFunction implements PartitionFunction {
     return _numPartitions;
   }
 
+  @Override
+  public PartitionIdNormalizer getPartitionIdNormalizer() {
+    return _normalizer;
+  }
+
   // Keep it for backward-compatibility, use getName() instead
   @Override
   public String toString() {
     return NAME;
   }
-
-  // NOTE: This matches the Utils.abs() in Kafka
-  private static int abs(int n) {
-    return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
-  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/FnvPartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/FnvPartitionFunction.java
similarity index 57%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/FnvPartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/FnvPartitionFunction.java
index 1f96a2ba645..83fe5a8d5c0 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/FnvPartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/FnvPartitionFunction.java
@@ -16,76 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.google.common.base.Preconditions;
 import java.util.Collections;
-import java.util.Locale;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.hash.FnvHashFunctions;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
-/**
- * Stateless and thread-safe {@link PartitionFunction} backed by configurable FNV variants.
- */
+/// Stateless and thread-safe [PartitionFunction] backed by configurable FNV variants. The
+/// configured [PartitionIdNormalizer] (default [PartitionIdNormalizer#MASK]) is applied
+/// to the raw FNV hash to derive the partition id.
 public class FnvPartitionFunction implements PartitionFunction {
   private static final String NAME = "FNV";
   private static final String VARIANT_KEY = "variant";
   private static final String USE_RAW_BYTES_KEY = "useRawBytes";
-  private static final String NEGATIVE_PARTITION_HANDLING_KEY = "negativePartitionHandling";
   private static final FnvHashFunctions.Variant DEFAULT_VARIANT = FnvHashFunctions.Variant.FNV1A_32;
-  private static final NegativePartitionHandling DEFAULT_NEGATIVE_PARTITION_HANDLING =
-      NegativePartitionHandling.MASK;
+  private static final PartitionIdNormalizer DEFAULT_NORMALIZER = PartitionIdNormalizer.MASK;
 
   private final int _numPartitions;
   @Nullable
   private final Map<String, String> _functionConfig;
   private final FnvHashFunctions.Variant _variant;
   private final boolean _useRawBytes;
-  private final NegativePartitionHandling _negativePartitionHandling;
-
-  private enum NegativePartitionHandling {
-    MASK,
-    ABS;
-
-    private static final String ALLOWED_HANDLINGS = "mask or abs";
-
-    public static NegativePartitionHandling fromString(String value) {
-      if (value == null) {
-        throw invalidHandlingException(null);
-      }
-      try {
-        return valueOf(value.trim().toUpperCase(Locale.ROOT));
-      } catch (IllegalArgumentException e) {
-        throw invalidHandlingException(value);
-      }
-    }
-
-    public int getPartition(int hash, int numPartitions) {
-      if (this == MASK) {
-        return (hash & Integer.MAX_VALUE) % numPartitions;
-      }
-      int partition = hash % numPartitions;
-      return partition < 0 ? -partition : partition;
-    }
+  private final PartitionIdNormalizer _normalizer;
 
-    public int getPartition(long hash, int numPartitions) {
-      if (this == MASK) {
-        return (int) ((hash & Long.MAX_VALUE) % numPartitions);
-      }
-      long partition = hash % numPartitions;
-      return (int) (partition < 0 ? -partition : partition);
-    }
-  }
-
-  /**
-   * Builds a new FNV partition function from the provided configuration.
-   */
   public FnvPartitionFunction(int numPartitions, @Nullable Map<String, String> functionConfig) {
     Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
     _numPartitions = numPartitions;
@@ -93,21 +55,16 @@ public class FnvPartitionFunction implements PartitionFunction {
 
     FnvHashFunctions.Variant variant = DEFAULT_VARIANT;
     boolean useRawBytes = false;
-    NegativePartitionHandling negativePartitionHandling = DEFAULT_NEGATIVE_PARTITION_HANDLING;
     if (functionConfig != null) {
       String variantString = functionConfig.get(VARIANT_KEY);
       if (StringUtils.isNotBlank(variantString)) {
         variant = FnvHashFunctions.Variant.fromString(variantString);
       }
       useRawBytes = Boolean.parseBoolean(functionConfig.get(USE_RAW_BYTES_KEY));
-      String negativePartitionHandlingString = functionConfig.get(NEGATIVE_PARTITION_HANDLING_KEY);
-      if (StringUtils.isNotBlank(negativePartitionHandlingString)) {
-        negativePartitionHandling = NegativePartitionHandling.fromString(negativePartitionHandlingString);
-      }
     }
     _variant = variant;
     _useRawBytes = useRawBytes;
-    _negativePartitionHandling = negativePartitionHandling;
+    _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, DEFAULT_NORMALIZER);
   }
 
   @Override
@@ -116,12 +73,12 @@ public class FnvPartitionFunction implements PartitionFunction {
     if (_variant.is64Bit()) {
       long hash = _variant == FnvHashFunctions.Variant.FNV1_64 ? FnvHashFunctions.fnv1Hash64(bytes)
           : FnvHashFunctions.fnv1aHash64(bytes);
-      return _negativePartitionHandling.getPartition(hash, _numPartitions);
+      return _normalizer.getPartitionId(hash, _numPartitions);
     }
 
     int hash = _variant == FnvHashFunctions.Variant.FNV1_32 ? FnvHashFunctions.fnv1Hash32(bytes)
         : FnvHashFunctions.fnv1aHash32(bytes);
-    return _negativePartitionHandling.getPartition(hash, _numPartitions);
+    return _normalizer.getPartitionId(hash, _numPartitions);
   }
 
   @Override
@@ -140,19 +97,14 @@ public class FnvPartitionFunction implements PartitionFunction {
     return _functionConfig;
   }
 
+  @Override
+  public PartitionIdNormalizer getPartitionIdNormalizer() {
+    return _normalizer;
+  }
+
   // Keep it for backward-compatibility, use getName() instead
   @Override
   public String toString() {
     return NAME;
   }
-
-  private static IllegalArgumentException invalidHandlingException(@Nullable String value) {
-    return new IllegalArgumentException(
-        "FNV negative partition handling must be " + NegativePartitionHandling.ALLOWED_HANDLINGS + ", but was: "
-            + formatValue(value));
-  }
-
-  private static String formatValue(@Nullable String value) {
-    return value == null ? "null" : "'" + value + "'";
-  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/HashCodePartitionFunction.java
similarity index 56%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/HashCodePartitionFunction.java
index 182760cf44c..a6f5c5867d7 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/HashCodePartitionFunction.java
@@ -16,29 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.google.common.base.Preconditions;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
 
 
-/**
- * Hash code partition function, where:
- * <ul>
- *   <li> partitionId = value.hashCode() % {@link #_numPartitions}</li>
- * </ul>
- */
+/// [PartitionFunction] that hashes the input via [String#hashCode()] and
+/// applies the configured [PartitionIdNormalizer] to derive the partition id.
+/// The default, [PartitionIdNormalizer#PRE_MODULO_ABS], is the Kafka-style
+/// `abs(hash) % N`, with `Integer.MIN_VALUE` mapped to 0 before the modulo.
 public class HashCodePartitionFunction implements PartitionFunction {
   private static final String NAME = "HashCode";
+  private static final PartitionIdNormalizer DEFAULT_NORMALIZER = 
PartitionIdNormalizer.PRE_MODULO_ABS;
   private final int _numPartitions;
+  private final PartitionIdNormalizer _normalizer;
 
-  public HashCodePartitionFunction(int numPartitions) {
-    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must 
be > 0, specified", numPartitions);
+  public HashCodePartitionFunction(int numPartitions, @Nullable Map<String, 
String> functionConfig) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must 
be > 0, was: %s", numPartitions);
     _numPartitions = numPartitions;
+    _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, 
DEFAULT_NORMALIZER);
   }
 
   @Override
   public int getPartition(String value) {
-    return abs(value.hashCode()) % _numPartitions;
+    return _normalizer.getPartitionId(value.hashCode(), _numPartitions);
   }
 
   @Override
@@ -51,14 +56,14 @@ public class HashCodePartitionFunction implements 
PartitionFunction {
     return _numPartitions;
   }
 
+  @Override
+  public PartitionIdNormalizer getPartitionIdNormalizer() {
+    return _normalizer;
+  }
+
   // Keep it for backward-compatibility, use getName() instead
   @Override
   public String toString() {
     return NAME;
   }
-
-  // NOTE: This matches the Utils.abs() in Kafka
-  private static int abs(int n) {
-    return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
-  }
 }
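Note on the HashCode hunk above: the two int-to-id mappings involved can be
sketched standalone. This is a sketch under stated assumptions, not the
PartitionIdNormalizer source. `preModuloAbs` copies the removed `abs` helper,
`mask` copies the `(hash & Integer.MAX_VALUE) % N` expression removed from the
Murmur functions in this same commit, and the class and method names are
hypothetical.

```java
// Illustrative only: NormalizerSketch and its methods are hypothetical names,
// not part of Pinot. The arithmetic mirrors lines removed in this commit.
final class NormalizerSketch {
  private NormalizerSketch() {
  }

  // Kafka-style: Integer.MIN_VALUE is mapped to 0 (Math.abs would overflow),
  // every other hash is made non-negative, then the remainder is taken.
  static int preModuloAbs(int hash, int numPartitions) {
    int abs = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
    return abs % numPartitions;
  }

  // Clears the sign bit, then takes the remainder.
  static int mask(int hash, int numPartitions) {
    return (hash & Integer.MAX_VALUE) % numPartitions;
  }
}
```

For a negative hash the two mappings can disagree: with hash -7 and 8
partitions, `preModuloAbs` gives 7 while `mask` gives 1. Both give 0 for
`Integer.MIN_VALUE`.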
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ModuloPartitionFunction.java
similarity index 58%
rename from 
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/partition/function/ModuloPartitionFunction.java
index a4b8eb49abc..8d34d855a75 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ModuloPartitionFunction.java
@@ -16,41 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.google.common.base.Preconditions;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
 
 
-/**
- * Modulo operation based partition function, where:
- * <ul>
- *   <li> partitionId = value % {@link #_numPartitions}</li>
- * </ul>
- *
- */
+/// Modulo-based partition function. Parses the input string as a base-10 long
+/// and applies the configured [PartitionIdNormalizer] (default
+/// [PartitionIdNormalizer#POSITIVE_MODULO]) to it.
 public class ModuloPartitionFunction implements PartitionFunction {
   private static final String NAME = "Modulo";
+  private static final PartitionIdNormalizer DEFAULT_NORMALIZER = 
PartitionIdNormalizer.POSITIVE_MODULO;
   private final int _numPartitions;
+  private final PartitionIdNormalizer _normalizer;
 
-  /**
-   * Constructor for the class.
-   * @param numPartitions Number of partitions.
-   */
-  public ModuloPartitionFunction(int numPartitions) {
-    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must 
be > 0, specified", numPartitions);
+  public ModuloPartitionFunction(int numPartitions, @Nullable Map<String, 
String> functionConfig) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must 
be > 0, was: %s", numPartitions);
     _numPartitions = numPartitions;
+    _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, 
DEFAULT_NORMALIZER);
   }
 
-  /**
-   * Returns partition id for a given value. Assumes that the passed in object
-   * is either an Integer, or a string representation of an Integer.
-   *
-   * @param value Value for which to determine the partition id.
-   * @return Partition id for the given value.
-   */
   @Override
   public int getPartition(String value) {
-    return toNonNegative((int) (Long.parseLong(value) % _numPartitions));
+    return _normalizer.getPartitionId(Long.parseLong(value), _numPartitions);
   }
 
   @Override
@@ -63,13 +55,14 @@ public class ModuloPartitionFunction implements 
PartitionFunction {
     return _numPartitions;
   }
 
+  @Override
+  public PartitionIdNormalizer getPartitionIdNormalizer() {
+    return _normalizer;
+  }
+
   // Keep it for backward-compatibility, use getName() instead
   @Override
   public String toString() {
     return NAME;
   }
-
-  private int toNonNegative(int partition) {
-    return partition < 0 ? partition + _numPartitions : partition;
-  }
 }
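Note on the Modulo hunk above: the behavior of the removed `getPartition` and
`toNonNegative` pair can be sketched as a single method. This is a minimal
sketch with a hypothetical class name. Whether `POSITIVE_MODULO` is
implemented in exactly this way is an assumption based on the removed lines,
not on the normalizer source.

```java
// Illustrative only: PositiveModuloSketch is a hypothetical name, not Pinot code.
final class PositiveModuloSketch {
  private PositiveModuloSketch() {
  }

  // Java's % keeps the sign of the dividend, so a negative remainder is
  // shifted up by numPartitions to land in [0, numPartitions).
  static int positiveModulo(long value, int numPartitions) {
    int remainder = (int) (value % numPartitions);
    return remainder < 0 ? remainder + numPartitions : remainder;
  }
}
```

With the input "-10" and 7 partitions, `Long.parseLong` yields -10, the raw
remainder is -3, and the shifted result is 4. The input "10" gives 3.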
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/Murmur3PartitionFunction.java
similarity index 75%
rename from 
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/partition/function/Murmur3PartitionFunction.java
index 9a0110f810e..19b35c1aae3 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/Murmur3PartitionFunction.java
@@ -16,39 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
-/**
- * Implementation of {@link PartitionFunction} which partitions based on 32 
bit murmur3 hash
- */
+/// [PartitionFunction] backed by a 32-bit Murmur3 hash. The configured
+/// [PartitionIdNormalizer] (default [PartitionIdNormalizer#MASK]) is applied 
to the
+/// raw signed hash to derive the partition id.
 public class Murmur3PartitionFunction implements PartitionFunction {
   private static final String NAME = "Murmur3";
   private static final String SEED_KEY = "seed";
   private static final String VARIANT_KEY = "variant";
   private static final String USE_RAW_BYTES_KEY = "useRawBytes";
+  private static final PartitionIdNormalizer DEFAULT_NORMALIZER = 
PartitionIdNormalizer.MASK;
+
   private final int _numPartitions;
   @Nullable
   private final Map<String, String> _functionConfig;
   private final int _seed;
   private final boolean _useX64;
   private final boolean _useRawBytes;
+  private final PartitionIdNormalizer _normalizer;
 
-  /**
-   * Constructor for the class.
-   * @param numPartitions Number of partitions.
-   * @param functionConfig to extract configurations for the partition 
function.
-   */
   public Murmur3PartitionFunction(int numPartitions, @Nullable Map<String, 
String> functionConfig) {
     Preconditions.checkArgument(numPartitions > 0, "Number of partitions must 
be > 0");
     _numPartitions = numPartitions;
@@ -76,19 +76,21 @@ public class Murmur3PartitionFunction implements 
PartitionFunction {
     _seed = seed;
     _useX64 = useX64;
     _useRawBytes = useRawBytes;
+    _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, 
DEFAULT_NORMALIZER);
   }
 
   @Override
   public int getPartition(String value) {
+    int hash;
     if (_useRawBytes) {
       byte[] bytes = BytesUtils.toBytes(value);
-      int hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(bytes, 
_seed)
+      hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(bytes, _seed)
           : MurmurHashFunctions.murmurHash3X86Bit32(bytes, _seed);
-      return (hash & Integer.MAX_VALUE) % _numPartitions;
+    } else {
+      hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(value, _seed)
+          : MurmurHashFunctions.murmurHash3X86Bit32(value.getBytes(UTF_8), 
_seed);
     }
-    int hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(value, _seed)
-        : MurmurHashFunctions.murmurHash3X86Bit32(value.getBytes(UTF_8), 
_seed);
-    return (hash & Integer.MAX_VALUE) % _numPartitions;
+    return _normalizer.getPartitionId(hash, _numPartitions);
   }
 
   @Override
@@ -107,6 +109,11 @@ public class Murmur3PartitionFunction implements 
PartitionFunction {
     return _functionConfig;
   }
 
+  @Override
+  public PartitionIdNormalizer getPartitionIdNormalizer() {
+    return _normalizer;
+  }
+
   // Keep it for backward-compatibility, use getName() instead
   @Override
   public String toString() {
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/MurmurPartitionFunction.java
similarity index 69%
rename from 
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/partition/function/MurmurPartitionFunction.java
index 6c64dee763a..5396fbe07cc 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/MurmurPartitionFunction.java
@@ -16,53 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.google.common.base.Preconditions;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
-/**
- * Implementation of {@link PartitionFunction} which partitions based on 32 
bit murmur hash
- */
+/// [PartitionFunction] backed by a 32-bit Murmur2 hash. The configured
+/// [PartitionIdNormalizer] (default [PartitionIdNormalizer#MASK]) is applied 
to the
+/// raw signed hash to derive the partition id. Registered under both `Murmur` 
and `Murmur2`.
 public class MurmurPartitionFunction implements PartitionFunction {
   private static final String NAME = "Murmur";
+  private static final List<String> NAMES = List.of("Murmur", "Murmur2");
   private static final String USE_RAW_BYTES_KEY = "useRawBytes";
+  private static final PartitionIdNormalizer DEFAULT_NORMALIZER = 
PartitionIdNormalizer.MASK;
+
   private final int _numPartitions;
   @Nullable
   private final Map<String, String> _functionConfig;
   private final boolean _useRawBytes;
+  private final PartitionIdNormalizer _normalizer;
 
-  /**
-   * Constructor for backward compatibility.
-   * @param numPartitions Number of partitions.
-   */
-  public MurmurPartitionFunction(int numPartitions) {
-    this(numPartitions, null);
-  }
-
-  /**
-   * Constructor for the class.
-   * @param numPartitions Number of partitions.
-   * @param functionConfig to extract configurations for the partition 
function.
-   */
   public MurmurPartitionFunction(int numPartitions, @Nullable Map<String, 
String> functionConfig) {
     Preconditions.checkArgument(numPartitions > 0, "Number of partitions must 
be > 0");
     _numPartitions = numPartitions;
     _functionConfig = functionConfig != null ? 
Collections.unmodifiableMap(functionConfig) : null;
     _useRawBytes = functionConfig != null && 
Boolean.parseBoolean(functionConfig.get(USE_RAW_BYTES_KEY));
+    _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, 
DEFAULT_NORMALIZER);
   }
 
   @Override
   public int getPartition(String value) {
     byte[] bytes = _useRawBytes ? BytesUtils.toBytes(value) : 
value.getBytes(UTF_8);
-    return (MurmurHashFunctions.murmurHash2(bytes) & Integer.MAX_VALUE) % 
_numPartitions;
+    return _normalizer.getPartitionId(MurmurHashFunctions.murmurHash2(bytes), 
_numPartitions);
   }
 
   @Override
@@ -70,6 +65,11 @@ public class MurmurPartitionFunction implements 
PartitionFunction {
     return NAME;
   }
 
+  @Override
+  public List<String> getNames() {
+    return NAMES;
+  }
+
   @Override
   public int getNumPartitions() {
     return _numPartitions;
@@ -81,6 +81,11 @@ public class MurmurPartitionFunction implements 
PartitionFunction {
     return _functionConfig;
   }
 
+  @Override
+  public PartitionIdNormalizer getPartitionIdNormalizer() {
+    return _normalizer;
+  }
+
   // Keep it for backward-compatibility, use getName() instead
   @Override
   public String toString() {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/partition/function/PartitionFunctionConfigs.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/PartitionFunctionConfigs.java
new file mode 100644
index 00000000000..721025dd1d1
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/PartitionFunctionConfigs.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition.function;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
+
+
+/// Shared config-parsing helpers for built-in partition functions.
+final class PartitionFunctionConfigs {
+  /// Config key under which an explicit [PartitionIdNormalizer] can be 
selected.
+  static final String PARTITION_ID_NORMALIZER_KEY = "partitionIdNormalizer";
+
+  private PartitionFunctionConfigs() {
+  }
+
+  /// Reads [#PARTITION_ID_NORMALIZER_KEY] from the function config and 
resolves it to a
+  /// [PartitionIdNormalizer]. Returns `defaultNormalizer` when the config is 
absent or
+  /// the value is blank. Throws [IllegalArgumentException] on an unrecognized 
value.
+  static PartitionIdNormalizer normalizer(@Nullable Map<String, String> 
functionConfig,
+      PartitionIdNormalizer defaultNormalizer) {
+    if (functionConfig == null) {
+      return defaultNormalizer;
+    }
+    String raw = functionConfig.get(PARTITION_ID_NORMALIZER_KEY);
+    if (StringUtils.isBlank(raw)) {
+      return defaultNormalizer;
+    }
+    return PartitionIdNormalizer.fromConfigString(raw);
+  }
+}
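Note on PartitionFunctionConfigs above: the resolution order (null config,
then blank value, then explicit value) can be exercised without Pinot on the
classpath. The sketch below uses a stand-in enum, `SketchNormalizer`, whose
constants are the normalizer names that appear in this commit's tests. Its
case-insensitive `fromConfigString` is an assumption. The tests pass both
"abs" and "MASK", but the real parsing code is not shown in this diff.
`String#isBlank` (Java 11+) stands in for `StringUtils.isBlank`.

```java
import java.util.Locale;
import java.util.Map;

// Illustrative only: a stand-in for PartitionIdNormalizer. The parsing rule
// (trim, upper-case, valueOf) is assumed, not taken from Pinot.
enum SketchNormalizer {
  POSITIVE_MODULO, PRE_MODULO_ABS, ABS, MASK, NO_OP;

  static SketchNormalizer fromConfigString(String raw) {
    try {
      return valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Echo the offending value so the caller can see what was rejected.
      throw new IllegalArgumentException("Unknown partition id normalizer: '" + raw + "'");
    }
  }
}

// Illustrative only: mirrors the control flow of the normalizer(...) helper.
final class ConfigSketch {
  static final String KEY = "partitionIdNormalizer";

  private ConfigSketch() {
  }

  static SketchNormalizer normalizer(Map<String, String> functionConfig,
      SketchNormalizer defaultNormalizer) {
    if (functionConfig == null) {
      return defaultNormalizer;
    }
    String raw = functionConfig.get(KEY);
    if (raw == null || raw.isBlank()) {
      return defaultNormalizer;
    }
    return SketchNormalizer.fromConfigString(raw);
  }
}
```

Under these assumptions, a null map or a blank value falls back to the
per-function default, `Map.of("partitionIdNormalizer", "abs")` selects `ABS`,
and an unrecognized value such as "saramaCompat" raises an
IllegalArgumentException that mentions the value.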
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionFactoryTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionFactoryTest.java
new file mode 100644
index 00000000000..85cae7e8730
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionFactoryTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition.function;
+
+import java.util.Map;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Coverage for [PartitionFunctionFactory] dynamic registry behavior.
+public class PartitionFunctionFactoryTest {
+
+  @Test
+  public void testAllBuiltInFunctionsRegistered() {
+    // Resolves every built-in name. The test fails if the classpath subtype 
scan misses any impl
+    // or if its `getNames()` doesn't surface the expected canonical name.
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("Modulo", 4, null) 
instanceof ModuloPartitionFunction);
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("Murmur", 4, null) 
instanceof MurmurPartitionFunction);
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("Murmur2", 4, null) 
instanceof MurmurPartitionFunction);
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("Murmur3", 4, null) 
instanceof Murmur3PartitionFunction);
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("Fnv", 4, null) 
instanceof FnvPartitionFunction);
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("HashCode", 4, null) 
instanceof HashCodePartitionFunction);
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("ByteArray", 4, null) 
instanceof ByteArrayPartitionFunction);
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("BoundedColumnValue", 2,
+            Map.of("columnValues", "a", "columnValuesDelimiter", "|"))
+            instanceof BoundedColumnValuePartitionFunction);
+  }
+
+  @Test
+  public void testCaseInsensitiveLookup() {
+    // Names are matched after lower-casing. Both spellings resolve to the 
same impl class.
+    PartitionFunction lowerCase = 
PartitionFunctionFactory.getPartitionFunction("murmur3", 8, null);
+    PartitionFunction mixedCase = 
PartitionFunctionFactory.getPartitionFunction("MuRmUr3", 8, null);
+    assertEquals(lowerCase.getClass(), Murmur3PartitionFunction.class);
+    assertEquals(mixedCase.getClass(), Murmur3PartitionFunction.class);
+  }
+
+  @Test
+  public void testMurmurAndMurmur2AliasResolveToSameClass() {
+    // MurmurPartitionFunction overrides getNames() to return ["Murmur", 
"Murmur2"] so both
+    // names register against the same impl.
+    assertEquals(PartitionFunctionFactory.getPartitionFunction("Murmur", 4, 
null).getClass(),
+        PartitionFunctionFactory.getPartitionFunction("Murmur2", 4, 
null).getClass());
+  }
+
+  @Test
+  public void testGetNamesDefaultsToSingletonOfGetName() {
+    // The interface's default getNames() returns [getName()]. Verify a 
non-overriding impl
+    // surfaces a single-entry list whose only entry equals getName().
+    Murmur3PartitionFunction fn = new Murmur3PartitionFunction(4, null);
+    assertEquals(fn.getNames(), java.util.List.of(fn.getName()));
+  }
+
+  @Test
+  public void testMurmurOverridesGetNamesWithTwoAliases() {
+    // MurmurPartitionFunction is the only built-in that overrides getNames(); 
verify the
+    // override surfaces both aliases.
+    MurmurPartitionFunction fn = new MurmurPartitionFunction(4, null);
+    assertEquals(fn.getNames(), java.util.List.of("Murmur", "Murmur2"));
+  }
+
+  @Test
+  public void testUnknownNameThrows() {
+    IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+        () -> PartitionFunctionFactory.getPartitionFunction("DoesNotExist", 4, 
null));
+    assertTrue(e.getMessage().contains("DoesNotExist"));
+  }
+
+  @Test
+  public void testInitIsIdempotent() {
+    // Multiple components in the same JVM (e.g. controller + embedded broker 
in a quickstart) call
+    // init() independently. Repeated calls must not blow up.
+    PartitionFunctionFactory.init();
+    PartitionFunctionFactory.init();
+    PartitionFunctionFactory.init();
+    assertTrue(
+        PartitionFunctionFactory.getPartitionFunction("Modulo", 4, null) 
instanceof ModuloPartitionFunction);
+  }
+
+  @Test
+  public void testGetPartitionIdNormalizerPerImpl() {
+    // Locks the descriptive normalizer label that each built-in impl reports.
+    assertEquals(new ModuloPartitionFunction(4, 
null).getPartitionIdNormalizer(),
+        PartitionIdNormalizer.POSITIVE_MODULO);
+    assertEquals(new MurmurPartitionFunction(4, 
null).getPartitionIdNormalizer(),
+        PartitionIdNormalizer.MASK);
+    assertEquals(new Murmur3PartitionFunction(4, 
null).getPartitionIdNormalizer(),
+        PartitionIdNormalizer.MASK);
+    assertEquals(new HashCodePartitionFunction(4, 
null).getPartitionIdNormalizer(),
+        PartitionIdNormalizer.PRE_MODULO_ABS);
+    assertEquals(new ByteArrayPartitionFunction(4, 
null).getPartitionIdNormalizer(),
+        PartitionIdNormalizer.PRE_MODULO_ABS);
+    // FNV defaults to MASK; any normalizer is selectable through the 
partitionIdNormalizer config.
+    assertEquals(new FnvPartitionFunction(4, null).getPartitionIdNormalizer(),
+        PartitionIdNormalizer.MASK);
+    assertEquals(new FnvPartitionFunction(4, Map.of("partitionIdNormalizer", 
"abs")).getPartitionIdNormalizer(),
+        PartitionIdNormalizer.ABS);
+    // BoundedColumnValue's output is already in [0, N); reports NO_OP 
(identity).
+    PartitionFunction boundedColumnValue = new 
BoundedColumnValuePartitionFunction(2,
+        Map.of("columnValues", "a", "columnValuesDelimiter", "|"));
+    assertEquals(boundedColumnValue.getPartitionIdNormalizer(), 
PartitionIdNormalizer.NO_OP);
+  }
+
+  @Test
+  public void testPartitionIdNormalizerConfigOverridesDefaultAcrossImpls() {
+    // Every impl exposes the same `partitionIdNormalizer` config key. Verify 
that overriding the
+    // default rewires the actual partition-id computation (not just the 
reported label).
+    Map<String, String> mask = Map.of("partitionIdNormalizer", "MASK");
+
+    // HashCode: configured normalizer drives the output. Pick a value whose 
hashCode is negative
+    // (sweep until found) so PRE_MODULO_ABS vs MASK produces observably 
different partition ids.
+    String negativeHashValue = null;
+    int negativeHash = 0;
+    for (int i = 0; i < 1000 && negativeHashValue == null; i++) {
+      String candidate = "value-" + i;
+      if (candidate.hashCode() < 0) {
+        negativeHashValue = candidate;
+        negativeHash = candidate.hashCode();
+      }
+    }
+    assertTrue(negativeHashValue != null, "Failed to find a string with a 
negative hashCode in the search range");
+    assertEquals(new HashCodePartitionFunction(8, 
null).getPartition(negativeHashValue),
+        PartitionIdNormalizer.PRE_MODULO_ABS.getPartitionId(negativeHash, 8));
+    assertEquals(new HashCodePartitionFunction(8, 
mask).getPartition(negativeHashValue),
+        PartitionIdNormalizer.MASK.getPartitionId(negativeHash, 8));
+
+    // Modulo: explicit MASK on a negative input differs from the default 
POSITIVE_MODULO output.
+    long signedValue = -10L;
+    int posMod = new ModuloPartitionFunction(7, 
null).getPartition(Long.toString(signedValue));
+    int maskMod = new ModuloPartitionFunction(7, 
mask).getPartition(Long.toString(signedValue));
+    assertEquals(posMod, 
PartitionIdNormalizer.POSITIVE_MODULO.getPartitionId(signedValue, 7));
+    assertEquals(maskMod, 
PartitionIdNormalizer.MASK.getPartitionId(signedValue, 7));
+
+    // ByteArray: PRE_MODULO_ABS default; verify the override label 
round-trips on the SPI.
+    PartitionFunction byteArrayWithKafkaAbs = new ByteArrayPartitionFunction(4,
+        Map.of("partitionIdNormalizer", "PRE_MODULO_ABS"));
+    assertEquals(byteArrayWithKafkaAbs.getPartitionIdNormalizer(), 
PartitionIdNormalizer.PRE_MODULO_ABS);
+  }
+}
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionTest.java
similarity index 98%
rename from 
pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
rename to 
pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionTest.java
index aa4537a23ac..3c852f7587c 100644
--- 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionTest.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.hash.FnvHashFunctions;
@@ -443,9 +445,10 @@ public class PartitionFunctionTest {
     assertEquals(partitionFunction.getNumPartitions(), numPartitions);
 
     JsonNode jsonNode = JsonUtils.objectToJsonNode(partitionFunction);
-    assertEquals(jsonNode.size(), 3);
+    assertEquals(jsonNode.size(), 4);
     assertEquals(jsonNode.get("name").asText().toLowerCase(), 
functionName.toLowerCase());
     assertEquals(jsonNode.get("numPartitions").asInt(), numPartitions);
+    assertTrue(jsonNode.has("partitionIdNormalizer"));
 
     JsonNode functionConfigNode = jsonNode.get("functionConfig");
     if (functionConfig == null) {
@@ -502,7 +505,7 @@ public class PartitionFunctionTest {
 
     // initialized {@link MurmurPartitionFunction} with 5 partitions
     int numPartitions = 5;
-    MurmurPartitionFunction murmurPartitionFunction = new 
MurmurPartitionFunction(numPartitions);
+    MurmurPartitionFunction murmurPartitionFunction = new 
MurmurPartitionFunction(numPartitions, null);
 
     // generate the same 10 String values
     // Apply the partition function and compare with stored results
@@ -618,7 +621,7 @@ public class PartitionFunctionTest {
     assertEquals(new FnvPartitionFunction(numPartitions, 
null).getPartition(value), javaCompatiblePartition);
 
     Map<String, String> functionConfig = new HashMap<>();
-    functionConfig.put("negativePartitionHandling", "abs");
+    functionConfig.put("partitionIdNormalizer", "abs");
     assertEquals(new FnvPartitionFunction(numPartitions, 
functionConfig).getPartition(value), saramaCompatPartition);
   }
 
@@ -636,19 +639,19 @@ public class PartitionFunctionTest {
     functionConfig.put("variant", "fnv1a_64");
     assertEquals(new FnvPartitionFunction(numPartitions, 
functionConfig).getPartition(value), javaCompatiblePartition);
 
-    functionConfig.put("negativePartitionHandling", "abs");
+    functionConfig.put("partitionIdNormalizer", "abs");
     assertEquals(new FnvPartitionFunction(numPartitions, 
functionConfig).getPartition(value), saramaCompatPartition);
   }
 
   @Test
   public void 
testFnvPartitionFunctionRejectsInvalidNegativePartitionHandling() {
     Map<String, String> functionConfig = new HashMap<>();
-    functionConfig.put("negativePartitionHandling", "saramaCompat");
+    functionConfig.put("partitionIdNormalizer", "saramaCompat");
 
     IllegalArgumentException exception =
         expectThrows(IllegalArgumentException.class, () -> new 
FnvPartitionFunction(4, functionConfig));
-    assertEquals(exception.getMessage(),
-        "FNV negative partition handling must be mask or abs, but was: 
'saramaCompat'");
+    assertTrue(exception.getMessage().contains("saramaCompat"),
+        "Expected error to mention the offending value, was: " + 
exception.getMessage());
   }
 
   @Test
@@ -768,7 +771,7 @@ public class PartitionFunctionTest {
 
     // initialized {@link ByteArrayPartitionFunction} with 5 partitions
     int numPartitions = 5;
-    ByteArrayPartitionFunction byteArrayPartitionFunction = new 
ByteArrayPartitionFunction(numPartitions);
+    ByteArrayPartitionFunction byteArrayPartitionFunction = new 
ByteArrayPartitionFunction(numPartitions, null);
 
     // generate the same 10 String values
     // Apply the partition function and compare with stored results
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index f266b490666..51807d8de87 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -145,6 +145,7 @@ import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.config.instance.InstanceConfigValidatorRegistry;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableConfigValidatorRegistry;
@@ -287,9 +288,11 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       _helixResourceManager = null;
       _executorService = null;
     } else {
-      // Initialize FunctionRegistry before starting the admin application (PinotQueryResource requires it to compile
-      // queries)
+      // Initialize FunctionRegistry and PartitionFunctionFactory before starting the admin application
+      // (PinotQueryResource requires the function registry to compile queries; the partition factory must be
+      // populated before any segment metadata is read)
       FunctionRegistry.init();
+      PartitionFunctionFactory.init();
       _adminApp = createControllerAdminApp();
       // This executor service is used to do async tasks from multiget util or 
table rebalancing.
       _executorService = 
createExecutorService(_config.getControllerExecutorNumThreads(), 
"async-task-thread-%d");
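
The empty `PartitionFunctionFactory.init()` call wired in above relies on Java's class-initialization rule: invoking any static method of a class runs its static initializer first, and only once. A minimal sketch of that idiom follows. The names `EagerInitSketch`, `Registry` and `EVENTS` are hypothetical stand-ins, not Pinot classes.

```java
import java.util.ArrayList;
import java.util.List;

final class EagerInitSketch {
  // Records side effects so the initialization order is observable.
  static final List<String> EVENTS = new ArrayList<>();

  // Hypothetical stand-in for a registry whose static block does expensive work.
  static final class Registry {
    static {
      EVENTS.add("scan");
    }

    private Registry() {
    }

    // Empty on purpose: calling a static method triggers class initialization.
    static void init() {
    }
  }

  public static void main(String[] args) {
    EVENTS.add("startup");
    Registry.init(); // the static block runs here, exactly once
    Registry.init(); // class is already initialized, so no second scan
    EVENTS.add("serve");
    System.out.println(EVENTS); // prints [startup, scan, serve]
  }
}
```

Calling `init()` from a service starter moves the one-time cost to a predictable point in startup rather than to the first lookup.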
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index a434327e291..99b5b23b7db 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -23,10 +23,10 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.partition.function.MurmurPartitionFunction;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.partition.MurmurPartitionFunction;
 import org.joda.time.Interval;
 import org.mockito.Mockito;
 
@@ -89,7 +89,7 @@ public class SegmentMetadataMockUtils {
     ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
     Set<Integer> partitions = Collections.singleton(partitionNumber);
     when(columnMetadata.getPartitions()).thenReturn(partitions);
-    when(columnMetadata.getPartitionFunction()).thenReturn(new 
MurmurPartitionFunction(5));
+    when(columnMetadata.getPartitionFunction()).thenReturn(new 
MurmurPartitionFunction(5, null));
 
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     if (columnName != null) {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
index 400788c1b2c..65f546dc961 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.segment.local.indexsegment.mutable;
 
 import java.io.IOException;
-import org.apache.pinot.segment.spi.partition.ModuloPartitionFunction;
+import org.apache.pinot.common.partition.function.ModuloPartitionFunction;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -57,7 +57,7 @@ public class 
MutableSegmentImplDropRecordOnPartitionMismatchTest {
   public void testRecordsMatchingPartitionAreIndexed()
       throws IOException {
     _segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, 
PARTITION_COLUMN,
-        new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+        new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, 
true);
 
     // memberId values 0, 4, 8 all map to partition 0
     indexRow(_segment, 0, 100);
@@ -71,7 +71,7 @@ public class 
MutableSegmentImplDropRecordOnPartitionMismatchTest {
   public void testRecordsMismatchingPartitionAreDropped()
       throws IOException {
     _segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, 
PARTITION_COLUMN,
-        new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+        new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, 
true);
 
     // memberId values 1, 2, 3 map to partitions 1, 2, 3 — all mismatches
     indexRow(_segment, 1, 200);
@@ -85,7 +85,7 @@ public class 
MutableSegmentImplDropRecordOnPartitionMismatchTest {
   public void testMixedPartitionsOnlyIndexMatchingRecords()
       throws IOException {
     _segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, 
PARTITION_COLUMN,
-        new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+        new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, 
true);
 
     indexRow(_segment, 0, 100);  // partition 0 — indexed
     indexRow(_segment, 1, 101);  // partition 1 — dropped
@@ -99,7 +99,7 @@ public class 
MutableSegmentImplDropRecordOnPartitionMismatchTest {
   public void testConfigDisabledIndexesAllRecordsRegardlessOfPartition()
       throws IOException {
     _segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, 
PARTITION_COLUMN,
-        new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, false);
+        new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, 
false);
 
     indexRow(_segment, 0, 100);  // partition 0
     indexRow(_segment, 1, 101);  // partition 1
@@ -113,7 +113,7 @@ public class 
MutableSegmentImplDropRecordOnPartitionMismatchTest {
   public void testNullPartitionColumnValueThrowsException()
       throws IOException {
     _segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, 
PARTITION_COLUMN,
-        new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+        new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, 
true);
 
     GenericRow row = new GenericRow();
     row.putValue(PARTITION_COLUMN, null);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
index b00c69b87b4..66c87da4d6e 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
@@ -30,6 +30,7 @@ import java.util.stream.Stream;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.common.partition.function.BoundedColumnValuePartitionFunction;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -42,7 +43,6 @@ import org.apache.pinot.segment.spi.index.IndexService;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import 
org.apache.pinot.segment.spi.partition.BoundedColumnValuePartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
index 715e2af45b3..8f318e9b071 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.partition.function.ModuloPartitionFunction;
 import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -36,7 +37,6 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.partition.ModuloPartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
index 3e6d26146d6..c563a4ee3d7 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
@@ -18,43 +18,59 @@
  */
 package org.apache.pinot.segment.spi.partition;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 
 
-/**
- * Interface for partition function.
- *
- * Implementations of this interface are assumed not to be stateful.
- * That is, two invocations of {@code PartitionFunction.getPartition(value)}
- * with the same value are expected to produce the same result.
- */
+/// Interface for partition function.
+///
+/// Implementations of this interface are assumed not to be stateful. That is, two invocations of
+/// `PartitionFunction.getPartition(value)` with the same value are expected to produce the same
+/// result. Implementations must also be safe for concurrent invocation by multiple threads.
 public interface PartitionFunction extends Serializable {
 
-  /**
-   * Method to compute and return partition id for the given value.
-   * NOTE: The value is expected to be a string representation of the actual value.
-   *
-   * @param value Value for which to determine the partition id.
-   * @return partition id for the value.
-   */
+  /// Method to compute and return partition id for the given value.
+  /// NOTE: The value is expected to be a string representation of the actual value.
+  ///
+  /// @param value Value for which to determine the partition id.
+  /// @return partition id for the value.
   int getPartition(String value);
 
-  /**
-   * Returns the name of the partition function.
-   * @return Name of the partition function.
-   */
+  /// Returns the canonical name of the partition function.
+  ///
+  /// @return Name of the partition function.
   String getName();
 
-  /**
-   * Returns the total number of possible partitions.
-   * @return Number of possible partitions.
-   */
+  /// Returns every name (canonical + aliases) under which this partition function should be
+  /// registered with `PartitionFunctionFactory`. Defaults to a single-entry list containing
+  /// [#getName()]. Override only when you want additional aliases — e.g.
+  /// `MurmurPartitionFunction` registers under both `Murmur` and `Murmur2`.
+  @JsonIgnore
+  default List<String> getNames() {
+    return Collections.singletonList(getName());
+  }
+
+  /// Returns the total number of possible partitions.
+  ///
+  /// @return Number of possible partitions.
   int getNumPartitions();
 
   @Nullable
   default Map<String, String> getFunctionConfig() {
     return null;
   }
+
+  /// Reports the [PartitionIdNormalizer] that describes this partition function's int-to-id
+  /// mapping. The framework uses this for identity / staleness matching between config-side and
+  /// segment-side partition metadata, and (for the built-in implementations) as the actual driver
+  /// of the int-to-id computation.
+  ///
+  /// Each implementation must declare its own value — there is intentionally 
no default. Plug-ins
+  /// whose output is already in `[0, numPartitions)` (e.g. lookup-style 
functions) should return
+  /// [PartitionIdNormalizer#POSITIVE_MODULO] (a no-op label).
+  PartitionIdNormalizer getPartitionIdNormalizer();
 }
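
The `getNames()` default added above can be illustrated with a trimmed-down stand-in. This is only a sketch: `NamedFunction`, `ModuloLike` and `MurmurLike` are hypothetical and carry none of the real interface's other methods.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class AliasNamesSketch {
  // Hypothetical stand-in for the naming part of the PartitionFunction interface.
  interface NamedFunction {
    String getName();

    // Default: the canonical name is the only registered name.
    default List<String> getNames() {
      return Collections.singletonList(getName());
    }
  }

  // Relies on the default, so it registers under a single name.
  static final class ModuloLike implements NamedFunction {
    @Override
    public String getName() {
      return "Modulo";
    }
  }

  // Overrides getNames() to add an alias next to the canonical name.
  static final class MurmurLike implements NamedFunction {
    @Override
    public String getName() {
      return "Murmur";
    }

    @Override
    public List<String> getNames() {
      return Arrays.asList("Murmur", "Murmur2");
    }
  }

  public static void main(String[] args) {
    System.out.println(new ModuloLike().getNames()); // prints [Modulo]
    System.out.println(new MurmurLike().getNames()); // prints [Murmur, Murmur2]
  }
}
```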
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
index 43890a614c1..af25b5976d1 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
@@ -18,86 +18,147 @@
  */
 package org.apache.pinot.segment.spi.partition;
 
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-
-
-/**
- * Factory to build instances of {@link PartitionFunction}.
- */
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Dynamic registry for [PartitionFunction] implementations.
+///
+/// Discovery walks every public, concrete [PartitionFunction] subtype on the 
classpath under the
+/// `org.apache.pinot.*` package tree and registers each under the names 
returned by
+/// [PartitionFunction#getNames()] (defaults to `[getName()]`; override to 
declare aliases — e.g.
+/// `MurmurPartitionFunction` registers under both `Murmur` and `Murmur2`).
+///
+/// Each registrable class must be public, concrete, and expose a public 
constructor with signature
+/// `(int numPartitions, Map<String, String> functionConfig)` — the 
constructor used both for the
+/// startup `getNames()` probe (called with `(1, null)`) and for 
[#getPartitionFunction(String, int,
+/// Map)] at lookup time.
+///
+/// The static block scans the classpath once and builds an immutable (canonicalized name →
+/// constructor) map. To force eager initialization (e.g. so the scan happens before the first
+/// segment is read), call [#init()] from broker / server / controller startup.
 public class PartitionFunctionFactory {
-  // Enum for various partition functions to be added.
-  public enum PartitionFunctionType {
-    Modulo, Murmur, Murmur2, Murmur3, Fnv, ByteArray, HashCode, 
BoundedColumnValue;
-    // Add more functions here.
+  private PartitionFunctionFactory() {
+  }
 
-    private static final Map<String, PartitionFunctionType> VALUE_MAP = new 
HashMap<>();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionFunctionFactory.class);
+  private static final String SCAN_PACKAGE = "org.apache.pinot";
 
-    static {
-      for (PartitionFunctionType functionType : 
PartitionFunctionType.values()) {
-        VALUE_MAP.put(functionType.name().toLowerCase(), functionType);
+  private static final Map<String, Constructor<? extends PartitionFunction>> 
REGISTRY;
+
+  static {
+    long startTimeMs = System.currentTimeMillis();
+    Map<String, Constructor<? extends PartitionFunction>> registry = new 
HashMap<>();
+    Set<Class<? extends PartitionFunction>> subtypes = scanSubtypes();
+    for (Class<? extends PartitionFunction> clazz : subtypes) {
+      int mods = clazz.getModifiers();
+      if (!Modifier.isPublic(mods) || Modifier.isAbstract(mods) || 
clazz.isInterface()) {
+        continue;
+      }
+      Constructor<? extends PartitionFunction> constructor;
+      try {
+        constructor = clazz.getConstructor(int.class, Map.class);
+      } catch (NoSuchMethodException e) {
+        LOGGER.warn("Skipping {}: missing public constructor (int, Map<String, String>)", clazz.getName());
+        continue;
+      }
+      List<String> names = probeNames(clazz, constructor);
+      if (names == null) {
+        continue;
+      }
+      for (String name : names) {
+        if (name == null || name.trim().isEmpty()) {
+          LOGGER.warn("Skipping blank name for {}", clazz.getName());
+          continue;
+        }
+        String canonical = canonicalize(name.trim());
+        Constructor<? extends PartitionFunction> existing = registry.put(canonical, constructor);
+        Preconditions.checkState(existing == null || existing.getDeclaringClass().equals(clazz),
+            "Partition function name '%s' is registered to both %s and %s", name,
+            existing == null ? null : existing.getDeclaringClass().getName(), clazz.getName());
       }
     }
+    REGISTRY = Collections.unmodifiableMap(registry);
+    LOGGER.info("Initialized PartitionFunctionFactory with {} functions: {} in {}ms", REGISTRY.size(),
+        REGISTRY.keySet(), System.currentTimeMillis() - startTimeMs);
+  }
 
-    public static PartitionFunctionType fromString(String name) {
-      PartitionFunctionType functionType = VALUE_MAP.get(name.toLowerCase());
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static Set<Class<? extends PartitionFunction>> scanSubtypes() {
+    final Set<?>[] result = new Set<?>[1];
+    PinotReflectionUtils.runWithLock(() ->
+        result[0] = new Reflections(new ConfigurationBuilder()
+            .setUrls(ClasspathHelper.forPackage(SCAN_PACKAGE))
+            .setScanners(new SubTypesScanner()))
+            .getSubTypesOf(PartitionFunction.class));
+    return (Set) result[0];
+  }
 
-      if (functionType == null) {
-        throw new IllegalArgumentException("No enum constant for: " + name);
+  /// Instantiates `clazz` with `(numPartitions = 1, functionConfig = null)` 
and returns the
+  /// names list reported by [PartitionFunction#getNames()]. Returns `null` 
(with a warning log)
+  /// when the probe construction fails — typically a function whose ctor 
requires non-null config
+  /// (e.g. `BoundedColumnValuePartitionFunction`); such functions need to 
either supply a usable
+  /// default config or skip auto-registration.
+  @Nullable
+  private static List<String> probeNames(Class<? extends PartitionFunction> 
clazz,
+      Constructor<? extends PartitionFunction> constructor) {
+    try {
+      PartitionFunction probe = constructor.newInstance(1, null);
+      List<String> names = probe.getNames();
+      if (names == null || names.isEmpty()) {
+        LOGGER.warn("Skipping {}: getNames() returned null/empty", 
clazz.getName());
+        return null;
       }
-      return functionType;
+      return names;
+    } catch (ReflectiveOperationException e) {
+      LOGGER.warn("Skipping {}: probing getNames() with (1, null) failed: {}", 
clazz.getName(),
+          e.getCause() != null ? e.getCause().getMessage() : e.getMessage());
+      return null;
     }
   }
 
-  /**
-   * Private constructor so that the class cannot be instantiated.
-   */
-  private PartitionFunctionFactory() {
+  /// No-op call that exists to force the static initializer of this class to run. Mirrors
+  /// `FunctionRegistry.init()` so callers can eagerly trigger classpath scanning during
+  /// service startup instead of paying the cost on the first partition function lookup.
+  public static void init() {
   }
 
-  /**
-   * This method generates and returns a partition function based on the 
provided string.
-   *
-   * @param functionName Name of partition function
-   * @param numPartitions Number of partitions.
-   * @param functionConfig The function configuration for given function.
-   * @return Partition function
-   */
-  // TODO: introduce a way to inject custom partition function
-  // a custom partition function could be used in the realtime stream 
partitioning or offline segment partitioning.
-  // The PartitionFunctionFactory should be able to support these default 
implementations, as well as instantiate
-  // based on config
+  /// Builds an instance of the partition function registered under 
`functionName`.
+  ///
+  /// @param functionName matched case-insensitively
+  /// @param numPartitions positive partition count
+  /// @param functionConfig optional, function-specific configuration; may be 
`null`
   public static PartitionFunction getPartitionFunction(String functionName, 
int numPartitions,
       @Nullable Map<String, String> functionConfig) {
-    PartitionFunctionType function = 
PartitionFunctionType.fromString(functionName);
-    switch (function) {
-      case Modulo:
-        return new ModuloPartitionFunction(numPartitions);
-
-      case Murmur:
-      case Murmur2:
-        return new MurmurPartitionFunction(numPartitions, functionConfig);
-
-      case Murmur3:
-        return new Murmur3PartitionFunction(numPartitions, functionConfig);
-
-      case Fnv:
-        return new FnvPartitionFunction(numPartitions, functionConfig);
-
-      case ByteArray:
-        return new ByteArrayPartitionFunction(numPartitions);
-
-      case HashCode:
-        return new HashCodePartitionFunction(numPartitions);
-
-      case BoundedColumnValue:
-        return new BoundedColumnValuePartitionFunction(numPartitions, 
functionConfig);
-
-      default:
-        throw new IllegalArgumentException("Illegal partition function name: " 
+ functionName);
+    Constructor<? extends PartitionFunction> constructor = REGISTRY.get(canonicalize(functionName));
+    Preconditions.checkArgument(constructor != null, "No partition function registered for name: %s", functionName);
+    try {
+      return constructor.newInstance(numPartitions, functionConfig);
+    } catch (ReflectiveOperationException e) {
+      Throwable cause = e.getCause() != null ? e.getCause() : e;
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      }
+      throw new IllegalStateException(
+          "Failed to instantiate partition function '" + functionName + "' with " + numPartitions + " partitions",
+          cause);
     }
   }
 
@@ -108,4 +169,8 @@ public class PartitionFunctionFactory {
   public static PartitionFunction getPartitionFunction(ColumnPartitionMetadata 
metadata) {
     return getPartitionFunction(metadata.getFunctionName(), 
metadata.getNumPartitions(), metadata.getFunctionConfig());
   }
+
+  private static String canonicalize(String name) {
+    return name.toLowerCase(Locale.ROOT);
+  }
 }
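
The lookup path in the factory above (case-insensitive name, cached `(int, Map)` constructor, unwrapping of the reflective exception) can be sketched without Pinot or the Reflections library. In this sketch, explicit `register(...)` calls stand in for classpath scanning, and `Fn`, `ModuloFn`, `register`, `create` and the `Mod` alias are all hypothetical names.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class RegistrySketch {
  // Hypothetical stand-in for PartitionFunction; only what the lookup needs.
  public interface Fn {
    int getPartition(String value);
  }

  // Hypothetical implementation exposing the (int, Map) constructor shape.
  public static final class ModuloFn implements Fn {
    private final int _numPartitions;

    public ModuloFn(int numPartitions, Map<String, String> functionConfig) {
      _numPartitions = numPartitions;
    }

    @Override
    public int getPartition(String value) {
      return (int) Math.floorMod(Long.parseLong(value), (long) _numPartitions);
    }
  }

  private static final Map<String, Constructor<? extends Fn>> REGISTRY = new HashMap<>();

  static {
    try {
      register("Modulo", ModuloFn.class);
      register("Mod", ModuloFn.class); // hypothetical alias for the same class
    } catch (NoSuchMethodException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  // Stores the constructor under a lower-cased key; rejects a name owned by another class.
  static void register(String name, Class<? extends Fn> clazz) throws NoSuchMethodException {
    Constructor<? extends Fn> ctor = clazz.getConstructor(int.class, Map.class);
    Constructor<? extends Fn> existing = REGISTRY.putIfAbsent(name.toLowerCase(Locale.ROOT), ctor);
    if (existing != null && !existing.getDeclaringClass().equals(clazz)) {
      throw new IllegalStateException("Name '" + name + "' is already registered to "
          + existing.getDeclaringClass().getName());
    }
  }

  // Looks up the constructor case-insensitively and rethrows constructor failures unwrapped.
  static Fn create(String name, int numPartitions, Map<String, String> config) {
    Constructor<? extends Fn> ctor = REGISTRY.get(name.toLowerCase(Locale.ROOT));
    if (ctor == null) {
      throw new IllegalArgumentException("No partition function registered for name: " + name);
    }
    try {
      return ctor.newInstance(numPartitions, config);
    } catch (ReflectiveOperationException e) {
      Throwable cause = e.getCause() != null ? e.getCause() : e;
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new IllegalStateException("Failed to instantiate " + name, cause);
    }
  }

  public static void main(String[] args) {
    System.out.println(create("MODULO", 4, null).getPartition("-1")); // prints 3
  }
}
```

Caching the `Constructor` rather than the `Class` keeps the per-lookup work to one map read and one `newInstance` call.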
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizer.java
new file mode 100644
index 00000000000..8f5c4fed9a4
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+
+
+/// Maps a raw signed integer hash output to a non-negative partition id in `[0, numPartitions)`.
+///
+/// [PartitionFunction] implementations apply the configured normalizer in their
+/// `getPartition(...)` body and report it via [PartitionFunction#getPartitionIdNormalizer()].
+/// The framework also uses the reported value for identity / staleness matching between
+/// config-side and segment-side partition metadata.
+public enum PartitionIdNormalizer {
+  /// Compute the remainder, then shift negative remainders into the valid range with `+ numPartitions`.
+  POSITIVE_MODULO {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int partition = value % numPartitions;
+      return partition < 0 ? partition + numPartitions : partition;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long partition = value % numPartitions;
+      return (int) (partition < 0 ? partition + numPartitions : partition);
+    }
+  },
+  /// Compute the remainder, then take its absolute value.
+  ABS {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int partition = value % numPartitions;
+      return partition < 0 ? -partition : partition;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long partition = value % numPartitions;
+      return (int) (partition < 0 ? -partition : partition);
+    }
+  },
+  /// Mask the sign bit before applying modulo.
+  MASK {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      return (value & Integer.MAX_VALUE) % numPartitions;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      return (int) ((value & Long.MAX_VALUE) % numPartitions);
+    }
+  },
+  /// Pre-modulo abs (Kafka-style) `abs(value) % numPartitions` that handles `Integer.MIN_VALUE -> 0`
+  /// (and `Long.MIN_VALUE -> 0`) to avoid the `Math.abs` overflow corner. Matches the
+  /// legacy semantics of `HashCodePartitionFunction` and `ByteArrayPartitionFunction`.
+  PRE_MODULO_ABS {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int abs = (value == Integer.MIN_VALUE) ? 0 : Math.abs(value);
+      return abs % numPartitions;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long abs = (value == Long.MIN_VALUE) ? 0L : Math.abs(value);
+      return (int) (abs % numPartitions);
+    }
+  },
+  /// Identity. Returns the input unchanged (narrowed to `int` for the long 
overload). Use only
+  /// when the upstream `PartitionFunction#getPartition` value is already 
guaranteed to be in
+  /// `[0, numPartitions)` — e.g. lookup-style functions like 
`BoundedColumnValuePartitionFunction`.
+  /// The framework does NOT validate that the input is in range; passing an 
out-of-range value
+  /// yields an out-of-range partition id.
+  NO_OP {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      return value;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      return (int) value;
+    }
+  };
+
+  public final int getPartitionId(int value, int numPartitions) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+    return toPartitionId(value, numPartitions);
+  }
+
+  public final int getPartitionId(long value, int numPartitions) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+    return toPartitionId(value, numPartitions);
+  }
+
+  public static PartitionIdNormalizer fromConfigString(String partitionIdNormalizer) {
+    Preconditions.checkArgument(partitionIdNormalizer != null && !partitionIdNormalizer.trim().isEmpty(),
+        "'partitionIdNormalizer' must not be blank");
+    try {
+      return valueOf(partitionIdNormalizer.trim().toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Unsupported partitionIdNormalizer: " + partitionIdNormalizer, e);
+    }
+  }
+
+  abstract int toPartitionId(int value, int numPartitions);
+
+  abstract int toPartitionId(long value, int numPartitions);
+}
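
The four range-folding strategies in the enum above differ only on negative inputs. The sketch below restates the `int` arithmetic from the diff as plain static methods and evaluates two inputs, `-10` and `Integer.MIN_VALUE`, with 7 partitions. `NormalizerSketch` and its method names are hypothetical.

```java
final class NormalizerSketch {
  // POSITIVE_MODULO: remainder, shifted up by n when negative.
  static int positiveModulo(int value, int n) {
    int r = value % n;
    return r < 0 ? r + n : r;
  }

  // ABS: absolute value of the remainder.
  static int abs(int value, int n) {
    int r = value % n;
    return r < 0 ? -r : r;
  }

  // MASK: clear the sign bit, then take the remainder.
  static int mask(int value, int n) {
    return (value & Integer.MAX_VALUE) % n;
  }

  // PRE_MODULO_ABS: absolute value first, mapping MIN_VALUE to 0 to dodge Math.abs overflow.
  static int preModuloAbs(int value, int n) {
    int a = (value == Integer.MIN_VALUE) ? 0 : Math.abs(value);
    return a % n;
  }

  public static void main(String[] args) {
    int n = 7;
    for (int v : new int[] {-10, Integer.MIN_VALUE}) {
      System.out.println(v + " -> " + positiveModulo(v, n) + " " + abs(v, n) + " "
          + mask(v, n) + " " + preModuloAbs(v, n));
    }
    // prints:
    // -10 -> 4 3 6 3
    // -2147483648 -> 5 2 0 0
  }
}
```

`ABS` and `PRE_MODULO_ABS` agree on `-10` and part ways only at `Integer.MIN_VALUE`, which is why a function has to report the normalizer that matches its legacy behavior.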
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizerTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizerTest.java
new file mode 100644
index 00000000000..b93af8b93a9
--- /dev/null
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizerTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+public class PartitionIdNormalizerTest {
+
+  @Test
+  public void testPositiveModulo() {
+    PartitionIdNormalizer n = PartitionIdNormalizer.POSITIVE_MODULO;
+    assertEquals(n.getPartitionId(0, 7), 0);
+    assertEquals(n.getPartitionId(10, 7), 3);
+    assertEquals(n.getPartitionId(-1, 7), 6);
+    assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), 
shiftedMod(Integer.MIN_VALUE, 7));
+    assertEquals(n.getPartitionId(Integer.MAX_VALUE, 1024), Integer.MAX_VALUE 
% 1024);
+    assertEquals(n.getPartitionId(7L, 7), 0);
+    assertEquals(n.getPartitionId(Long.MIN_VALUE, 7), 
shiftedMod(Long.MIN_VALUE, 7));
+  }
+
+  @Test
+  public void testAbs() {
+    PartitionIdNormalizer n = PartitionIdNormalizer.ABS;
+    assertEquals(n.getPartitionId(10, 7), 3);
+    assertEquals(n.getPartitionId(-10, 7), 3);
+    assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), 
Math.abs(Integer.MIN_VALUE % 7));
+    assertEquals(n.getPartitionId(-10L, 7), 3);
+  }
+
+  @Test
+  public void testMask() {
+    PartitionIdNormalizer n = PartitionIdNormalizer.MASK;
+    assertEquals(n.getPartitionId(0, 7), 0);
+    assertEquals(n.getPartitionId(-1, 7), Integer.MAX_VALUE % 7);
+    assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), 0);
+    assertEquals(n.getPartitionId(Long.MIN_VALUE, 7), 0);
+  }
+
+  @Test
+  public void testPreModuloAbs() {
+    PartitionIdNormalizer n = PartitionIdNormalizer.PRE_MODULO_ABS;
+    // Plain positive / negative: matches abs(hash) % N.
+    assertEquals(n.getPartitionId(10, 7), 10 % 7);
+    assertEquals(n.getPartitionId(-10, 7), 10 % 7);
+    // The defining corner case: Math.abs(MIN_VALUE) overflows back to MIN_VALUE, so PRE_MODULO_ABS
+    // patches that to 0 before the modulo.
+    assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), 0);
+    assertEquals(n.getPartitionId(Long.MIN_VALUE, 7), 0);
+    // For non-MIN_VALUE inputs, PRE_MODULO_ABS and (Math.abs(value) % N) agree.
+    assertEquals(n.getPartitionId(Integer.MAX_VALUE, 1024), Integer.MAX_VALUE % 1024);
+    assertEquals(n.getPartitionId(-1234L, 17), 1234 % 17);
+  }
+
+  @Test
+  public void testNoOpIsIdentity() {
+    PartitionIdNormalizer n = PartitionIdNormalizer.NO_OP;
+    // Identity for every input — the caller is asserting the value is already in [0, N).
+    assertEquals(n.getPartitionId(0, 7), 0);
+    assertEquals(n.getPartitionId(3, 7), 3);
+    // The framework does NOT validate the input is in range; passing out-of-range is the
+    // caller's responsibility.
+    assertEquals(n.getPartitionId(-5, 7), -5);
+    assertEquals(n.getPartitionId(99, 7), 99);
+    // long overload narrows to int.
+    assertEquals(n.getPartitionId(42L, 7), 42);
+    assertEquals(n.getPartitionId((long) Integer.MAX_VALUE, 1024), Integer.MAX_VALUE);
+  }
+
+  @Test
+  public void testRangeFoldingNormalizersReturnInRange() {
+    int[] partitionCounts = {1, 2, 7, 1024};
+    int[] hashes = {0, 1, -1, 7, -7, Integer.MIN_VALUE, Integer.MAX_VALUE, 13, -13};
+    // NO_OP is excluded — it's identity and only safe when the caller already produced an
+    // in-range value. The other four normalizers fold every signed input into [0, N).
+    for (PartitionIdNormalizer n : PartitionIdNormalizer.values()) {
+      if (n == PartitionIdNormalizer.NO_OP) {
+        continue;
+      }
+      for (int p : partitionCounts) {
+        for (int h : hashes) {
+          int pid = n.getPartitionId(h, p);
+          assertTrue(pid >= 0 && pid < p,
+              n + " produced out-of-range partition " + pid + " for hash=" + h + ", N=" + p);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testFromConfigStringRoundTrip() {
+    for (PartitionIdNormalizer n : PartitionIdNormalizer.values()) {
+      assertEquals(PartitionIdNormalizer.fromConfigString(n.name()), n);
+      assertEquals(PartitionIdNormalizer.fromConfigString(n.name().toLowerCase()), n);
+      assertEquals(PartitionIdNormalizer.fromConfigString("  " + n.name() + "  "), n);
+    }
+  }
+
+  @Test
+  public void testFromConfigStringRejectsBlank() {
+    expectThrows(IllegalArgumentException.class, () -> PartitionIdNormalizer.fromConfigString(null));
+    expectThrows(IllegalArgumentException.class, () -> PartitionIdNormalizer.fromConfigString(""));
+    expectThrows(IllegalArgumentException.class, () -> PartitionIdNormalizer.fromConfigString("   "));
+  }
+
+  @Test
+  public void testFromConfigStringRejectsUnknown() {
+    IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+        () -> PartitionIdNormalizer.fromConfigString("not-a-real-normalizer"));
+    assertTrue(e.getMessage().contains("not-a-real-normalizer"));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testNonPositivePartitionsRejected() {
+    PartitionIdNormalizer.POSITIVE_MODULO.getPartitionId(5, 0);
+  }
+
+  private static int shiftedMod(long value, int numPartitions) {
+    long m = value % numPartitions;
+    return (int) (m < 0 ? m + numPartitions : m);
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 3e66c49b61e..9bc1da9edc7 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -46,6 +46,7 @@ import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
 import org.apache.pinot.query.runtime.SendStatsPredicate;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
 import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.access.AllowAllAccessFactory;
@@ -107,10 +108,11 @@ public class ServerInstance {
     _instanceDataManager.init(serverConf.getInstanceDataManagerConfig(), helixManager, _serverMetrics,
         segmentOperationsThrottlerSet, _reloadJobStatusCache);
 
-    // Initialize ServerQueryLogger and FunctionRegistry before starting the query executor
+    // Initialize ServerQueryLogger, FunctionRegistry and PartitionFunctionFactory before starting the query executor
     ServerQueryLogger.init(serverConf.getQueryLogMaxRate(), serverConf.getQueryLogDroppedReportMaxRate(),
         _serverMetrics);
     FunctionRegistry.init();
+    PartitionFunctionFactory.init();
     String queryExecutorClassName = serverConf.getQueryExecutorClassName();
     LOGGER.info("Initializing query executor of class: {}", queryExecutorClassName);
     _queryExecutor = PluginManager.get().createInstance(queryExecutorClassName);
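
The four range-folding normalizers exercised by PartitionIdNormalizerTest differ only in how they treat negative hashes. Below is a minimal sketch of those semantics, inferred from the test assertions rather than copied from the Pinot source. The class name `NormalizerSketch` and its static method names are hypothetical, and only the `int` overloads are shown.

```java
// Hypothetical re-implementation inferred from the test assertions in this commit;
// this is NOT the actual PartitionIdNormalizer source.
final class NormalizerSketch {
  private NormalizerSketch() {
  }

  private static void checkPositive(int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive, got: " + numPartitions);
    }
  }

  /** Takes the signed remainder, then shifts negative results up into [0, N). */
  static int positiveModulo(int hash, int numPartitions) {
    checkPositive(numPartitions);
    int m = hash % numPartitions;
    return m < 0 ? m + numPartitions : m;
  }

  /** Takes the signed remainder first, then its absolute value (safe: |remainder| < N). */
  static int abs(int hash, int numPartitions) {
    checkPositive(numPartitions);
    return Math.abs(hash % numPartitions);
  }

  /** Clears the sign bit before the modulo, so MIN_VALUE maps to 0. */
  static int mask(int hash, int numPartitions) {
    checkPositive(numPartitions);
    return (hash & Integer.MAX_VALUE) % numPartitions;
  }

  /** Takes the absolute value before the modulo, patching the MIN_VALUE overflow to 0. */
  static int preModuloAbs(int hash, int numPartitions) {
    checkPositive(numPartitions);
    int nonNegative = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
    return nonNegative % numPartitions;
  }

  public static void main(String[] args) {
    int[] hashes = {10, -10, -1, Integer.MIN_VALUE};
    for (int h : hashes) {
      System.out.printf("hash=%d positiveModulo=%d abs=%d mask=%d preModuloAbs=%d%n",
          h, positiveModulo(h, 7), abs(h, 7), mask(h, 7), preModuloAbs(h, 7));
    }
  }
}
```

Under these assumptions a hash of -10 with 7 partitions maps to 4, 3, 6 and 3 respectively, so two sides that disagree on the normalizer can disagree on the partition id for the same value.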

