yashmayya commented on code in PR #17279:
URL: https://github.com/apache/pinot/pull/17279#discussion_r2590894473


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgPrecisionAggregationFunction.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.AvgPrecisionPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * This function is used for BigDecimal average calculations.
+ * It supports the average aggregation using precision and scale.
+ * <p>The function can be used as AVGPRECISION(expression, precision, scale, 
roundingMode)
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>Expression: expression that contains the values to be averaged, can 
be serialized BigDecimal objects</li>

Review Comment:
   This doesn't actually support serialized `BigDecimal` objects, right? It 
seems to support serialized `AvgPrecisionPair` objects, but given that we 
haven't wired in support for this function with the star-tree index, that case 
won't ever arise anyway.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgPrecisionAggregationFunction.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.AvgPrecisionPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * This function is used for BigDecimal average calculations.
+ * It supports the average aggregation using precision and scale.
+ * <p>The function can be used as AVGPRECISION(expression, precision, scale, 
roundingMode)
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>Expression: expression that contains the values to be averaged, can 
be serialized BigDecimal objects</li>
+ *   <li>Precision (optional): precision to be set to the final result</li>
+ *   <li>Scale (optional): scale to be set to the final result</li>
+ *   <li>RoundingMode (optional): rounding mode to be used (default: 
HALF_EVEN)</li>
+ * </ul>
+ */
+public class AvgPrecisionAggregationFunction
+    extends NullableSingleInputAggregationFunction<AvgPrecisionPair, 
BigDecimal> {
+  private final Integer _precision;
+  private final Integer _scale;
+  private final RoundingMode _roundingMode;
+
+  public AvgPrecisionAggregationFunction(List<ExpressionContext> arguments, 
boolean nullHandlingEnabled) {
+    super(arguments.get(0), nullHandlingEnabled);
+
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments <= 4, "AvgPrecision expects at 
most 4 arguments, got: %s", numArguments);
+
+    if (numArguments > 1) {
+      _precision = arguments.get(1).getLiteral().getIntValue();
+      if (numArguments > 2) {
+        _scale = arguments.get(2).getLiteral().getIntValue();
+        if (numArguments > 3) {
+          String roundingModeStr = 
arguments.get(3).getLiteral().getStringValue();
+          _roundingMode = RoundingMode.valueOf(roundingModeStr.toUpperCase());
+        } else {
+          _roundingMode = RoundingMode.HALF_EVEN;
+        }
+      } else {
+        _scale = null;
+        _roundingMode = RoundingMode.HALF_EVEN;
+      }
+    } else {
+      _precision = null;
+      _scale = null;
+      _roundingMode = RoundingMode.HALF_EVEN;

Review Comment:
   nit: we can initialize these to their default values instead of setting 
them in multiple different else branches



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/AvgPrecisionPairTest.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class AvgPrecisionPairTest {
+
+  @Test
+  public void testConstructorAndGetters() {
+    BigDecimal sum = new BigDecimal("123.456");
+    long count = 10L;
+
+    AvgPrecisionPair pair = new AvgPrecisionPair(sum, count);
+
+    assertEquals(pair.getSum(), sum);
+    assertEquals(pair.getCount(), count);
+  }
+
+  @Test
+  public void testApply() {
+    AvgPrecisionPair pair = new AvgPrecisionPair(new BigDecimal("100"), 5L);
+
+    pair.apply(new BigDecimal("50"), 3L);
+
+    assertEquals(pair.getSum(), new BigDecimal("150"));
+    assertEquals(pair.getCount(), 8L);
+  }
+
+  @Test
+  public void testApplyWithNegativeValues() {
+    AvgPrecisionPair pair = new AvgPrecisionPair(new BigDecimal("100"), 5L);
+
+    pair.apply(new BigDecimal("-30"), 2L);
+
+    assertEquals(pair.getSum(), new BigDecimal("70"));
+    assertEquals(pair.getCount(), 7L);
+  }
+
+  @Test
+  public void testApplyWithZeroCount() {
+    AvgPrecisionPair pair = new AvgPrecisionPair(new BigDecimal("100"), 5L);
+
+    pair.apply(new BigDecimal("50"), 0L);
+
+    assertEquals(pair.getSum(), new BigDecimal("150"));
+    assertEquals(pair.getCount(), 5L);
+  }
+
+  @Test
+  public void testApplyWithLargeNumbers() {
+    BigDecimal largeSum = new 
BigDecimal("999999999999999999999999999999.999999999");
+    AvgPrecisionPair pair = new AvgPrecisionPair(largeSum, 1000000L);
+
+    BigDecimal additionalSum = new 
BigDecimal("111111111111111111111111111111.111111111");
+    pair.apply(additionalSum, 500000L);
+
+    assertEquals(pair.getSum(), largeSum.add(additionalSum));
+    assertEquals(pair.getCount(), 1500000L);
+  }
+
+  @Test
+  public void testSerializationDeserialization() {
+    BigDecimal sum = new BigDecimal("12345.6789");
+    long count = 42L;
+    AvgPrecisionPair original = new AvgPrecisionPair(sum, count);
+
+    byte[] bytes = original.toBytes();
+    AvgPrecisionPair deserialized = AvgPrecisionPair.fromBytes(bytes);
+
+    assertEquals(deserialized.getSum(), sum);
+    assertEquals(deserialized.getCount(), count);
+  }
+
+  @Test
+  public void testSerializationDeserializationWithByteBuffer() {
+    BigDecimal sum = new BigDecimal("98765.4321");
+    long count = 100L;
+    AvgPrecisionPair original = new AvgPrecisionPair(sum, count);
+
+    byte[] bytes = original.toBytes();
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    AvgPrecisionPair deserialized = 
AvgPrecisionPair.fromByteBuffer(byteBuffer);
+
+    assertEquals(deserialized.getSum(), sum);
+    assertEquals(deserialized.getCount(), count);
+  }
+
+  @Test
+  public void testSerializationWithZeroValues() {
+    AvgPrecisionPair original = new AvgPrecisionPair(BigDecimal.ZERO, 0L);
+
+    byte[] bytes = original.toBytes();
+    AvgPrecisionPair deserialized = AvgPrecisionPair.fromBytes(bytes);
+
+    assertEquals(deserialized.getSum(), BigDecimal.ZERO);
+    assertEquals(deserialized.getCount(), 0L);
+  }
+
+  @Test
+  public void testSerializationWithNegativeSum() {
+    BigDecimal negativeSum = new BigDecimal("-12345.6789");
+    long count = 10L;
+    AvgPrecisionPair original = new AvgPrecisionPair(negativeSum, count);
+
+    byte[] bytes = original.toBytes();
+    AvgPrecisionPair deserialized = AvgPrecisionPair.fromBytes(bytes);
+
+    assertEquals(deserialized.getSum(), negativeSum);
+    assertEquals(deserialized.getCount(), count);
+  }
+
+  @Test
+  public void testCompareTo() {
+    AvgPrecisionPair pair1 = new AvgPrecisionPair(new BigDecimal("100"), 10L);
+    AvgPrecisionPair pair2 = new AvgPrecisionPair(new BigDecimal("200"), 10L);
+    AvgPrecisionPair pair3 = new AvgPrecisionPair(new BigDecimal("100"), 10L);
+
+    assertTrue(pair1.compareTo(pair2) < 0);
+    assertTrue(pair2.compareTo(pair1) > 0);
+    assertEquals(pair1.compareTo(pair3), 0);
+  }
+
+  @Test
+  public void testCompareToWithDifferentCounts() {
+    // Average of pair1: 100/10 = 10
+    AvgPrecisionPair pair1 = new AvgPrecisionPair(new BigDecimal("100"), 10L);
+    // Average of pair2: 200/10 = 20
+    AvgPrecisionPair pair2 = new AvgPrecisionPair(new BigDecimal("200"), 10L);
+    // Average of pair3: 150/10 = 15
+    AvgPrecisionPair pair3 = new AvgPrecisionPair(new BigDecimal("150"), 10L);
+
+    assertTrue(pair1.compareTo(pair2) < 0);
+    assertTrue(pair2.compareTo(pair3) > 0);
+    assertTrue(pair1.compareTo(pair3) < 0);
+  }
+
+  @Test
+  public void testCompareToWithZeroCount() {
+    AvgPrecisionPair pair1 = new AvgPrecisionPair(new BigDecimal("100"), 0L);
+    AvgPrecisionPair pair2 = new AvgPrecisionPair(new BigDecimal("200"), 0L);
+
+    // Both have zero count, averages are undefined, so they are equal
+    assertEquals(pair1.compareTo(pair2), 0);
+  }
+
+  @Test
+  public void testRoundTripSerializationWithVeryLargeNumbers() {
+    BigDecimal veryLargeSum = new 
BigDecimal("123456789012345678901234567890.123456789012345678901234567890");
+    long veryLargeCount = Long.MAX_VALUE;
+    AvgPrecisionPair original = new AvgPrecisionPair(veryLargeSum, 
veryLargeCount);
+
+    byte[] bytes = original.toBytes();
+    AvgPrecisionPair deserialized = AvgPrecisionPair.fromBytes(bytes);
+
+    assertEquals(deserialized.getSum(), veryLargeSum);
+    assertEquals(deserialized.getCount(), veryLargeCount);
+  }
+
+  @Test
+  public void testMultipleApplyCalls() {
+    AvgPrecisionPair pair = new AvgPrecisionPair(BigDecimal.ZERO, 0L);
+
+    pair.apply(new BigDecimal("10"), 1L);
+    pair.apply(new BigDecimal("20"), 2L);
+    pair.apply(new BigDecimal("30"), 3L);
+
+    assertEquals(pair.getSum(), new BigDecimal("60"));
+    assertEquals(pair.getCount(), 6L);
+  }
+
+  @Test
+  public void testPrecisionPreservation() {
+    // Test that BigDecimal precision is preserved through serialization
+    BigDecimal preciseSum = new BigDecimal("0.123456789012345678901234567890");
+    AvgPrecisionPair original = new AvgPrecisionPair(preciseSum, 1L);
+
+    byte[] bytes = original.toBytes();
+    AvgPrecisionPair deserialized = AvgPrecisionPair.fromBytes(bytes);
+
+    assertEquals(deserialized.getSum(), preciseSum);
+    assertEquals(deserialized.getSum().scale(), preciseSum.scale());

Review Comment:
   The test says it's checking precision preservation, but we're actually only 
comparing the scale and not the precision?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgPrecisionAggregationFunction.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.AvgPrecisionPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * This function is used for BigDecimal average calculations.
+ * It supports the average aggregation using precision and scale.
+ * <p>The function can be used as AVGPRECISION(expression, precision, scale, 
roundingMode)
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>Expression: expression that contains the values to be averaged, can 
be serialized BigDecimal objects</li>
+ *   <li>Precision (optional): precision to be set to the final result</li>
+ *   <li>Scale (optional): scale to be set to the final result</li>
+ *   <li>RoundingMode (optional): rounding mode to be used (default: 
HALF_EVEN)</li>
+ * </ul>
+ */
+public class AvgPrecisionAggregationFunction
+    extends NullableSingleInputAggregationFunction<AvgPrecisionPair, 
BigDecimal> {
+  private final Integer _precision;
+  private final Integer _scale;
+  private final RoundingMode _roundingMode;
+
+  public AvgPrecisionAggregationFunction(List<ExpressionContext> arguments, 
boolean nullHandlingEnabled) {
+    super(arguments.get(0), nullHandlingEnabled);
+
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments <= 4, "AvgPrecision expects at 
most 4 arguments, got: %s", numArguments);
+
+    if (numArguments > 1) {
+      _precision = arguments.get(1).getLiteral().getIntValue();
+      if (numArguments > 2) {
+        _scale = arguments.get(2).getLiteral().getIntValue();
+        if (numArguments > 3) {
+          String roundingModeStr = 
arguments.get(3).getLiteral().getStringValue();
+          _roundingMode = RoundingMode.valueOf(roundingModeStr.toUpperCase());
+        } else {
+          _roundingMode = RoundingMode.HALF_EVEN;
+        }
+      } else {
+        _scale = null;
+        _roundingMode = RoundingMode.HALF_EVEN;
+      }
+    } else {
+      _precision = null;
+      _scale = null;
+      _roundingMode = RoundingMode.HALF_EVEN;
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGPRECISION;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    AvgPrecisionPair avgPair;
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : 
acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(intValues[i]));
+          }
+          return innerPair;
+        });
+
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : 
acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(longValues[i]));
+          }
+          return innerPair;
+        });
+
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : 
acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(new BigDecimal(stringValues[i]));
+          }
+          return innerPair;
+        });
+
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : 
acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(bigDecimalValues[i]);
+          }
+          return innerPair;
+        });
+
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        avgPair = new AvgPrecisionPair();
+        for (int i = 0; i < length; i++) {
+          AvgPrecisionPair value = 
ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]);
+          avgPair.apply(value);
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + 
blockValSet.getValueType());
+    }
+    updateAggregationResult(aggregationResultHolder, avgPair);
+  }
+
+  protected void updateAggregationResult(AggregationResultHolder 
aggregationResultHolder, AvgPrecisionPair avgPair) {
+    if (_nullHandlingEnabled) {
+      if (avgPair != null && avgPair.getCount() > 0) {
+        AvgPrecisionPair otherPair = aggregationResultHolder.getResult();
+        if (otherPair == null) {
+          aggregationResultHolder.setValue(avgPair);
+        } else {
+          otherPair.apply(avgPair);
+        }
+      }
+    } else {
+      if (avgPair == null) {
+        avgPair = new AvgPrecisionPair();
+      }
+      AvgPrecisionPair otherPair = aggregationResultHolder.getResult();
+      if (otherPair == null) {
+        aggregationResultHolder.setValue(avgPair);
+      } else {
+        otherPair.apply(avgPair);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, 
BigDecimal.valueOf(intValues[i]));
+          }
+        });
+
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, 
BigDecimal.valueOf(longValues[i]));
+          }
+        });
+
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, new 
BigDecimal(stringValues[i]));
+          }
+        });
+
+        break;

