This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c61d3dc10a9 [Query Engine] ANY VALUE Support (#16678)
c61d3dc10a9 is described below

commit c61d3dc10a9a54874401170af51abc54129acbc5
Author: Praveen <[email protected]>
AuthorDate: Thu Dec 25 06:24:35 2025 -0800

    [Query Engine] ANY VALUE Support (#16678)
    
    * Any value
    
    * Tests
    
    * ser/der
    
    * review comments
    
    * Enums & MSE test
---
 .../blocks/results/AggregationResultsBlock.java    |  11 +-
 .../function/AggregationFunctionFactory.java       |   2 +
 .../function/AggregationFunctionUtils.java         |   6 +
 .../function/AnyValueAggregationFunction.java      | 370 +++++++++++++++++++++
 .../function/AnyValueAggregationFunctionTest.java  | 315 ++++++++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |  75 +++++
 .../pinot/segment/spi/AggregationFunctionType.java |   1 +
 7 files changed, 779 insertions(+), 1 deletion(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 91b4f14e3c2..668e396ae27 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -185,7 +185,7 @@ public class AggregationResultsBlock extends 
BaseResultsBlock {
   }
 
   private void setIntermediateResult(DataTableBuilder dataTableBuilder, 
ColumnDataType[] columnDataTypes, int index,
-      Object result) {
+      Object result) throws IOException {
     ColumnDataType columnDataType = columnDataTypes[index];
     switch (columnDataType) {
       case INT:
@@ -200,6 +200,15 @@ public class AggregationResultsBlock extends 
BaseResultsBlock {
       case STRING:
         dataTableBuilder.setColumn(index, result.toString());
         break;
+      case FLOAT:
+        dataTableBuilder.setColumn(index, (float) result);
+        break;
+      case BIG_DECIMAL:
+        dataTableBuilder.setColumn(index, (BigDecimal) result);
+        break;
+      case BYTES:
+        dataTableBuilder.setColumn(index, (ByteArray) result);
+        break;
       default:
         throw new IllegalStateException("Illegal column data type in 
intermediate result: " + columnDataType);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 0fed9fe5e8b..16fa17ab840 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -237,6 +237,8 @@ public class AggregationFunctionFactory {
             return new AvgAggregationFunction(arguments, nullHandlingEnabled);
           case MODE:
             return new ModeAggregationFunction(arguments, nullHandlingEnabled);
+          case ANYVALUE:
+            return new AnyValueAggregationFunction(arguments, 
nullHandlingEnabled);
           case FIRSTWITHTIME: {
             Preconditions.checkArgument(numArguments == 3,
                 "FIRST_WITH_TIME expects 3 arguments, got: %s. The function 
can be used as "
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index c9edf0bf00c..79913301a3d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -154,6 +154,12 @@ public class AggregationFunctionUtils {
         return dataTable.getDouble(rowId, colId);
       case STRING:
         return dataTable.getString(rowId, colId);
+      case FLOAT:
+        return dataTable.getFloat(rowId, colId);
+      case BIG_DECIMAL:
+        return dataTable.getBigDecimal(rowId, colId);
+      case BYTES:
+        return dataTable.getBytes(rowId, colId);
       case OBJECT:
         CustomObject customObject = dataTable.getCustomObject(rowId, colId);
         return customObject != null ? 
aggregationFunction.deserializeIntermediateResult(customObject) : null;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunction.java
new file mode 100644
index 00000000000..dac8774ee46
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunction.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * AnyValue aggregation function returns any arbitrary NON-NULL value from the 
column for each group.
+ * <p>
+ * This is useful for GROUP BY queries where you want to include a column in SELECT that has a 1:1 mapping with the
+ * GROUP BY columns, avoiding the need to add it to the GROUP BY clause. The implementation is null-aware: the
+ * non-grouped path returns immediately once a value has been stored and otherwise asks the scan to stop at the
+ * first non-null value, while the group-by paths visit every non-null row of the block and store a value only
+ * for groups that do not have one yet. Each block is therefore scanned at most once.
+ * </p>
+ * <p><strong>Example:</strong></p>
+ * <pre>{@code
+ * SELECT CustomerID,
+ *        ANY_VALUE(CustomerName),
+ *        SUM(OrderValue)
+ * FROM Orders
+ * GROUP BY CustomerID
+ * }</pre>
+ */
+public class AnyValueAggregationFunction extends 
NullableSingleInputAggregationFunction<Object, Comparable<?>> {
+  private static final FieldSpec.DataType[] DATA_TYPE_VALUES = 
FieldSpec.DataType.values();
+  // Result type is determined at runtime based on input expression type
+  private ColumnDataType _resultType;
+
+  public AnyValueAggregationFunction(List<ExpressionContext> arguments, 
boolean nullHandlingEnabled) {
+    super(verifySingleArgument(arguments, "ANY_VALUE"), nullHandlingEnabled);
+    _resultType = null;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.ANYVALUE;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    // Default to STRING if result type is not yet determined
+    // TODO: See if UNKNOWN can be used instead
+    return _resultType != null ? _resultType : ColumnDataType.STRING;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return _resultType != null ? _resultType : ColumnDataType.STRING;
+  }
+
+  @Override
+  public Object extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public Object extractGroupByResult(GroupByResultHolder groupByResultHolder, 
int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  @Override
+  public Comparable<?> extractFinalResult(Object intermediateResult) {
+    return (Comparable<?>) intermediateResult;
+  }
+
+  @Override
+  public Object merge(Object left, Object right) {
+    // For ANY_VALUE, we just need any non-null value, so merge by returning 
the first non-null value
+    return left != null ? left : right;
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder holder,
+                        Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    if (holder.getResult() != null) {
+      return;
+    }
+    BlockValSet bvs = blockValSetMap.get(_expression);
+    ensureResultType(bvs);
+    aggregateHelper(length, bvs, (i, value) -> {
+      holder.setValue(value);
+      return true; // Stop after first value found
+    });
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeys, 
GroupByResultHolder holder,
+                                 Map<ExpressionContext, BlockValSet> map) {
+    BlockValSet bvs = map.get(_expression);
+    ensureResultType(bvs);
+    aggregateHelper(length, bvs, (i, value) -> {
+      int g = groupKeys[i];
+      if (holder.getResult(g) == null) {
+        holder.setValueForKey(g, value);
+      }
+      return false; // Continue processing for other groups
+    });
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder holder,
+                                 Map<ExpressionContext, BlockValSet> map) {
+    BlockValSet bvs = map.get(_expression);
+    ensureResultType(bvs);
+    aggregateHelper(length, bvs, (i, value) -> {
+      int[] keys = groupKeysArray[i];
+      for (int g : keys) {
+        if (holder.getResult(g) == null) {
+          holder.setValueForKey(g, value);
+        }
+      }
+      return false; // Continue processing for other groups
+    });
+  }
+
+  @Override
+  public SerializedIntermediateResult serializeIntermediateResult(Object 
value) {
+    if (value == null) {
+      return new SerializedIntermediateResult(0, new byte[0]);
+    }
+    byte[] bytes = serializeValue(value);
+    return new SerializedIntermediateResult(1, bytes);
+  }
+
+  @Override
+  public Object deserializeIntermediateResult(CustomObject customObject) {
+    if (customObject.getBuffer().remaining() == 0) {
+      return null;
+    }
+    return deserializeValue(customObject.getBuffer());
+  }
+
+  @FunctionalInterface
+  private interface ValueProcessor<T> {
+    boolean process(int index, T value); // Returns true to stop processing, 
false to continue
+  }
+
+  /**
+   * Feeds the non-null values of {@code bvs} to {@code processor}, reading through the dictionary when the
+   * block has one and from the raw value arrays otherwise.
+   * <p>
+   * The processor returns {@code true} to request that the scan stop. The request is remembered across callback
+   * invocations: {@code forEachNotNull} is inherited and may call the range callback more than once (presumably
+   * once per run of non-null rows - confirm against the superclass), so a plain {@code break} would only end the
+   * current range and the next range would resume the scan.
+   *
+   * @param length number of rows in the block, forwarded to {@code forEachNotNull}
+   * @param bvs block value set to read from
+   * @param processor receives the row index and its value; returns {@code true} to stop the scan
+   */
+  private void aggregateHelper(int length, BlockValSet bvs, ValueProcessor<Object> processor) {
+    // Single-element array so that the lambdas below can flip the flag.
+    final boolean[] stopRequested = {false};
+    final Dictionary dict = bvs.getDictionary();
+    if (dict != null) {
+      // Use dictionary-based access for efficiency when available
+      final int[] dictIds = bvs.getDictionaryIdsSV();
+      // The stored type does not depend on the row index, so resolve it once instead of once per value.
+      final FieldSpec.DataType storedType = bvs.getValueType().getStoredType();
+      forEachNotNull(length, bvs, (from, to) -> {
+        for (int i = from; i < to && !stopRequested[0]; i++) {
+          Object value = getDictionaryValue(dict, dictIds[i], storedType);
+          if (processor.process(i, value)) {
+            stopRequested[0] = true;
+          }
+        }
+      });
+    } else {
+      // Fall back to direct value access based on type
+      forEachNotNull(length, bvs, (from, to) -> {
+        for (int i = from; i < to && !stopRequested[0]; i++) {
+          Object value = getDirectValue(bvs, i);
+          if (value != null && processor.process(i, value)) {
+            stopRequested[0] = true;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Wraps raw bytes into the {@link ByteArray} form used for BYTES results; {@code null} stays {@code null}.
+   */
+  private static ByteArray toByteArray(byte[] bytes) {
+    return bytes != null ? new ByteArray(bytes) : null;
+  }
+
+  /**
+   * Get value from dictionary based on data type.
+   * <p>
+   * BYTES values are returned as {@link ByteArray} rather than {@code byte[]}: {@code extractFinalResult}
+   * casts the aggregated value to {@link Comparable}, which a raw array is not.
+   *
+   * @param dict dictionary to read from
+   * @param dictId dictionary id of the value
+   * @param storedType stored type of the column; selects the typed accessor
+   * @return the boxed value for {@code dictId}
+   * @throws IllegalStateException if {@code storedType} is not one of the supported types
+   */
+  private Object getDictionaryValue(Dictionary dict, int dictId, FieldSpec.DataType storedType) {
+    switch (storedType) {
+      case INT:
+        return dict.getIntValue(dictId);
+      case LONG:
+        return dict.getLongValue(dictId);
+      case FLOAT:
+        return dict.getFloatValue(dictId);
+      case DOUBLE:
+        return dict.getDoubleValue(dictId);
+      case STRING:
+        return dict.getStringValue(dictId);
+      case BIG_DECIMAL:
+        return dict.getBigDecimalValue(dictId);
+      case BYTES:
+        return toByteArray(dict.getBytesValue(dictId));
+      default:
+        throw new IllegalStateException("Unsupported dictionary type: " + storedType);
+    }
+  }
+
+  /**
+   * Get value directly from BlockValSet based on data type.
+   * <p>
+   * BYTES values are returned as {@link ByteArray} for the same reason as in the dictionary path: the result
+   * must be {@link Comparable}.
+   *
+   * @param bvs block value set to read from
+   * @param index row index inside the block
+   * @return the boxed value at {@code index}
+   * @throws IllegalStateException if the stored type of {@code bvs} is not one of the supported types
+   */
+  private Object getDirectValue(BlockValSet bvs, int index) {
+    switch (bvs.getValueType().getStoredType()) {
+      case INT:
+        return bvs.getIntValuesSV()[index];
+      case LONG:
+        return bvs.getLongValuesSV()[index];
+      case FLOAT:
+        return bvs.getFloatValuesSV()[index];
+      case DOUBLE:
+        return bvs.getDoubleValuesSV()[index];
+      case STRING:
+        return bvs.getStringValuesSV()[index];
+      case BIG_DECIMAL:
+        return bvs.getBigDecimalValuesSV()[index];
+      case BYTES:
+        return toByteArray(bvs.getBytesValuesSV()[index]);
+      default:
+        throw new IllegalStateException("Unsupported direct access type: " + bvs.getValueType().getStoredType());
+    }
+  }
+
+  /**
+   * Custom serialization for ANY_VALUE that handles all supported data types efficiently.
+   * Note: value is never null - null is handled at the serializeIntermediateResult layer.
+   * <p>
+   * BYTES values are accepted both as {@link ByteArray} and as a raw {@code byte[]}; both produce the same
+   * serialized form.
+   *
+   * @param value non-null value to serialize
+   * @return the type ordinal followed by the encoded value
+   * @throws IllegalStateException if the runtime type of {@code value} is not supported
+   */
+  private byte[] serializeValue(Object value) {
+    if (value instanceof Integer) {
+      return serializeFixedValue(FieldSpec.DataType.INT, 4, buffer -> buffer.putInt((Integer) value));
+    } else if (value instanceof Long) {
+      return serializeFixedValue(FieldSpec.DataType.LONG, 8, buffer -> buffer.putLong((Long) value));
+    } else if (value instanceof Float) {
+      return serializeFixedValue(FieldSpec.DataType.FLOAT, 4, buffer -> buffer.putFloat((Float) value));
+    } else if (value instanceof Double) {
+      return serializeFixedValue(FieldSpec.DataType.DOUBLE, 8, buffer -> buffer.putDouble((Double) value));
+    } else if (value instanceof String) {
+      return serializeVariableValue(FieldSpec.DataType.STRING, ((String) value).getBytes(StandardCharsets.UTF_8));
+    } else if (value instanceof BigDecimal) {
+      return serializeVariableValue(FieldSpec.DataType.BIG_DECIMAL, value.toString().getBytes(StandardCharsets.UTF_8));
+    } else if (value instanceof ByteArray) {
+      return serializeVariableValue(FieldSpec.DataType.BYTES, ((ByteArray) value).getBytes());
+    } else if (value instanceof byte[]) {
+      return serializeVariableValue(FieldSpec.DataType.BYTES, (byte[]) value);
+    } else {
+      throw new IllegalStateException("Unsupported value type for serialization: " + value.getClass().getName());
+    }
+  }
+
+  /**
+   * Helper method for serializing fixed-length values
+   */
+  private byte[] serializeFixedValue(FieldSpec.DataType dataType, int 
valueSize, Consumer<ByteBuffer> valueWriter) {
+    ByteBuffer buffer = ByteBuffer.allocate(4 + valueSize); // 4 bytes for 
type ordinal + value bytes
+    buffer.putInt(dataType.ordinal());
+    valueWriter.accept(buffer);
+    return buffer.array();
+  }
+
+  /**
+   * Helper method for serializing variable-length values
+   */
+  private byte[] serializeVariableValue(FieldSpec.DataType dataType, byte[] 
data) {
+    ByteBuffer buffer = ByteBuffer.allocate(8 + data.length); // 4 bytes type 
ordinal + 4 bytes length + data
+    buffer.putInt(dataType.ordinal());
+    buffer.putInt(data.length);
+    buffer.put(data);
+    return buffer.array();
+  }
+
+  /**
+   * Custom deserialization for ANY_VALUE that handles all supported data types efficiently.
+   * Note: empty buffer (null) is handled at the deserializeIntermediateResult layer.
+   * <p>
+   * The buffer starts with the {@link FieldSpec.DataType} ordinal written by the serializer, followed by the
+   * value. BYTES values are returned as {@link ByteArray} so that the result is {@link Comparable}, which
+   * {@code extractFinalResult} requires.
+   *
+   * @param buffer buffer positioned at the type ordinal
+   * @return the decoded value
+   * @throws IllegalStateException if the encoded type is not one of the supported types
+   */
+  private Object deserializeValue(ByteBuffer buffer) {
+    int typeOrdinal = buffer.getInt();
+    FieldSpec.DataType dataType = DATA_TYPE_VALUES[typeOrdinal];
+    switch (dataType) {
+      case INT:
+        return buffer.getInt();
+      case LONG:
+        return buffer.getLong();
+      case FLOAT:
+        return buffer.getFloat();
+      case DOUBLE:
+        return buffer.getDouble();
+      case STRING:
+        return new String(deserializeVariableBytes(buffer), StandardCharsets.UTF_8);
+      case BIG_DECIMAL:
+        return new BigDecimal(new String(deserializeVariableBytes(buffer), StandardCharsets.UTF_8));
+      case BYTES:
+        return new ByteArray(deserializeVariableBytes(buffer));
+      default:
+        throw new IllegalStateException("Unsupported data type for deserialization: " + dataType);
+    }
+  }
+
+  /**
+   * Helper method for deserializing variable-length byte arrays
+   */
+  private byte[] deserializeVariableBytes(ByteBuffer buffer) {
+    int length = buffer.getInt();
+    byte[] bytes = new byte[length];
+    buffer.get(bytes);
+    return bytes;
+  }
+
+  private void ensureResultType(BlockValSet bvs) {
+    if (_resultType != null) {
+      return;
+    }
+    switch (bvs.getValueType().getStoredType()) {
+      case INT:
+        _resultType = ColumnDataType.INT;
+        return;
+      case LONG:
+        _resultType = ColumnDataType.LONG;
+        return;
+      case FLOAT:
+        _resultType = ColumnDataType.FLOAT;
+        return;
+      case DOUBLE:
+        _resultType = ColumnDataType.DOUBLE;
+        return;
+      case STRING:
+        _resultType = ColumnDataType.STRING;
+        return;
+      case BIG_DECIMAL:
+        _resultType = ColumnDataType.BIG_DECIMAL;
+        return;
+      case BYTES:
+        _resultType = ColumnDataType.BYTES;
+        return;
+      default:
+        throw new IllegalStateException("ANY_VALUE unsupported type: " + 
bvs.getValueType());
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunctionTest.java
new file mode 100644
index 00000000000..af3d9ab660f
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunctionTest.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class AnyValueAggregationFunctionTest extends 
AbstractAggregationFunctionTest {
+
+  // Constants for standardized test queries and expected results
+  private static final String STANDARD_GROUP_BY_QUERY_TEMPLATE =
+      "select 'testResult', anyValue(myField) from testTable group by 
'testResult'";
+  private static final String EXPECTED_COLUMN_TYPES = "STRING | STRING";
+  private static final String EXPECTED_NULL_RESULT = "testResult | null";
+
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new DataTypeScenario(FieldSpec.DataType.STRING),
+        new DataTypeScenario(FieldSpec.DataType.INT),
+        new DataTypeScenario(FieldSpec.DataType.LONG),
+        new DataTypeScenario(FieldSpec.DataType.FLOAT),
+        new DataTypeScenario(FieldSpec.DataType.DOUBLE),
+        new DataTypeScenario(FieldSpec.DataType.BOOLEAN),
+    };
+  }
+
+  /**
+   * Runs the standard query with null handling disabled and checks only that a non-null value comes back.
+   */
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    // Test that ANY_VALUE returns a non-null result when non-null values exist
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(false)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Validate that ANY_VALUE returned something non-null (exact value doesn't matter)
+    validateAnyValueBehavior(result, true); // true = should return non-null
+  }
+
+  /**
+   * Runs the standard query with null handling enabled and checks only that a non-null value comes back.
+   */
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    // Test that ANY_VALUE returns a non-null result when non-null values exist
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(true)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Validate that ANY_VALUE returned something non-null (exact value doesn't matter)
+    validateAnyValueBehavior(result, true); // true = should return non-null
+  }
+
+  /**
+   * Group-by variant with null handling disabled; checks only that a non-null value comes back.
+   */
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    // Test that ANY_VALUE returns a non-null result when non-null values exist
+    // We don't check the exact value since ANY_VALUE can return any valid value
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(false)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Validate that ANY_VALUE returned something non-null (exact value doesn't matter)
+    validateAnyValueBehavior(result, true); // true = should return non-null
+  }
+
+  /**
+   * Group-by variant with null handling enabled; checks only that a non-null value comes back.
+   */
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    // Test that ANY_VALUE returns a non-null result when non-null values exist
+    // We don't check the exact value since ANY_VALUE can return any valid value
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(true)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Validate that ANY_VALUE returned something non-null (exact value doesn't matter)
+    validateAnyValueBehavior(result, true); // true = should return non-null
+  }
+
+  // Test for different data types with specific values
+  @Test
+  void testIntegerDataType() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.INT)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "100", "null", "100") // Same non-null 
values
+        .andOnSecondInstance("myField", "null", "100", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null
+  }
+
+  @Test
+  void testLongDataType() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.LONG)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "1000", "null", "1000") // Same non-null 
values
+        .andOnSecondInstance("myField", "null", "1000", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null
+  }
+
+  @Test
+  void testFloatDataType() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.FLOAT)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "3.14", "null", "3.14") // Same non-null 
values
+        .andOnSecondInstance("myField", "null", "3.14", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null
+  }
+
+  @Test
+  void testDoubleDataType() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.DOUBLE)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "2.718", "null", "2.718") // Same non-null 
values
+        .andOnSecondInstance("myField", "null", "2.718", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null
+  }
+
+  @Test
+  void testBooleanDataType() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.BOOLEAN)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "1", "null", "1") // Use 1/0 for boolean, 
same values
+        .andOnSecondInstance("myField", "null", "1", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null
+  }
+
+  // Edge case tests
+  @Test
+  void testAllNullValues() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.STRING)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "null", "null", "null")
+        .andOnSecondInstance("myField", "null", "null", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, false); // false = should return null 
(all values are null)
+  }
+
+  @Test
+  void testSingleNonNullValue() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.STRING)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "null", "null", "null")
+        .andOnSecondInstance("myField", "null", "unique_value", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null 
(unique_value exists)
+  }
+
+  // GROUP BY tests with different data types
+  @Test
+  void testGroupByWithMultipleGroups() {
+    // ANY_VALUE can return any of the values (value1, value2, value3)
+    // This test has mixed values, so ANY_VALUE could return any of them
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.STRING)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "value1", "value1", "value2")
+        .andOnSecondInstance("myField", "value2", "value3", "value3")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null 
(multiple values available)
+  }
+
+  @Test
+  void testGroupByWithNullsInGroups() {
+    // ANY_VALUE can return any non-null value (100, 200, or 300)
+    // This test has mixed values, so ANY_VALUE could return any of them
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.INT)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "100", "null", "200")
+        .andOnSecondInstance("myField", "null", "300", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null 
(multiple non-null values available)
+  }
+
+  // Performance validation test - ensures ANY_VALUE doesn't require all 
values to be processed
+  @Test
+  void testPerformanceWithLargeDataset() {
+    // ANY_VALUE can return any of the values in the dataset
+    // This test has mixed values, so ANY_VALUE could return any of them
+    DataTypeScenario scenario = new 
DataTypeScenario(FieldSpec.DataType.STRING);
+    FluentQueryTest.DeclaringTable table = scenario.getDeclaringTable(true);
+
+    // Create a large dataset where ANY_VALUE can return any value
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "first_value", "value_1", "value_2", 
"value_3", "value_4")
+        .andOnSecondInstance("myField", "value_5", "value_6", "value_7", 
"value_8", "value_9")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+    validateAnyValueBehavior(result, true); // Should return non-null (many 
values available)
+  }
+
+  // Test serialization/deserialization behavior
+  @Test
+  void testSerializationWithComplexValues() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.STRING)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "test_value", "null", "test_value") // 
Same values for deterministic results
+        .andOnSecondInstance("myField", "null", "test_value", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null 
(test_value exists)
+  }
+
+  // Test numeric edge cases
+  @Test
+  void testNumericEdgeCases() {
+    FluentQueryTest.QueryExecuted result = new 
DataTypeScenario(FieldSpec.DataType.DOUBLE)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "0.0", "null", "0.0") // Same values
+        .andOnSecondInstance("myField", "null", "0.0", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true); // Should return non-null (0.0 is 
a valid value)
+  }
+
+  // Helper methods to handle different data types in parameterized tests
+  private String getTestValueForDataType(FieldSpec.DataType dataType) {
+    switch (dataType) {
+      case STRING:
+        return "test_value";
+      case INT:
+        return "100";
+      case LONG:
+        return "1000";
+      case FLOAT:
+        return "3.14";
+      case DOUBLE:
+        return "2.718";
+      case BOOLEAN:
+        return "1"; // Use 1/0 for boolean values
+      case BYTES:
+        return "74657374"; // "test" in hex
+      default:
+        return "test_value";
+    }
+  }
+
+  private String getExpectedResultForDataType(FieldSpec.DataType dataType, 
String testValue) {
+    switch (dataType) {
+      case BOOLEAN:
+        return "1"; // Boolean values are stored as integers in Pinot, so 
ANY_VALUE returns "1"
+      default:
+        return testValue; // Most types return the same value
+    }
+  }
+
+  /**
+   * Checks only whether ANY_VALUE produced null or non-null, never the concrete value.
+   * <p>
+   * The non-null case is detected indirectly: the result is compared against the expected all-null row, and
+   * that comparison is expected to fail with an {@link AssertionError}.
+   * NOTE(review): the comparison also pins the column types to {@code STRING | STRING}. If {@code thenResultIs}
+   * verifies column types, a non-STRING scenario would fail it for that reason alone, and this check would pass
+   * regardless of the value - TODO confirm.
+   *
+   * @param queryResult executed query to inspect
+   * @param shouldReturnNonNull {@code true} when a non-null value is expected, {@code false} when null is
+   */
+  private void validateAnyValueBehavior(FluentQueryTest.QueryExecuted queryResult, boolean shouldReturnNonNull) {
+    if (!shouldReturnNonNull) {
+      // Every input was null, so the all-null row has to match.
+      queryResult.thenResultIs(EXPECTED_COLUMN_TYPES, EXPECTED_NULL_RESULT);
+      return;
+    }
+    try {
+      queryResult.thenResultIs(EXPECTED_COLUMN_TYPES, EXPECTED_NULL_RESULT);
+    } catch (AssertionError e) {
+      // The all-null row did not match, which is the outcome wanted here.
+      return;
+    }
+    // Reaching this point means the all-null row matched.
+    throw new AssertionError("ANY_VALUE returned null when non-null values were available in the dataset");
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6cf2dfccb44..ad2071aa666 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -4332,4 +4332,79 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }
     Assert.assertTrue(columnPresent, "Column " + newAddedColumn + " not 
present in result set");
   }
