Jackie-Jiang commented on code in PR #18165:
URL: https://github.com/apache/pinot/pull/18165#discussion_r3068433604


##########
pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommandTest.java:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.client.admin.SegmentAdminClient;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.segment.spi.partition.pipeline.PartitionPipelineFunction;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class LaunchBackfillIngestionJobCommandTest {

Review Comment:
   I don't think we need this test. The partition-function-related logic is already covered by its own tests.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionIntNormalizer.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pinot.segment.spi.partition.pipeline;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+import javax.annotation.Nullable;
+
+
+/**
+ * Normalizes the final INT output from a compiled partition pipeline into a partition id.
+ */
+public enum PartitionIntNormalizer {

Review Comment:
   For each mode, we need to differentiate between pre-modulo and post-modulo normalization.
   For pre-modulo, there are 2 common ways to make the value non-negative:
   - `abs()` (need to handle MIN_VALUE)
   - mask
   For post-modulo, there are 2 common ways to make the partition id non-negative:
   - `abs()`
   - `+ numPartitions`
   
   For the existing partition functions, could we assign a default normalizer (based on the current implementation)? That way, users can choose a different normalizer for the common partition functions if necessary.
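A minimal, self-contained sketch of the four strategies listed above, assuming an `int` input and `numPartitions > 0`. The class and method names are hypothetical and do not come from the PR; the widen-to-`long` trick is just one way to handle `MIN_VALUE` for pre-modulo `abs()`.

```java
/**
 * Hypothetical sketch of the four strategies listed above; none of these names come from the PR.
 * Each method maps an int (for example, a hash) to a partition id, assuming numPartitions > 0.
 */
final class PartitionIdNormalizationSketch {
  private PartitionIdNormalizationSketch() {
  }

  // Pre-modulo abs: make the value non-negative first. Widening to long keeps
  // Math.abs(Integer.MIN_VALUE) from overflowing back to a negative int.
  static int preModuloAbs(int value, int numPartitions) {
    return (int) (Math.abs((long) value) % numPartitions);
  }

  // Pre-modulo mask: clear the sign bit. Never negative, but for a negative input it
  // can pick a different partition than the abs-based variants.
  static int preModuloMask(int value, int numPartitions) {
    return (value & Integer.MAX_VALUE) % numPartitions;
  }

  // Post-modulo abs: Java's % keeps the sign of the dividend, so the remainder lies in
  // (-numPartitions, numPartitions) and negating it cannot overflow.
  static int postModuloAbs(int value, int numPartitions) {
    return Math.abs(value % numPartitions);
  }

  // Post-modulo "+ numPartitions": shift a negative remainder into [0, numPartitions).
  // This is equivalent to Math.floorMod(value, numPartitions).
  static int postModuloAdd(int value, int numPartitions) {
    int remainder = value % numPartitions;
    return remainder < 0 ? remainder + numPartitions : remainder;
  }
}
```

With `numPartitions = 5`, the value `-7` lands on partition 2 under both `abs()` variants, 1 under the mask, and 3 under `+ numPartitions`, while all four agree on non-negative inputs. That divergence is why each existing partition function needs a default normalizer that matches its current behavior.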



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionIntNormalizer.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pinot.segment.spi.partition.pipeline;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+import javax.annotation.Nullable;
+
+
+/**
+ * Normalizes the final INT output from a compiled partition pipeline into a partition id.
+ */
+public enum PartitionIntNormalizer {
+  POSITIVE_MODULO {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int partition = value % numPartitions;
+      return partition < 0 ? partition + numPartitions : partition;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long partition = value % numPartitions;
+      return (int) (partition < 0 ? partition + numPartitions : partition);
+    }
+  },
+  ABS {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int partition = value % numPartitions;
+      return partition < 0 ? -partition : partition;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long partition = value % numPartitions;
+      return (int) (partition < 0 ? -partition : partition);
+    }
+  },
+  MASK {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      return (value & Integer.MAX_VALUE) % numPartitions;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      return (int) ((value & Long.MAX_VALUE) % numPartitions);
+    }
+  };
+
+  public final int getPartitionId(int value, int numPartitions) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+    return toPartitionId(value, numPartitions);
+  }
+
+  public final int getPartitionId(long value, int numPartitions) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+    return toPartitionId(value, numPartitions);
+  }
+
+  public static PartitionIntNormalizer fromConfigString(@Nullable String partitionIdNormalizer) {
+    Preconditions.checkArgument(partitionIdNormalizer != null && !partitionIdNormalizer.trim().isEmpty(),

Review Comment:
   This precondition check means `partitionIdNormalizer` cannot be null, so the `@Nullable` annotation on the parameter is misleading.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java:
##########
@@ -0,0 +1,1206 @@
+package org.apache.pinot.segment.spi.partition.pipeline;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.ScalarFunction;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.utils.ScalarFunctionUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * Compiles a restricted partition-function expression into a typed {@link PartitionPipeline} backed by deterministic
+ * scalar functions.
+ */
+public final class PartitionFunctionExprCompiler {

Review Comment:
   Can we reuse the existing `InbuiltFunctionEvaluator`? We don't want to maintain multiple implementations of function evaluation logic. With `FunctionEvaluator`, you can even allow Groovy.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionIntNormalizer.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pinot.segment.spi.partition.pipeline;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+import javax.annotation.Nullable;
+
+
+/**
+ * Normalizes the final INT output from a compiled partition pipeline into a partition id.
+ */
+public enum PartitionIntNormalizer {
+  POSITIVE_MODULO {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int partition = value % numPartitions;
+      return partition < 0 ? partition + numPartitions : partition;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long partition = value % numPartitions;
+      return (int) (partition < 0 ? partition + numPartitions : partition);
+    }
+  },
+  ABS {

Review Comment:
   There are 2 ways to do abs:
   - The one shown here (used in `ModuloPartitionFunction`): modulo first, then abs of the remainder
   - Abs of the value first, then modulo (used in `HashCodePartitionFunction`)
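A small sketch of why the ordering matters, using hypothetical names that are not from the PR. With Java's truncating `%`, the two orderings can only disagree at `Integer.MIN_VALUE`, because `Math.abs(Integer.MIN_VALUE)` overflows and stays negative.

```java
/**
 * Hypothetical sketch of the two abs orderings described above (names are illustrative only).
 * For Java's truncating %, the two orderings can only disagree at Integer.MIN_VALUE.
 */
final class AbsOrderingSketch {
  private AbsOrderingSketch() {
  }

  // Modulo first, then abs of the remainder (the ordering in the ABS constant of this diff).
  static int moduloThenAbs(int value, int numPartitions) {
    int partition = value % numPartitions;
    return partition < 0 ? -partition : partition;
  }

  // Abs first, then modulo. Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE itself,
  // so for that one input the result can be negative.
  static int absThenModulo(int value, int numPartitions) {
    return Math.abs(value) % numPartitions;
  }
}
```

For example, with 10 partitions, `moduloThenAbs(Integer.MIN_VALUE, 10)` returns 8 while `absThenModulo(Integer.MIN_VALUE, 10)` returns -8, so a normalizer modeling the abs-first ordering needs an explicit rule for that input.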



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java:
##########
@@ -72,10 +75,12 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
   private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;
 
   public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,

Review Comment:
   Directly pass in `ColumnPartitionConfig`



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/ScalarFunctionUtils.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.pinot.spi.utils;
+
+import java.lang.reflect.Method;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+
+/**
+ * Helper methods for discovering and registering scalar functions.
+ */
+public final class ScalarFunctionUtils {
+  public static final String SCALAR_FUNCTION_PACKAGE_REGEX = ".*\\.function\\..*";
+
+  private ScalarFunctionUtils() {
+  }
+
+  public static Set<Class<?>> getScalarFunctionClasses() {
+    return PinotReflectionUtils.getClassesThroughReflection(SCALAR_FUNCTION_PACKAGE_REGEX, ScalarFunction.class);
+  }
+
+  public static Set<Method> getScalarFunctionMethods() {
+    return PinotReflectionUtils.getMethodsThroughReflection(SCALAR_FUNCTION_PACKAGE_REGEX, ScalarFunction.class);
+  }
+
+  public static List<String> getScalarFunctionNames(ScalarFunction scalarFunction, String defaultName) {
+    String[] names = scalarFunction.names();
+    if (names.length == 0) {
+      return List.of(canonicalize(defaultName));
+    }
+
+    Set<String> canonicalNames = new LinkedHashSet<>();
+    for (String name : names) {
+      canonicalNames.add(canonicalize(name));
+    }
+    return List.copyOf(canonicalNames);
+  }
+
+  public static String canonicalize(String name) {

Review Comment:
   Consider putting this into `FunctionUtils`. We have multiple `canonicalize()` utils, and we can unify them in the future.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java:
##########
@@ -139,6 +173,11 @@ private static void addRangeToPartitions(String rangeString, Set<Integer> partit
     }
   }
 
+  @Nullable
+  private static String normalizeOptionalText(@Nullable String value) {
+    return StringUtils.isBlank(value) || StringUtils.equalsIgnoreCase(value, "null") ? null : value;

Review Comment:
   (minor) `StringUtils.equalsIgnoreCase()` is deprecated
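One way to drop the deprecated call, sketched as a standalone class so it has no commons-lang3 dependency. The class name is hypothetical; only the method name comes from the diff. `String.isBlank()` (Java 11+) stands in for `StringUtils.isBlank()`, and calling `equalsIgnoreCase` on the `"null"` literal is null-safe because `"null".equalsIgnoreCase(null)` returns `false`.

```java
/**
 * Hypothetical JDK-only variant of normalizeOptionalText that avoids the deprecated
 * StringUtils.equalsIgnoreCase(CharSequence, CharSequence).
 */
final class OptionalTextSketch {
  private OptionalTextSketch() {
  }

  // Returns null for null, blank, or the literal string "null" (any case); otherwise the input unchanged.
  static String normalizeOptionalText(String value) {
    if (value == null || value.isBlank() || "null".equalsIgnoreCase(value)) {
      return null;
    }
    return value;
  }
}
```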



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java:
##########
@@ -62,12 +65,31 @@ public class ColumnPartitionMetadata {
    */
   public ColumnPartitionMetadata(String functionName, int numPartitions, Set<Integer> partitions,
       @Nullable Map<String, String> functionConfig) {
+    this(functionName, numPartitions, partitions, functionConfig, null, null);
+  }
+
+  public ColumnPartitionMetadata(String functionName, int numPartitions, Set<Integer> partitions,

Review Comment:
   Don't add these new constructors. Instead, we should deprecate the existing constructor and always take a `PartitionFunction`.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java:
##########
@@ -92,15 +124,17 @@ public boolean equals(Object obj) {
     if (obj instanceof ColumnPartitionMetadata) {
       ColumnPartitionMetadata that = (ColumnPartitionMetadata) obj;
       return _functionName.equals(that._functionName) && _numPartitions == that._numPartitions && _partitions.equals(
-          that._partitions) && Objects.equals(_functionConfig, that._functionConfig);
+          that._partitions) && Objects.equals(_functionConfig, that._functionConfig) && Objects.equals(_functionExpr,
+          that._functionExpr) && Objects.equals(_partitionIdNormalizer, that._partitionIdNormalizer);
     }
     return false;
   }
 
   @Override
   public int hashCode() {
     return 37 * 37 * _functionName.hashCode() + 37 * _numPartitions + _partitions.hashCode()
-        + Objects.hashCode(_functionConfig);
+        + Objects.hashCode(_functionConfig) + Objects.hashCode(_functionExpr)

Review Comment:
   Consider adding `equals()` and `hashCode()` to `PartitionFunction`, or simply removing them. I don't think we need to compare metadata.
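A minimal sketch of the first option, assuming a hypothetical value-style implementation class. Its fields mirror a subset of the ones compared in this hunk (`_functionName`, `_numPartitions`, `_functionExpr`, `_partitionIdNormalizer`) and are not the actual `PartitionFunction` API. With equality defined on the function, metadata comparison could delegate to it instead of listing every field.

```java
import java.util.Objects;

/**
 * Hypothetical value-style partition function, used only to illustrate putting
 * equals()/hashCode() on the function rather than on the partition metadata.
 */
final class ExprPartitionFunctionSketch {
  private final String _functionName;
  private final int _numPartitions;
  private final String _functionExpr;          // may be null for non-expression functions
  private final String _partitionIdNormalizer; // may be null when the default applies

  ExprPartitionFunctionSketch(String functionName, int numPartitions, String functionExpr,
      String partitionIdNormalizer) {
    _functionName = Objects.requireNonNull(functionName, "functionName");
    _numPartitions = numPartitions;
    _functionExpr = functionExpr;
    _partitionIdNormalizer = partitionIdNormalizer;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof ExprPartitionFunctionSketch)) {
      return false;
    }
    ExprPartitionFunctionSketch that = (ExprPartitionFunctionSketch) obj;
    return _numPartitions == that._numPartitions && _functionName.equals(that._functionName)
        && Objects.equals(_functionExpr, that._functionExpr)
        && Objects.equals(_partitionIdNormalizer, that._partitionIdNormalizer);
  }

  @Override
  public int hashCode() {
    // Objects.hash folds the fields in order (31 * h + fieldHash), so swapping two field
    // values usually changes the hash, unlike a plain sum of field hash codes.
    return Objects.hash(_functionName, _numPartitions, _functionExpr, _partitionIdNormalizer);
  }
}
```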



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/ScalarFunctionUtils.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.pinot.spi.utils;
+
+import java.lang.reflect.Method;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+
+/**
+ * Helper methods for discovering and registering scalar functions.
+ */
+public final class ScalarFunctionUtils {
+  public static final String SCALAR_FUNCTION_PACKAGE_REGEX = ".*\\.function\\..*";
+
+  private ScalarFunctionUtils() {
+  }

Review Comment:
   (nit) For util classes, we usually put the private constructor right after the class declaration for readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