Review Comment:
   Same comment as above (and also for group by MV).



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgPrecisionAggregationFunction.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.AvgPrecisionPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * This function is used for BigDecimal average calculations.
+ * It supports the average aggregation using precision and scale.
+ * <p>The function can be used as AVGPRECISION(expression, precision, scale, 
roundingMode)
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>Expression: expression that contains the values to be averaged, can 
be serialized BigDecimal objects</li>
+ *   <li>Precision (optional): precision to be set to the final result</li>
+ *   <li>Scale (optional): scale to be set to the final result</li>
+ *   <li>RoundingMode (optional): rounding mode to be used (default: 
HALF_EVEN)</li>
+ * </ul>
+ */
+public class AvgPrecisionAggregationFunction
+    extends NullableSingleInputAggregationFunction<AvgPrecisionPair, 
BigDecimal> {
+  private final Integer _precision;
+  private final Integer _scale;
+  private final RoundingMode _roundingMode;
+
+  public AvgPrecisionAggregationFunction(List<ExpressionContext> arguments, 
boolean nullHandlingEnabled) {
+    super(arguments.get(0), nullHandlingEnabled);
+
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments <= 4, "AvgPrecision expects at 
most 4 arguments, got: %s", numArguments);
+
+    if (numArguments > 1) {
+      _precision = arguments.get(1).getLiteral().getIntValue();
+      if (numArguments > 2) {
+        _scale = arguments.get(2).getLiteral().getIntValue();
+        if (numArguments > 3) {
+          String roundingModeStr = 
arguments.get(3).getLiteral().getStringValue();
+          _roundingMode = RoundingMode.valueOf(roundingModeStr.toUpperCase());
+        } else {
+          _roundingMode = RoundingMode.HALF_EVEN;
+        }
+      } else {
+        _scale = null;
+        _roundingMode = RoundingMode.HALF_EVEN;
+      }
+    } else {
+      _precision = null;
+      _scale = null;
+      _roundingMode = RoundingMode.HALF_EVEN;
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGPRECISION;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  /**
+   * Supplies the holder for group-by aggregation; one {@link AvgPrecisionPair} object is stored per group key.
+   *
+   * @param initialCapacity passed through to the holder
+   * @param maxCapacity passed through to the holder
+   */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Folds the first {@code length} values of the input expression into one {@link AvgPrecisionPair} and merges it
+   * into the holder.
+   *
+   * <p>INT and LONG values are converted with {@code BigDecimal.valueOf}, FLOAT/DOUBLE/STRING values are read as
+   * strings and parsed, BIG_DECIMAL values are used directly, and BYTES values are deserialized as
+   * {@link AvgPrecisionPair}s. Every branch, including BYTES, goes through {@code foldNotNull} so that all stored
+   * types are treated the same way with respect to nulls.
+   *
+   * @throws IllegalStateException if the stored type of the expression is not supported
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    AvgPrecisionPair avgPair;
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(intValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(longValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        // Floating point and string columns are parsed from their string form
+        String[] stringValues = blockValSet.getStringValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(new BigDecimal(stringValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(bigDecimalValues[i]);
+          }
+          return innerPair;
+        });
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair; folded through foldNotNull like the other branches so null rows are not
+        // handed to the deserializer
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            AvgPrecisionPair value = ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]);
+            innerPair.apply(value);
+          }
+          return innerPair;
+        });
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + blockValSet.getValueType());
+    }
+    updateAggregationResult(aggregationResultHolder, avgPair);
+  }
+
+  /**
+   * Merges a block-level pair into the holder.
+   *
+   * <p>With null handling enabled, a {@code null} pair or a pair whose count is not positive leaves the holder
+   * untouched. With null handling disabled, a {@code null} pair is replaced by an empty one before merging.
+   *
+   * @param aggregationResultHolder holder of the running result
+   * @param avgPair pair computed for the current block, may be {@code null}
+   */
+  protected void updateAggregationResult(AggregationResultHolder aggregationResultHolder,
+      AvgPrecisionPair avgPair) {
+    if (_nullHandlingEnabled) {
+      // Nothing was aggregated for this block: keep the holder as it is
+      if (avgPair == null || avgPair.getCount() <= 0) {
+        return;
+      }
+    } else if (avgPair == null) {
+      avgPair = new AvgPrecisionPair();
+    }
+
+    AvgPrecisionPair existing = aggregationResultHolder.getResult();
+    if (existing != null) {
+      existing.apply(avgPair);
+    } else {
+      aggregationResultHolder.setValue(avgPair);
+    }
+  }
+
+  /**
+   * Aggregates the first {@code length} values into the per-group results, where row {@code i} belongs to the
+   * single group {@code groupKeyArray[i]}.
+   *
+   * <p>Value conversion per stored type matches the non-group-by path. Every branch, including BYTES, iterates
+   * through {@code forEachNotNull} so that all stored types are treated the same way with respect to nulls.
+   *
+   * @throws IllegalStateException if the stored type of the expression is not supported
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, BigDecimal.valueOf(intValues[i]));
+          }
+        });
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, BigDecimal.valueOf(longValues[i]));
+          }
+        });
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        // Floating point and string columns are parsed from their string form
+        String[] stringValues = blockValSet.getStringValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, new BigDecimal(stringValues[i]));
+          }
+        });
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, bigDecimalValues[i]);
+          }
+        });
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair; iterated through forEachNotNull like the other branches so null rows are
+        // not handed to the deserializer
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            AvgPrecisionPair avgPair = ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]);
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, avgPair);
+          }
+        });
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + blockValSet.getValueType());
+    }
+  }
+
+  /**
+   * Adds one value to the running pair of {@code groupKey}, registering a new pair in the holder first when the
+   * group has no result yet.
+   */
+  private void updateGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, BigDecimal value) {
+    AvgPrecisionPair existing = groupByResultHolder.getResult(groupKey);
+    if (existing != null) {
+      existing.apply(value);
+      return;
+    }
+    AvgPrecisionPair created = new AvgPrecisionPair();
+    groupByResultHolder.setValueForKey(groupKey, created);
+    created.apply(value);
+  }
+
+  /**
+   * Merges a deserialized {@link AvgPrecisionPair} into the running pair of {@code groupKey}.
+   *
+   * <p>The holder never stores {@code value} itself: a fresh pair is registered for a group without a result and
+   * {@code value} is applied to it. The multi-value group-by path passes the same deserialized instance for every
+   * group key of a row, so storing the reference would make those groups share one mutable accumulator.
+   */
+  private void updateGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, AvgPrecisionPair value) {
+    AvgPrecisionPair avgPair = groupByResultHolder.getResult(groupKey);
+    if (avgPair == null) {
+      avgPair = new AvgPrecisionPair();
+      groupByResultHolder.setValueForKey(groupKey, avgPair);
+    }
+    avgPair.apply(value);
+  }
+
+  /**
+   * Aggregates the first {@code length} values into the per-group results, where row {@code i} contributes to
+   * every group listed in {@code groupKeysArray[i]}.
+   *
+   * <p>Value conversion per stored type matches the non-group-by path. Every branch, including BYTES, iterates
+   * through {@code forEachNotNull} so that all stored types are treated the same way with respect to nulls.
+   *
+   * @throws IllegalStateException if the stored type of the expression is not supported
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, BigDecimal.valueOf(intValues[i]));
+            }
+          }
+        });
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, BigDecimal.valueOf(longValues[i]));
+            }
+          }
+        });
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        // Floating point and string columns are parsed from their string form
+        String[] stringValues = blockValSet.getStringValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, new BigDecimal(stringValues[i]));
+            }
+          }
+        });
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, bigDecimalValues[i]);
+            }
+          }
+        });
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair; iterated through forEachNotNull like the other branches so null rows are
+        // not handed to the deserializer
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            AvgPrecisionPair avgPair = ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, avgPair);
+            }
+          }
+        });
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + blockValSet.getValueType());
+    }
+  }
+
+  /**
+   * Reads the intermediate result from the holder.
+   *
+   * @return the stored pair; when nothing is stored, {@code null} if null handling is enabled, otherwise an empty
+   *     pair
+   */
+  @Override
+  public AvgPrecisionPair extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    AvgPrecisionPair stored = aggregationResultHolder.getResult();
+    if (stored != null || _nullHandlingEnabled) {
+      // Either a real result, or a null that null handling is allowed to propagate
+      return stored;
+    }
+    return new AvgPrecisionPair();
+  }
+
+  /**
+   * Reads the intermediate result of one group from the holder.
+   *
+   * @return the stored pair; when nothing is stored for {@code groupKey}, {@code null} if null handling is
+   *     enabled, otherwise an empty pair
+   */
+  @Override
+  public AvgPrecisionPair extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    AvgPrecisionPair stored = groupByResultHolder.getResult(groupKey);
+    if (stored != null || _nullHandlingEnabled) {
+      // Either a real result, or a null that null handling is allowed to propagate
+      return stored;
+    }
+    return new AvgPrecisionPair();
+  }
+
+  /**
+   * Combines two intermediate results by applying the second onto the first, which is then returned.
+   *
+   * <p>With null handling enabled, a {@code null} side yields the other side unchanged. With null handling
+   * disabled no null check is performed.
+   */
+  @Override
+  public AvgPrecisionPair merge(AvgPrecisionPair intermediateResult1, AvgPrecisionPair intermediateResult2) {
+    if (_nullHandlingEnabled && intermediateResult1 == null) {
+      return intermediateResult2;
+    }
+    if (_nullHandlingEnabled && intermediateResult2 == null) {
+      return intermediateResult1;
+    }
+    intermediateResult1.apply(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  /**
+   * Declares the intermediate result column type as {@code OBJECT}.
+   */
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public SerializedIntermediateResult 
serializeIntermediateResult(AvgPrecisionPair avgPrecisionPair) {
+    return new 
SerializedIntermediateResult(ObjectSerDeUtils.ObjectType.AvgPrecisionPair.getValue(),
+        
ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.serialize(avgPrecisionPair));
+  }
+
+  /**
+   * Rebuilds the pair from the buffer of the given custom object using {@code AVG_PRECISION_PAIR_SER_DE}.
+   */
+  @Override
+  public AvgPrecisionPair deserializeIntermediateResult(CustomObject customObject) {
+    return ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(customObject.getBuffer());
+  }
+
+  /**
+   * Declares the final result column type as {@code STRING}.
+   *
+   * <p>NOTE(review): the final result object of this function is a {@link BigDecimal}; presumably it is rendered
+   * as a string for this column type - confirm against the other precision aggregation functions.
+   */
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  @Override
+  public BigDecimal extractFinalResult(AvgPrecisionPair intermediateResult) {
+    if (intermediateResult == null || intermediateResult.getCount() == 0) {
+      return null;
+    }
+
+    BigDecimal sum = intermediateResult.getSum();
+    long count = intermediateResult.getCount();
+
+    MathContext mathContext = _precision != null
+        ? new MathContext(_precision, _roundingMode)
+        : MathContext.DECIMAL128;
+
+    BigDecimal average = sum.divide(BigDecimal.valueOf(count), mathContext);
+    return _scale != null
+        ? average.setScale(_scale, _roundingMode)
+        : average;

Review Comment:
   Please take a look at this.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgPrecisionAggregationFunction.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.AvgPrecisionPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * This function is used for BigDecimal average calculations.
+ * It supports the average aggregation using precision and scale.
+ * <p>The function can be used as AVGPRECISION(expression, precision, scale, 
roundingMode)
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>Expression: expression that contains the values to be averaged, can 
be serialized BigDecimal objects</li>
+ *   <li>Precision (optional): precision to be set to the final result</li>
+ *   <li>Scale (optional): scale to be set to the final result</li>
+ *   <li>RoundingMode (optional): rounding mode to be used (default: 
HALF_EVEN)</li>
+ * </ul>
+ */
+public class AvgPrecisionAggregationFunction
+    extends NullableSingleInputAggregationFunction<AvgPrecisionPair, 
BigDecimal> {
+  private final Integer _precision;
+  private final Integer _scale;
+  private final RoundingMode _roundingMode;
+
+  /**
+   * Builds the function from the arguments of
+   * {@code AVGPRECISION(expression[, precision[, scale[, roundingMode]]])}.
+   *
+   * <p>Precision and scale stay {@code null} when omitted and the rounding mode defaults to {@code HALF_EVEN}.
+   * Optional arguments must be literals; a negative precision or an unknown rounding mode name is rejected here
+   * instead of surfacing later when the final result is computed.
+   *
+   * @param arguments expression to average, followed by up to three optional literal arguments
+   * @param nullHandlingEnabled forwarded to the parent class
+   * @throws IllegalArgumentException if more than 4 arguments are given or an optional argument is invalid
+   */
+  public AvgPrecisionAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    super(arguments.get(0), nullHandlingEnabled);
+
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments <= 4, "AvgPrecision expects at most 4 arguments, got: %s", numArguments);
+    for (int i = 1; i < numArguments; i++) {
+      // getLiteral() is dereferenced below (presumably null for non-literal expressions), so reject those up front
+      Preconditions.checkArgument(arguments.get(i).getLiteral() != null,
+          "AvgPrecision argument at index %s must be a literal, got: %s", i, arguments.get(i));
+    }
+
+    Integer precision = null;
+    Integer scale = null;
+    RoundingMode roundingMode = RoundingMode.HALF_EVEN;
+    if (numArguments > 1) {
+      precision = arguments.get(1).getLiteral().getIntValue();
+      // MathContext rejects a negative precision; fail at construction rather than at result extraction
+      Preconditions.checkArgument(precision >= 0, "AvgPrecision precision must be non-negative, got: %s", precision);
+    }
+    if (numArguments > 2) {
+      scale = arguments.get(2).getLiteral().getIntValue();
+    }
+    if (numArguments > 3) {
+      String roundingModeStr = arguments.get(3).getLiteral().getStringValue();
+      // Locale.ROOT keeps the upper-casing independent of the JVM default locale (e.g. Turkish dotted capital I)
+      roundingMode = RoundingMode.valueOf(roundingModeStr.toUpperCase(java.util.Locale.ROOT));
+    }
+
+    _precision = precision;
+    _scale = scale;
+    _roundingMode = roundingMode;
+  }
+
+  /**
+   * Identifies this function as {@link AggregationFunctionType#AVGPRECISION}.
+   */
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGPRECISION;
+  }
+
+  /**
+   * Supplies the holder for non-group-by aggregation; the running {@link AvgPrecisionPair} is stored as an object.
+   */
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  /**
+   * Supplies the holder for group-by aggregation; one {@link AvgPrecisionPair} object is stored per group key.
+   *
+   * @param initialCapacity passed through to the holder
+   * @param maxCapacity passed through to the holder
+   */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    AvgPrecisionPair avgPair;
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : 
acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(intValues[i]));
+          }
+          return innerPair;
+        });
+
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : 
acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(longValues[i]));
+          }
+          return innerPair;
+        });
+
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:

Review Comment:
   Do we really need to support all these other types? This seems mostly 
useful for `BigDecimal` columns. Also, in either case, we can simply rely on 
`blockValSet.getBigDecimalValuesSV()` to handle the conversions.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgPrecisionAggregationFunction.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.AvgPrecisionPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * This function is used for BigDecimal average calculations.
+ * It supports the average aggregation using precision and scale.
+ * <p>The function can be used as AVGPRECISION(expression, precision, scale, 
roundingMode)
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>Expression: expression that contains the values to be averaged, can 
be serialized BigDecimal objects</li>
+ *   <li>Precision (optional): precision to be set to the final result</li>
+ *   <li>Scale (optional): scale to be set to the final result</li>
+ *   <li>RoundingMode (optional): rounding mode to be used (default: 
HALF_EVEN)</li>
+ * </ul>
+ */
+public class AvgPrecisionAggregationFunction
+    extends NullableSingleInputAggregationFunction<AvgPrecisionPair, 
BigDecimal> {
+  private final Integer _precision;
+  private final Integer _scale;
+  private final RoundingMode _roundingMode;
+
+  /**
+   * Builds the function from the arguments of
+   * {@code AVGPRECISION(expression[, precision[, scale[, roundingMode]]])}.
+   *
+   * <p>Precision and scale stay {@code null} when omitted and the rounding mode defaults to {@code HALF_EVEN}.
+   * Optional arguments must be literals; a negative precision or an unknown rounding mode name is rejected here
+   * instead of surfacing later when the final result is computed.
+   *
+   * @param arguments expression to average, followed by up to three optional literal arguments
+   * @param nullHandlingEnabled forwarded to the parent class
+   * @throws IllegalArgumentException if more than 4 arguments are given or an optional argument is invalid
+   */
+  public AvgPrecisionAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    super(arguments.get(0), nullHandlingEnabled);
+
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments <= 4, "AvgPrecision expects at most 4 arguments, got: %s", numArguments);
+    for (int i = 1; i < numArguments; i++) {
+      // getLiteral() is dereferenced below (presumably null for non-literal expressions), so reject those up front
+      Preconditions.checkArgument(arguments.get(i).getLiteral() != null,
+          "AvgPrecision argument at index %s must be a literal, got: %s", i, arguments.get(i));
+    }
+
+    Integer precision = null;
+    Integer scale = null;
+    RoundingMode roundingMode = RoundingMode.HALF_EVEN;
+    if (numArguments > 1) {
+      precision = arguments.get(1).getLiteral().getIntValue();
+      // MathContext rejects a negative precision; fail at construction rather than at result extraction
+      Preconditions.checkArgument(precision >= 0, "AvgPrecision precision must be non-negative, got: %s", precision);
+    }
+    if (numArguments > 2) {
+      scale = arguments.get(2).getLiteral().getIntValue();
+    }
+    if (numArguments > 3) {
+      String roundingModeStr = arguments.get(3).getLiteral().getStringValue();
+      // Locale.ROOT keeps the upper-casing independent of the JVM default locale (e.g. Turkish dotted capital I)
+      roundingMode = RoundingMode.valueOf(roundingModeStr.toUpperCase(java.util.Locale.ROOT));
+    }
+
+    _precision = precision;
+    _scale = scale;
+    _roundingMode = roundingMode;
+  }
+
+  /**
+   * Identifies this function as {@link AggregationFunctionType#AVGPRECISION}.
+   */
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGPRECISION;
+  }
+
+  /**
+   * Supplies the holder for non-group-by aggregation; the running {@link AvgPrecisionPair} is stored as an object.
+   */
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  /**
+   * Supplies the holder for group-by aggregation; one {@link AvgPrecisionPair} object is stored per group key.
+   *
+   * @param initialCapacity passed through to the holder
+   * @param maxCapacity passed through to the holder
+   */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Folds the first {@code length} values of the input expression into one {@link AvgPrecisionPair} and merges it
+   * into the holder.
+   *
+   * <p>INT and LONG values are converted with {@code BigDecimal.valueOf}, FLOAT/DOUBLE/STRING values are read as
+   * strings and parsed, BIG_DECIMAL values are used directly, and BYTES values are deserialized as
+   * {@link AvgPrecisionPair}s. Every branch, including BYTES, goes through {@code foldNotNull} so that all stored
+   * types are treated the same way with respect to nulls.
+   *
+   * @throws IllegalStateException if the stored type of the expression is not supported
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    AvgPrecisionPair avgPair;
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(intValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(longValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        // Floating point and string columns are parsed from their string form
+        String[] stringValues = blockValSet.getStringValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(new BigDecimal(stringValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(bigDecimalValues[i]);
+          }
+          return innerPair;
+        });
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair; folded through foldNotNull like the other branches so null rows are not
+        // handed to the deserializer
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            AvgPrecisionPair value = ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]);
+            innerPair.apply(value);
+          }
+          return innerPair;
+        });
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + blockValSet.getValueType());
+    }
+    updateAggregationResult(aggregationResultHolder, avgPair);
+  }
+
+  /**
+   * Merges a block-level pair into the holder.
+   *
+   * <p>With null handling enabled, a {@code null} pair or a pair whose count is not positive leaves the holder
+   * untouched. With null handling disabled, a {@code null} pair is replaced by an empty one before merging.
+   *
+   * @param aggregationResultHolder holder of the running result
+   * @param avgPair pair computed for the current block, may be {@code null}
+   */
+  protected void updateAggregationResult(AggregationResultHolder aggregationResultHolder,
+      AvgPrecisionPair avgPair) {
+    if (_nullHandlingEnabled) {
+      // Nothing was aggregated for this block: keep the holder as it is
+      if (avgPair == null || avgPair.getCount() <= 0) {
+        return;
+      }
+    } else if (avgPair == null) {
+      avgPair = new AvgPrecisionPair();
+    }
+
+    AvgPrecisionPair existing = aggregationResultHolder.getResult();
+    if (existing != null) {
+      existing.apply(avgPair);
+    } else {
+      aggregationResultHolder.setValue(avgPair);
+    }
+  }
+
+  /**
+   * Aggregates the first {@code length} values into the per-group results, where row {@code i} belongs to the
+   * single group {@code groupKeyArray[i]}.
+   *
+   * <p>Value conversion per stored type matches the non-group-by path. Every branch, including BYTES, iterates
+   * through {@code forEachNotNull} so that all stored types are treated the same way with respect to nulls.
+   *
+   * @throws IllegalStateException if the stored type of the expression is not supported
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, BigDecimal.valueOf(intValues[i]));
+          }
+        });
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, BigDecimal.valueOf(longValues[i]));
+          }
+        });
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        // Floating point and string columns are parsed from their string form
+        String[] stringValues = blockValSet.getStringValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, new BigDecimal(stringValues[i]));
+          }
+        });
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, bigDecimalValues[i]);
+          }
+        });
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair; iterated through forEachNotNull like the other branches so null rows are
+        // not handed to the deserializer
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            AvgPrecisionPair avgPair = ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]);
+            updateGroupByResult(groupKeyArray[i], groupByResultHolder, avgPair);
+          }
+        });
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + blockValSet.getValueType());
+    }
+  }
+
+  /**
+   * Adds one value to the running pair of {@code groupKey}, registering a new pair in the holder first when the
+   * group has no result yet.
+   */
+  private void updateGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, BigDecimal value) {
+    AvgPrecisionPair existing = groupByResultHolder.getResult(groupKey);
+    if (existing != null) {
+      existing.apply(value);
+      return;
+    }
+    AvgPrecisionPair created = new AvgPrecisionPair();
+    groupByResultHolder.setValueForKey(groupKey, created);
+    created.apply(value);
+  }
+
+  /**
+   * Merges a deserialized {@link AvgPrecisionPair} into the running pair of {@code groupKey}.
+   *
+   * <p>The holder never stores {@code value} itself: a fresh pair is registered for a group without a result and
+   * {@code value} is applied to it. The multi-value group-by path passes the same deserialized instance for every
+   * group key of a row, so storing the reference would make those groups share one mutable accumulator.
+   */
+  private void updateGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, AvgPrecisionPair value) {
+    AvgPrecisionPair avgPair = groupByResultHolder.getResult(groupKey);
+    if (avgPair == null) {
+      avgPair = new AvgPrecisionPair();
+      groupByResultHolder.setValueForKey(groupKey, avgPair);
+    }
+    avgPair.apply(value);
+  }
+
+  /**
+   * Aggregates the first {@code length} values into the per-group results, where row {@code i} contributes to
+   * every group listed in {@code groupKeysArray[i]}.
+   *
+   * <p>Value conversion per stored type matches the non-group-by path. Every branch, including BYTES, iterates
+   * through {@code forEachNotNull} so that all stored types are treated the same way with respect to nulls.
+   *
+   * @throws IllegalStateException if the stored type of the expression is not supported
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, BigDecimal.valueOf(intValues[i]));
+            }
+          }
+        });
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, BigDecimal.valueOf(longValues[i]));
+            }
+          }
+        });
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        // Floating point and string columns are parsed from their string form
+        String[] stringValues = blockValSet.getStringValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, new BigDecimal(stringValues[i]));
+            }
+          }
+        });
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, bigDecimalValues[i]);
+            }
+          }
+        });
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair; iterated through forEachNotNull like the other branches so null rows are
+        // not handed to the deserializer
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        forEachNotNull(length, blockValSet, (from, to) -> {
+          for (int i = from; i < to; i++) {
+            AvgPrecisionPair avgPair = ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              updateGroupByResult(groupKey, groupByResultHolder, avgPair);
+            }
+          }
+        });
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + blockValSet.getValueType());
+    }
+  }
+
+  /**
+   * Reads the intermediate result from the holder.
+   *
+   * @return the stored pair; when nothing is stored, {@code null} if null handling is enabled, otherwise an empty
+   *     pair
+   */
+  @Override
+  public AvgPrecisionPair extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    AvgPrecisionPair stored = aggregationResultHolder.getResult();
+    if (stored != null || _nullHandlingEnabled) {
+      // Either a real result, or a null that null handling is allowed to propagate
+      return stored;
+    }
+    return new AvgPrecisionPair();
+  }
+
+  /**
+   * Reads the intermediate result of one group from the holder.
+   *
+   * @return the stored pair; when nothing is stored for {@code groupKey}, {@code null} if null handling is
+   *     enabled, otherwise an empty pair
+   */
+  @Override
+  public AvgPrecisionPair extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    AvgPrecisionPair stored = groupByResultHolder.getResult(groupKey);
+    if (stored != null || _nullHandlingEnabled) {
+      // Either a real result, or a null that null handling is allowed to propagate
+      return stored;
+    }
+    return new AvgPrecisionPair();
+  }
+
+  /**
+   * Combines two intermediate results by applying the second onto the first, which is then returned.
+   *
+   * <p>With null handling enabled, a {@code null} side yields the other side unchanged. With null handling
+   * disabled no null check is performed.
+   */
+  @Override
+  public AvgPrecisionPair merge(AvgPrecisionPair intermediateResult1, AvgPrecisionPair intermediateResult2) {
+    if (_nullHandlingEnabled && intermediateResult1 == null) {
+      return intermediateResult2;
+    }
+    if (_nullHandlingEnabled && intermediateResult2 == null) {
+      return intermediateResult1;
+    }
+    intermediateResult1.apply(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  /**
+   * Declares the intermediate result column type as {@code OBJECT}.
+   */
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public SerializedIntermediateResult 
serializeIntermediateResult(AvgPrecisionPair avgPrecisionPair) {
+    return new 
SerializedIntermediateResult(ObjectSerDeUtils.ObjectType.AvgPrecisionPair.getValue(),
+        
ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.serialize(avgPrecisionPair));
+  }
+
+  /**
+   * Rebuilds the pair from the buffer of the given custom object using {@code AVG_PRECISION_PAIR_SER_DE}.
+   */
+  @Override
+  public AvgPrecisionPair deserializeIntermediateResult(CustomObject customObject) {
+    return ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(customObject.getBuffer());
+  }
+
+  /**
+   * Declares the final result column type as {@code STRING}.
+   *
+   * <p>NOTE(review): the final result object of this function is a {@link BigDecimal}; presumably it is rendered
+   * as a string for this column type - confirm against the other precision aggregation functions.
+   */
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  @Override
+  public BigDecimal extractFinalResult(AvgPrecisionPair intermediateResult) {
+    if (intermediateResult == null || intermediateResult.getCount() == 0) {
+      return null;
+    }

Review Comment:
   We shouldn't be returning `null` when null handling is disabled. There 
should be some default null value instead (see other aggregation functions).



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgPrecisionAggregationFunction.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.AvgPrecisionPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * This function is used for BigDecimal average calculations.
+ * It supports average aggregation with a configurable precision and scale.
+ * <p>The function can be used as {@code AVGPRECISION(expression, precision, scale, roundingMode)}.
+ * <p>The following arguments are supported:
+ * <ul>
+ *   <li>Expression: expression that contains the values to be averaged; can be serialized BigDecimal objects</li>
+ *   <li>Precision (optional): precision to apply to the final result</li>
+ *   <li>Scale (optional): scale to apply to the final result</li>
+ *   <li>RoundingMode (optional): rounding mode to be used (default: HALF_EVEN)</li>
+ * </ul>
+ */
+public class AvgPrecisionAggregationFunction
+    extends NullableSingleInputAggregationFunction<AvgPrecisionPair, 
BigDecimal> {
+  private final Integer _precision;
+  private final Integer _scale;
+  private final RoundingMode _roundingMode;
+
+  /**
+   * Creates the function from its arguments: the expression to average, optionally followed by the precision, scale
+   * and rounding mode literals (in that order).
+   *
+   * <p>Arguments that are not supplied leave the precision and scale as {@code null} and the rounding mode as
+   * {@link RoundingMode#HALF_EVEN}.
+   *
+   * @param arguments function arguments; the first one is the expression to aggregate, at most 4 are accepted
+   * @param nullHandlingEnabled passed through to the parent class
+   * @throws IllegalArgumentException if more than 4 arguments are given, or if the rounding mode argument does not
+   *     name a {@link RoundingMode} constant (case-insensitive)
+   */
+  public AvgPrecisionAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    super(arguments.get(0), nullHandlingEnabled);
+
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments <= 4, "AvgPrecision expects at most 4 arguments, got: %s", numArguments);
+
+    // Start from the defaults and override only what the caller supplied.
+    Integer precision = null;
+    Integer scale = null;
+    RoundingMode roundingMode = RoundingMode.HALF_EVEN;
+    if (numArguments > 1) {
+      precision = arguments.get(1).getLiteral().getIntValue();
+    }
+    if (numArguments > 2) {
+      scale = arguments.get(2).getLiteral().getIntValue();
+    }
+    if (numArguments > 3) {
+      String roundingModeStr = arguments.get(3).getLiteral().getStringValue();
+      // Upper-case with Locale.ROOT so the enum lookup does not depend on the JVM default locale (e.g. under a
+      // Turkish locale "ceiling" would otherwise become "CEİLİNG" and fail to resolve).
+      roundingMode = RoundingMode.valueOf(roundingModeStr.toUpperCase(java.util.Locale.ROOT));
+    }
+    _precision = precision;
+    _scale = scale;
+    _roundingMode = roundingMode;
+  }
+
+  /**
+   * Identifies this function as {@code AVGPRECISION}.
+   */
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGPRECISION;
+  }
+
+  /**
+   * Creates a new {@link ObjectAggregationResultHolder} for the aggregation result.
+   */
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  /**
+   * Creates a new {@link ObjectGroupByResultHolder} with the given initial and maximum capacity.
+   */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Accumulates the values of one block into an {@link AvgPrecisionPair} and hands that pair to
+   * {@code updateAggregationResult}.
+   *
+   * <p>Values are read according to the stored type of the expression:
+   * <ul>
+   *   <li>INT / LONG: converted with {@link BigDecimal#valueOf(long)}</li>
+   *   <li>FLOAT / DOUBLE / STRING: read as strings and parsed with {@link BigDecimal#BigDecimal(String)}</li>
+   *   <li>BIG_DECIMAL: applied as is</li>
+   *   <li>BYTES: each value is deserialized as a serialized {@link AvgPrecisionPair} and applied</li>
+   * </ul>
+   *
+   * <p>Every branch goes through {@code foldNotNull} with a {@code null} initial accumulator, so the pair passed on
+   * is {@code null} when the reducer is never invoked. NOTE(review): {@code foldNotNull} is defined in the parent
+   * class and presumably invokes the reducer only for ranges of non-null rows - confirm.
+   *
+   * @param length number of values in the block
+   * @param aggregationResultHolder holder that receives the accumulated pair
+   * @param blockValSetMap block value sets keyed by expression; the one for this function's expression is used
+   * @throws IllegalStateException if the stored type of the values is not supported
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    AvgPrecisionPair avgPair;
+    switch (blockValSet.getValueType().getStoredType()) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(intValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(BigDecimal.valueOf(longValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        // Floating point values are read through their string form and parsed from that string.
+        String[] stringValues = blockValSet.getStringValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(new BigDecimal(stringValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(bigDecimalValues[i]);
+          }
+          return innerPair;
+        });
+        break;
+      case BYTES:
+        // Serialized AvgPrecisionPair. Goes through foldNotNull like every other branch instead of looping over
+        // all rows, so this branch treats rows the same way as its siblings rather than deserializing every row.
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        avgPair = foldNotNull(length, blockValSet, null, (acum, from, to) -> {
+          AvgPrecisionPair innerPair = acum == null ? new AvgPrecisionPair() : acum;
+          for (int i = from; i < to; i++) {
+            innerPair.apply(ObjectSerDeUtils.AVG_PRECISION_PAIR_SER_DE.deserialize(bytesValues[i]));
+          }
+          return innerPair;
+        });
+        break;
+      default:
+        throw new IllegalStateException("Unsupported value type: " + blockValSet.getValueType());
+    }
+    updateAggregationResult(aggregationResultHolder, avgPair);
+  }
+
+  protected void updateAggregationResult(AggregationResultHolder 
aggregationResultHolder, AvgPrecisionPair avgPair) {
+    if (_nullHandlingEnabled) {
+      if (avgPair != null && avgPair.getCount() > 0) {
+        AvgPrecisionPair otherPair = aggregationResultHolder.getResult();
+        if (otherPair == null) {
+          aggregationResultHolder.setValue(avgPair);
+        } else {
+          otherPair.apply(avgPair);
+        }
+      }
+    } else {
+      if (avgPair == null) {
+        avgPair = new AvgPrecisionPair();
+      }
+      AvgPrecisionPair otherPair = aggregationResultHolder.getResult();
+      if (otherPair == null) {
+        aggregationResultHolder.setValue(avgPair);
+      } else {
+        otherPair.apply(avgPair);
+      }
+    }

Review Comment:
   Could you please explain the null handling logic here? You can check the 
regular average function implementation for pointers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to