+
+  /**
+   * Runs ANY_VALUE queries against {@code mytable} with the query engine chosen by the data provider.
+   *
+   * <p>ANY_VALUE may return any value of a group, so the checks cover result shape and non-nullness
+   * instead of exact values. Null checks go through {@code assertNonNullValue} because
+   * {@code JsonNode.asText()} never returns {@code null} (a JSON null yields the string "null") and
+   * {@code JsonNode.get(int)} returns a {@code NullNode} object for a JSON null, so
+   * {@code assertNotNull} on either of them cannot detect a null result.
+   *
+   * @param useMultiStageQueryEngine whether to run the queries on the multi-stage query engine
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testAnyValueFunctionality(boolean useMultiStageQueryEngine)
+      throws Exception {
+    // Test 1: Basic ANY_VALUE functionality with GROUP BY
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = "SELECT Carrier, ANY_VALUE(Origin), COUNT(*) FROM mytable"
+        + " GROUP BY Carrier ORDER BY Carrier LIMIT 5";
+    JsonNode response = postQuery(query);
+    JsonNode rows = response.get("resultTable").get("rows");
+    assertTrue(rows.size() > 0, "Should have results");
+
+    for (int i = 0; i < rows.size(); i++) {
+      JsonNode row = rows.get(i);
+      assertNonNullValue(row.get(0), "Carrier should not be null");
+      assertNonNullValue(row.get(1), "ANY_VALUE(Origin) should not be null");
+      assertTrue(row.get(2).asInt() > 0, "COUNT should be greater than 0");
+    }
+
+    // Test 2: ANY_VALUE without GROUP BY - should return single values
+    query = "SELECT ANY_VALUE(Carrier), ANY_VALUE(Origin), COUNT(*) FROM mytable";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertEquals(rows.size(), 1, "Should have 1 row without GROUP BY");
+
+    JsonNode row = rows.get(0);
+    assertNonNullValue(row.get(0), "ANY_VALUE(Carrier) should not be null");
+    assertNonNullValue(row.get(1), "ANY_VALUE(Origin) should not be null");
+    assertTrue(row.get(2).asInt() > 0, "COUNT should be greater than 0");
+
+    // Test 3: ANY_VALUE with multiple GROUP BY columns
+    query = "SELECT Carrier, Origin, ANY_VALUE(Dest), COUNT(*) FROM mytable"
+        + " GROUP BY Carrier, Origin ORDER BY Carrier, Origin LIMIT 10";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertTrue(rows.size() > 0, "Should have results for multiple GROUP BY");
+
+    for (int i = 0; i < rows.size(); i++) {
+      row = rows.get(i);
+      assertNonNullValue(row.get(0), "Carrier should not be null");
+      assertNonNullValue(row.get(1), "Origin should not be null");
+      assertNonNullValue(row.get(2), "ANY_VALUE(Dest) should not be null");
+      assertTrue(row.get(3).asInt() > 0, "COUNT should be greater than 0");
+    }
+
+    // Test 4: ANY_VALUE with different data types
+    query = "SELECT ANY_VALUE(Carrier) as StringValue, ANY_VALUE(AirlineID) as IntValue,"
+        + " ANY_VALUE(FlightNum) as IntValue2, ANY_VALUE(ArrDelay) as DoubleValue FROM mytable";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertEquals(rows.size(), 1, "Should have 1 row for data types test");
+
+    row = rows.get(0);
+    assertNonNullValue(row.get(0), "String ANY_VALUE should not be null");
+    assertTrue(row.get(1).asInt() >= 0, "Int ANY_VALUE should be valid");
+    assertTrue(row.get(2).asInt() >= 0, "Int ANY_VALUE should be valid");
+    // ArrDelay can be negative, so only require that a value is present
+    assertNonNullValue(row.get(3), "Double ANY_VALUE should not be null");
+
+    // Test 5: ANY_VALUE in complex query with multiple aggregations
+    query = "SELECT Origin, ANY_VALUE(Carrier) as SampleCarrier, ANY_VALUE(Dest) as SampleDest,"
+        + " COUNT(*) as FlightCount, AVG(ArrDelay) as AvgDelay FROM mytable"
+        + " GROUP BY Origin ORDER BY FlightCount DESC LIMIT 5";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertTrue(rows.size() > 0, "Should have results for complex query");
+
+    for (int i = 0; i < rows.size(); i++) {
+      row = rows.get(i);
+      assertNonNullValue(row.get(0), "Origin should not be null");
+      assertNonNullValue(row.get(1), "ANY_VALUE(Carrier) should not be null");
+      assertNonNullValue(row.get(2), "ANY_VALUE(Dest) should not be null");
+      assertTrue(row.get(3).asInt() > 0, "FlightCount should be positive");
+      // AvgDelay can be negative, so only require that a value is present
+      assertNonNullValue(row.get(4), "AvgDelay should not be null");
+    }
+  }
+
+  /**
+   * Asserts that a result cell exists and is not a JSON null.
+   *
+   * @param node cell taken from a result row; {@code null} when the column index does not exist
+   * @param message failure message reported when the cell is missing or is a JSON null
+   */
+  private static void assertNonNullValue(JsonNode node, String message) {
+    assertTrue(node != null && !node.isNull(), message);
+  }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 6d130d89321..a7e4a72a825 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -66,6 +66,7 @@ public enum AggregationFunctionType {
   // TODO: Support MV types after next release (see 
https://github.com/apache/pinot/pull/17109)
   AVG("avg", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
   MODE("mode", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
+  ANYVALUE("anyValue", ReturnTypes.ARG0, OperandTypes.ANY, SqlTypeName.OTHER),
   FIRSTWITHTIME("firstWithTime", ReturnTypes.ARG0,
       OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY, 
SqlTypeFamily.CHARACTER), SqlTypeName.OTHER),
   LASTWITHTIME("lastWithTime", ReturnTypes.ARG0,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to