This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c61d3dc10a9 [Query Engine] ANY VALUE Support (#16678)
c61d3dc10a9 is described below
commit c61d3dc10a9a54874401170af51abc54129acbc5
Author: Praveen <[email protected]>
AuthorDate: Thu Dec 25 06:24:35 2025 -0800
[Query Engine] ANY VALUE Support (#16678)
* Any value
* Tests
* ser/der
* review comments
* Enums & MSE test
---
.../blocks/results/AggregationResultsBlock.java | 11 +-
.../function/AggregationFunctionFactory.java | 2 +
.../function/AggregationFunctionUtils.java | 6 +
.../function/AnyValueAggregationFunction.java | 370 +++++++++++++++++++++
.../function/AnyValueAggregationFunctionTest.java | 315 ++++++++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 75 +++++
.../pinot/segment/spi/AggregationFunctionType.java | 1 +
7 files changed, 779 insertions(+), 1 deletion(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 91b4f14e3c2..668e396ae27 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -185,7 +185,7 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
}
private void setIntermediateResult(DataTableBuilder dataTableBuilder,
ColumnDataType[] columnDataTypes, int index,
- Object result) {
+ Object result) throws IOException {
ColumnDataType columnDataType = columnDataTypes[index];
switch (columnDataType) {
case INT:
@@ -200,6 +200,15 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
case STRING:
dataTableBuilder.setColumn(index, result.toString());
break;
+ case FLOAT:
+ dataTableBuilder.setColumn(index, (float) result);
+ break;
+ case BIG_DECIMAL:
+ dataTableBuilder.setColumn(index, (BigDecimal) result);
+ break;
+ case BYTES:
+ dataTableBuilder.setColumn(index, (ByteArray) result);
+ break;
default:
throw new IllegalStateException("Illegal column data type in
intermediate result: " + columnDataType);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 0fed9fe5e8b..16fa17ab840 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -237,6 +237,8 @@ public class AggregationFunctionFactory {
return new AvgAggregationFunction(arguments, nullHandlingEnabled);
case MODE:
return new ModeAggregationFunction(arguments, nullHandlingEnabled);
+ case ANYVALUE:
+ return new AnyValueAggregationFunction(arguments,
nullHandlingEnabled);
case FIRSTWITHTIME: {
Preconditions.checkArgument(numArguments == 3,
"FIRST_WITH_TIME expects 3 arguments, got: %s. The function
can be used as "
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index c9edf0bf00c..79913301a3d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -154,6 +154,12 @@ public class AggregationFunctionUtils {
return dataTable.getDouble(rowId, colId);
case STRING:
return dataTable.getString(rowId, colId);
+ case FLOAT:
+ return dataTable.getFloat(rowId, colId);
+ case BIG_DECIMAL:
+ return dataTable.getBigDecimal(rowId, colId);
+ case BYTES:
+ return dataTable.getBytes(rowId, colId);
case OBJECT:
CustomObject customObject = dataTable.getCustomObject(rowId, colId);
return customObject != null ?
aggregationFunction.deserializeIntermediateResult(customObject) : null;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunction.java
new file mode 100644
index 00000000000..dac8774ee46
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunction.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * AnyValue aggregation function returns any arbitrary NON-NULL value from the
column for each group.
+ * <p>
+ * This is useful for GROUP BY queries where you want to include a column in SELECT that has a 1:1 mapping with the
+ * GROUP BY columns, avoiding the need to add it to the GROUP BY clause. The implementation is null-aware. Without
+ * GROUP BY, a block is skipped once a value has already been recorded and the scan is cut short as soon as a value
+ * is found. With GROUP BY, every non-null row is visited and a value is stored only for groups that do not have one
+ * yet, so the cost is linear in the number of rows.
+ * </p>
+ * <p><strong>Example:</strong></p>
+ * <pre>{@code
+ * SELECT CustomerID,
+ * ANY_VALUE(CustomerName),
+ * SUM(OrderValue)
+ * FROM Orders
+ * GROUP BY CustomerID
+ * }</pre>
+ */
+public class AnyValueAggregationFunction extends
NullableSingleInputAggregationFunction<Object, Comparable<?>> {
+ private static final FieldSpec.DataType[] DATA_TYPE_VALUES =
FieldSpec.DataType.values();
+ // Result type is determined at runtime based on input expression type
+ private ColumnDataType _resultType;
+
+  /**
+   * Creates the function from the parsed argument list.
+   *
+   * @param arguments expressions passed to ANY_VALUE; checked by {@code verifySingleArgument}
+   * @param nullHandlingEnabled forwarded unchanged to the nullable base class
+   */
+  public AnyValueAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) {
+    super(verifySingleArgument(arguments, "ANY_VALUE"), nullHandlingEnabled);
+    // _resultType keeps its default of null until ensureResultType() inspects the first block.
+  }
+
+ /** Identifies this function as {@link AggregationFunctionType#ANYVALUE}. */
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.ANYVALUE;
+ }
+
+ /** Creates the object-typed holder used by the non-group-by path. */
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ /** Creates the object-typed per-group holder; both capacities are passed through unchanged. */
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ /**
+ * Returns the column type recorded by ensureResultType(), or STRING while none has been recorded.
+ * NOTE(review): _resultType is only assigned from the aggregate*() methods, so an instance that never aggregates a
+ * block always reports STRING here - confirm that this is acceptable for every place the type is consulted.
+ */
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ // Default to STRING if result type is not yet determined
+ // TODO: See if UNKNOWN can be used instead
+ return _resultType != null ? _resultType : ColumnDataType.STRING;
+ }
+
+ /** Same rule as the intermediate type: the recorded type, or STRING while none has been recorded. */
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return _resultType != null ? _resultType : ColumnDataType.STRING;
+ }
+
+ /** Returns whatever the holder currently stores; this is null when no value was recorded. */
+ @Override
+ public Object extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ return aggregationResultHolder.getResult();
+ }
+
+ /** Returns the value stored for {@code groupKey}; this is null when no value was recorded for that group. */
+ @Override
+ public Object extractGroupByResult(GroupByResultHolder groupByResultHolder,
int groupKey) {
+ return groupByResultHolder.getResult(groupKey);
+ }
+
+ /**
+ * Returns the intermediate value unchanged, cast to Comparable.
+ * NOTE(review): getDictionaryValue()/getDirectValue() produce byte[] for BYTES columns and byte[] does not
+ * implement Comparable, so this cast looks like it would throw ClassCastException for them - confirm whether BYTES
+ * values need to be wrapped before they reach this point.
+ */
+ @Override
+ public Comparable<?> extractFinalResult(Object intermediateResult) {
+ return (Comparable<?>) intermediateResult;
+ }
+
+ /** Combines two partial results by preferring {@code left} and falling back to {@code right}. */
+ @Override
+ public Object merge(Object left, Object right) {
+ // For ANY_VALUE any non-null value is acceptable, so keep the first non-null operand
+ return left != null ? left : right;
+ }
+
+  /**
+   * Non-group-by aggregation: records a value from this block unless the holder already has one.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder holder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    boolean alreadyResolved = holder.getResult() != null;
+    if (!alreadyResolved) {
+      BlockValSet blockValSet = blockValSetMap.get(_expression);
+      ensureResultType(blockValSet);
+      // Returning true asks the helper to stop after the first value.
+      ValueProcessor<Object> storeAndStop = (rowId, candidate) -> {
+        holder.setValue(candidate);
+        return true;
+      };
+      aggregateHelper(length, blockValSet, storeAndStop);
+    }
+  }
+
+  /**
+   * Group-by aggregation with one group key per row: stores the row's value for its group when that group has no
+   * value yet.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeys, GroupByResultHolder holder,
+      Map<ExpressionContext, BlockValSet> map) {
+    BlockValSet blockValSet = map.get(_expression);
+    ensureResultType(blockValSet);
+    // Returning false keeps the scan going, because later rows may belong to other groups.
+    ValueProcessor<Object> fillIfAbsent = (rowId, candidate) -> {
+      int groupKey = groupKeys[rowId];
+      if (holder.getResult(groupKey) == null) {
+        holder.setValueForKey(groupKey, candidate);
+      }
+      return false;
+    };
+    aggregateHelper(length, blockValSet, fillIfAbsent);
+  }
+
+  /**
+   * Group-by aggregation with several group keys per row: stores the row's value for each of its groups that has no
+   * value yet.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder holder,
+      Map<ExpressionContext, BlockValSet> map) {
+    BlockValSet blockValSet = map.get(_expression);
+    ensureResultType(blockValSet);
+    // Returning false keeps the scan going, because later rows may belong to other groups.
+    ValueProcessor<Object> fillAbsentGroups = (rowId, candidate) -> {
+      for (int groupKey : groupKeysArray[rowId]) {
+        if (holder.getResult(groupKey) == null) {
+          holder.setValueForKey(groupKey, candidate);
+        }
+      }
+      return false;
+    };
+    aggregateHelper(length, blockValSet, fillAbsentGroups);
+  }
+
+  /**
+   * Serializes an intermediate value. A null value becomes an empty payload tagged 0; any other value is encoded by
+   * serializeValue() and tagged 1.
+   */
+  @Override
+  public SerializedIntermediateResult serializeIntermediateResult(Object value) {
+    return value != null
+        ? new SerializedIntermediateResult(1, serializeValue(value))
+        : new SerializedIntermediateResult(0, new byte[0]);
+  }
+
+ @Override
+ public Object deserializeIntermediateResult(CustomObject customObject) {
+ if (customObject.getBuffer().remaining() == 0) {
+ return null;
+ }
+ return deserializeValue(customObject.getBuffer());
+ }
+
+ /** Callback that aggregateHelper() invokes for each candidate value. */
+ @FunctionalInterface
+ private interface ValueProcessor<T> {
+ /** Handles the value found at {@code index}; returns true to stop processing, false to continue. */
+ boolean process(int index, T value);
+ }
+
+  /**
+   * Feeds the non-null values of {@code bvs} to {@code processor}, reading through the dictionary when the block
+   * has one and from the typed value arrays otherwise.
+   * <p>
+   * Once the processor returns true, no further value is delivered - neither from the current non-null range nor
+   * from any later range reported by forEachNotNull().
+   *
+   * @param length number of rows in the block
+   * @param bvs values of the aggregated expression
+   * @param processor receives the row index and the boxed value; returns true to stop
+   */
+  private void aggregateHelper(int length, BlockValSet bvs, ValueProcessor<Object> processor) {
+    final Dictionary dict = bvs.getDictionary();
+    // Single-element array so the lambdas below can record that the processor asked to stop.
+    final boolean[] stopped = {false};
+    if (dict != null) {
+      // Use dictionary-based access for efficiency when available
+      final int[] dictIds = bvs.getDictionaryIdsSV();
+      // The stored type is the same for every row, so resolve it once.
+      final FieldSpec.DataType storedType = bvs.getValueType().getStoredType();
+      forEachNotNull(length, bvs, (from, to) -> {
+        for (int i = from; i < to && !stopped[0]; i++) {
+          stopped[0] = processor.process(i, getDictionaryValue(dict, dictIds[i], storedType));
+        }
+      });
+    } else {
+      // Fall back to direct value access based on type
+      forEachNotNull(length, bvs, (from, to) -> {
+        for (int i = from; i < to && !stopped[0]; i++) {
+          Object value = getDirectValue(bvs, i);
+          if (value != null) {
+            stopped[0] = processor.process(i, value);
+          }
+        }
+      });
+    }
+  }
+
+ /**
+ * Looks up {@code dictId} in {@code dict} using the accessor that matches {@code storedType}. Primitive results
+ * are boxed because the method returns Object.
+ *
+ * @throws IllegalStateException if {@code storedType} is not one of the seven types handled below
+ */
+ private Object getDictionaryValue(Dictionary dict, int dictId,
FieldSpec.DataType storedType) {
+ switch (storedType) {
+ case INT:
+ return dict.getIntValue(dictId);
+ case LONG:
+ return dict.getLongValue(dictId);
+ case FLOAT:
+ return dict.getFloatValue(dictId);
+ case DOUBLE:
+ return dict.getDoubleValue(dictId);
+ case STRING:
+ return dict.getStringValue(dictId);
+ case BIG_DECIMAL:
+ return dict.getBigDecimalValue(dictId);
+ case BYTES:
+ return dict.getBytesValue(dictId);
+ default:
+ throw new IllegalStateException("Unsupported dictionary type: " +
storedType);
+ }
+ }
+
+ /**
+ * Reads the value at {@code index} from the single-value array that matches the block's stored type. Primitive
+ * results are boxed because the method returns Object. The array accessor is invoked on every call.
+ *
+ * @throws IllegalStateException if the stored type is not one of the seven types handled below
+ */
+ private Object getDirectValue(BlockValSet bvs, int index) {
+ switch (bvs.getValueType().getStoredType()) {
+ case INT:
+ return bvs.getIntValuesSV()[index];
+ case LONG:
+ return bvs.getLongValuesSV()[index];
+ case FLOAT:
+ return bvs.getFloatValuesSV()[index];
+ case DOUBLE:
+ return bvs.getDoubleValuesSV()[index];
+ case STRING:
+ return bvs.getStringValuesSV()[index];
+ case BIG_DECIMAL:
+ return bvs.getBigDecimalValuesSV()[index];
+ case BYTES:
+ return bvs.getBytesValuesSV()[index];
+ default:
+ throw new IllegalStateException("Unsupported direct access type: " +
bvs.getValueType().getStoredType());
+ }
+ }
+
+ /**
+ * Encodes a non-null value according to its runtime class. Integer, Long, Float and Double use the fixed-length
+ * layout; String (UTF-8), BigDecimal (UTF-8 bytes of toString()) and byte[] use the variable-length layout. Both
+ * layouts start with the ordinal of the matching FieldSpec.DataType constant.
+ * Null is handled by serializeIntermediateResult() and must not be passed here.
+ * NOTE(review): ordinals shift if the enum constants are ever reordered - confirm that writer and reader always
+ * run against the same enum definition.
+ *
+ * @throws IllegalStateException if the value's class is none of the seven handled below
+ */
+ private byte[] serializeValue(Object value) {
+ if (value instanceof Integer) {
+ return serializeFixedValue(FieldSpec.DataType.INT, 4, buffer ->
buffer.putInt((Integer) value));
+ } else if (value instanceof Long) {
+ return serializeFixedValue(FieldSpec.DataType.LONG, 8, buffer ->
buffer.putLong((Long) value));
+ } else if (value instanceof Float) {
+ return serializeFixedValue(FieldSpec.DataType.FLOAT, 4, buffer ->
buffer.putFloat((Float) value));
+ } else if (value instanceof Double) {
+ return serializeFixedValue(FieldSpec.DataType.DOUBLE, 8, buffer ->
buffer.putDouble((Double) value));
+ } else if (value instanceof String) {
+ return serializeVariableValue(FieldSpec.DataType.STRING, ((String)
value).getBytes(StandardCharsets.UTF_8));
+ } else if (value instanceof BigDecimal) {
+ return serializeVariableValue(FieldSpec.DataType.BIG_DECIMAL,
value.toString().getBytes(StandardCharsets.UTF_8));
+ } else if (value instanceof byte[]) {
+ return serializeVariableValue(FieldSpec.DataType.BYTES, (byte[]) value);
+ } else {
+ throw new IllegalStateException("Unsupported value type for
serialization: " + value.getClass().getName());
+ }
+ }
+
+  /**
+   * Builds the fixed-length layout: a 4-byte type ordinal followed by whatever {@code valueWriter} writes.
+   *
+   * @param dataType type whose ordinal is written as the tag
+   * @param valueSize number of payload bytes reserved after the tag
+   * @param valueWriter writes the payload into the buffer, positioned right after the tag
+   */
+  private byte[] serializeFixedValue(FieldSpec.DataType dataType, int valueSize, Consumer<ByteBuffer> valueWriter) {
+    byte[] encoded = new byte[Integer.BYTES + valueSize];
+    ByteBuffer out = ByteBuffer.wrap(encoded);
+    out.putInt(dataType.ordinal());
+    valueWriter.accept(out);
+    return encoded;
+  }
+
+  /**
+   * Builds the variable-length layout: a 4-byte type ordinal, a 4-byte payload length, then the payload bytes.
+   *
+   * @param dataType type whose ordinal is written as the tag
+   * @param data payload bytes, copied verbatim
+   */
+  private byte[] serializeVariableValue(FieldSpec.DataType dataType, byte[] data) {
+    byte[] encoded = new byte[2 * Integer.BYTES + data.length];
+    ByteBuffer.wrap(encoded)
+        .putInt(dataType.ordinal())
+        .putInt(data.length)
+        .put(data);
+    return encoded;
+  }
+
+  /**
+   * Decodes a value written by serializeValue(): reads the 4-byte type ordinal, then the payload in the layout that
+   * belongs to that type. An empty buffer (null value) is handled by deserializeIntermediateResult() and must not be
+   * passed here.
+   *
+   * @param buffer buffer positioned at the type ordinal; its position is advanced past the value
+   * @throws IllegalStateException if the ordinal is outside the enum's range or names a type that is never written
+   */
+  private Object deserializeValue(ByteBuffer buffer) {
+    int typeOrdinal = buffer.getInt();
+    // Reject a corrupt tag explicitly instead of failing with a bare ArrayIndexOutOfBoundsException.
+    if (typeOrdinal < 0 || typeOrdinal >= DATA_TYPE_VALUES.length) {
+      throw new IllegalStateException("Unsupported data type ordinal for deserialization: " + typeOrdinal);
+    }
+    FieldSpec.DataType dataType = DATA_TYPE_VALUES[typeOrdinal];
+    switch (dataType) {
+      case INT:
+        return buffer.getInt();
+      case LONG:
+        return buffer.getLong();
+      case FLOAT:
+        return buffer.getFloat();
+      case DOUBLE:
+        return buffer.getDouble();
+      case STRING:
+        return new String(deserializeVariableBytes(buffer), StandardCharsets.UTF_8);
+      case BIG_DECIMAL:
+        return new BigDecimal(new String(deserializeVariableBytes(buffer), StandardCharsets.UTF_8));
+      case BYTES:
+        return deserializeVariableBytes(buffer);
+      default:
+        throw new IllegalStateException("Unsupported data type for deserialization: " + dataType);
+    }
+  }
+
+  /**
+   * Reads a 4-byte length prefix followed by that many bytes.
+   *
+   * @param buffer buffer positioned at the length prefix; its position is advanced past the payload
+   * @return a new array holding the payload
+   * @throws IllegalStateException if the length is negative or larger than the bytes left in the buffer
+   */
+  private byte[] deserializeVariableBytes(ByteBuffer buffer) {
+    int length = buffer.getInt();
+    // Validate before allocating so a corrupt prefix cannot trigger a huge or negative-size allocation.
+    if (length < 0 || length > buffer.remaining()) {
+      throw new IllegalStateException(
+          "Invalid serialized value length: " + length + ", remaining bytes: " + buffer.remaining());
+    }
+    byte[] bytes = new byte[length];
+    buffer.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Records the result column type from the block's stored type the first time it is called; later calls are
+   * no-ops.
+   *
+   * @throws IllegalStateException if the stored type is not one of the seven types handled below
+   */
+  private void ensureResultType(BlockValSet bvs) {
+    if (_resultType != null) {
+      return;
+    }
+    ColumnDataType resolved;
+    switch (bvs.getValueType().getStoredType()) {
+      case INT:
+        resolved = ColumnDataType.INT;
+        break;
+      case LONG:
+        resolved = ColumnDataType.LONG;
+        break;
+      case FLOAT:
+        resolved = ColumnDataType.FLOAT;
+        break;
+      case DOUBLE:
+        resolved = ColumnDataType.DOUBLE;
+        break;
+      case STRING:
+        resolved = ColumnDataType.STRING;
+        break;
+      case BIG_DECIMAL:
+        resolved = ColumnDataType.BIG_DECIMAL;
+        break;
+      case BYTES:
+        resolved = ColumnDataType.BYTES;
+        break;
+      default:
+        throw new IllegalStateException("ANY_VALUE unsupported type: " + bvs.getValueType());
+    }
+    _resultType = resolved;
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunctionTest.java
new file mode 100644
index 00000000000..af3d9ab660f
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AnyValueAggregationFunctionTest.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class AnyValueAggregationFunctionTest extends
AbstractAggregationFunctionTest {
+
+ // Constants for standardized test queries and expected results
+ private static final String STANDARD_GROUP_BY_QUERY_TEMPLATE =
+ "select 'testResult', anyValue(myField) from testTable group by
'testResult'";
+ private static final String EXPECTED_COLUMN_TYPES = "STRING | STRING";
+ private static final String EXPECTED_NULL_RESULT = "testResult | null";
+
+ // Supplies one scenario per column type for the parameterized tests below: STRING, INT, LONG, FLOAT, DOUBLE and
+ // BOOLEAN. BIG_DECIMAL and BYTES are not part of this list.
+ @DataProvider(name = "scenarios")
+ Object[] scenarios() {
+ return new Object[] {
+ new DataTypeScenario(FieldSpec.DataType.STRING),
+ new DataTypeScenario(FieldSpec.DataType.INT),
+ new DataTypeScenario(FieldSpec.DataType.LONG),
+ new DataTypeScenario(FieldSpec.DataType.FLOAT),
+ new DataTypeScenario(FieldSpec.DataType.DOUBLE),
+ new DataTypeScenario(FieldSpec.DataType.BOOLEAN),
+ };
+ }
+
+  // With null handling disabled, ANY_VALUE must yield a non-null result when non-null values exist.
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingDisabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(false)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Only non-nullness is checked; the exact value ANY_VALUE picks does not matter.
+    validateAnyValueBehavior(result, true);
+  }
+
+  // With null handling enabled, ANY_VALUE must yield a non-null result when non-null values exist.
+  @Test(dataProvider = "scenarios")
+  void aggregationWithNullHandlingEnabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(true)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Only non-nullness is checked; the exact value ANY_VALUE picks does not matter.
+    validateAnyValueBehavior(result, true);
+  }
+
+  // Group-by variant with null handling disabled: a non-null result is expected when non-null values exist.
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingDisabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(false)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Only non-nullness is checked; the exact value ANY_VALUE picks does not matter.
+    validateAnyValueBehavior(result, true);
+  }
+
+  // Group-by variant with null handling enabled: a non-null result is expected when non-null values exist.
+  @Test(dataProvider = "scenarios")
+  void aggregationGroupBySVWithNullHandlingEnabled(DataTypeScenario scenario) {
+    // Use appropriate test data based on data type
+    String testValue = getTestValueForDataType(scenario.getDataType());
+
+    FluentQueryTest.QueryExecuted result = scenario.getDeclaringTable(true)
+        .onFirstInstance("myField", testValue, "null", testValue)
+        .andOnSecondInstance("myField", "null", testValue, "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    // Only non-nullness is checked; the exact value ANY_VALUE picks does not matter.
+    validateAnyValueBehavior(result, true);
+  }
+
+ // Test for different data types with specific values
+  @Test
+  void testIntegerDataType() {
+    // Every non-null cell holds the same INT value.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "100", "null", "100")
+        .andOnSecondInstance("myField", "null", "100", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+  @Test
+  void testLongDataType() {
+    // Every non-null cell holds the same LONG value.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "1000", "null", "1000")
+        .andOnSecondInstance("myField", "null", "1000", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+  @Test
+  void testFloatDataType() {
+    // Every non-null cell holds the same FLOAT value.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "3.14", "null", "3.14")
+        .andOnSecondInstance("myField", "null", "3.14", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+  @Test
+  void testDoubleDataType() {
+    // Every non-null cell holds the same DOUBLE value.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "2.718", "null", "2.718")
+        .andOnSecondInstance("myField", "null", "2.718", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+  @Test
+  void testBooleanDataType() {
+    // Boolean cells are written as 1/0; every non-null cell holds the same value.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.BOOLEAN).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "1", "null", "1")
+        .andOnSecondInstance("myField", "null", "1", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+ // Edge case tests
+  @Test
+  void testAllNullValues() {
+    // No instance holds a non-null value, so the aggregate itself must be null.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "null", "null", "null")
+        .andOnSecondInstance("myField", "null", "null", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, false);
+  }
+
+  @Test
+  void testSingleNonNullValue() {
+    // Exactly one cell, on the second instance, is non-null.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "null", "null", "null")
+        .andOnSecondInstance("myField", "null", "unique_value", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+ // GROUP BY tests with different data types
+  @Test
+  void testGroupByWithMultipleGroups() {
+    // Mixed values: ANY_VALUE may legitimately return value1, value2 or value3.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "value1", "value1", "value2")
+        .andOnSecondInstance("myField", "value2", "value3", "value3")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+  @Test
+  void testGroupByWithNullsInGroups() {
+    // Mixed values with nulls: ANY_VALUE may legitimately return 100, 200 or 300.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "100", "null", "200")
+        .andOnSecondInstance("myField", "null", "300", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+ // Smoke test over a slightly wider dataset (ten distinct values). It checks only that a non-null value is
+ // returned; it does not measure performance or verify early termination.
+  @Test
+  void testPerformanceWithLargeDataset() {
+    // Ten distinct values across both instances; any of them is an acceptable answer.
+    FluentQueryTest.QueryExecuted result = new DataTypeScenario(FieldSpec.DataType.STRING)
+        .getDeclaringTable(true)
+        .onFirstInstance("myField", "first_value", "value_1", "value_2", "value_3", "value_4")
+        .andOnSecondInstance("myField", "value_5", "value_6", "value_7", "value_8", "value_9")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+ // Test serialization/deserialization behavior
+  @Test
+  void testSerializationWithComplexValues() {
+    // The same string on both instances keeps the outcome independent of which one is picked.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "test_value", "null", "test_value")
+        .andOnSecondInstance("myField", "null", "test_value", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+ // Test numeric edge cases
+  @Test
+  void testNumericEdgeCases() {
+    // 0.0 is a valid value and must not be reported as null.
+    FluentQueryTest.DeclaringTable table = new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true);
+    FluentQueryTest.QueryExecuted result = table
+        .onFirstInstance("myField", "0.0", "null", "0.0")
+        .andOnSecondInstance("myField", "null", "0.0", "null")
+        .whenQuery(STANDARD_GROUP_BY_QUERY_TEMPLATE);
+
+    validateAnyValueBehavior(result, true);
+  }
+
+ // Helper methods to handle different data types in parameterized tests
+ // Returns the textual cell value used to populate "myField" for the given column type. Types without a dedicated
+ // case fall back to "test_value".
+ private String getTestValueForDataType(FieldSpec.DataType dataType) {
+ switch (dataType) {
+ case STRING:
+ return "test_value";
+ case INT:
+ return "100";
+ case LONG:
+ return "1000";
+ case FLOAT:
+ return "3.14";
+ case DOUBLE:
+ return "2.718";
+ case BOOLEAN:
+ return "1"; // Use 1/0 for boolean values
+ case BYTES:
+ return "74657374"; // "test" in hex
+ default:
+ return "test_value";
+ }
+ }
+
+ // Maps a test input value to the text expected in the result: "1" for BOOLEAN, the input itself for every other
+ // type.
+ private String getExpectedResultForDataType(FieldSpec.DataType dataType,
String testValue) {
+ switch (dataType) {
+ case BOOLEAN:
+ return "1"; // Boolean values are stored as integers in Pinot, so
ANY_VALUE returns "1"
+ default:
+ return testValue; // Most types return the same value
+ }
+ }
+
+  /**
+   * Checks only whether ANY_VALUE produced null or not, so the tests do not depend on which value was picked.
+   * <p>
+   * When a null result is expected, the result must match the null row exactly. When a non-null result is expected,
+   * the same match is attempted and must fail.
+   * NOTE(review): in the non-null case every AssertionError raised by the match counts as success, so a mismatch in
+   * column types or in the first column would also pass - consider asserting on the returned cell directly.
+   */
+  private void validateAnyValueBehavior(FluentQueryTest.QueryExecuted queryResult, boolean shouldReturnNonNull) {
+    if (!shouldReturnNonNull) {
+      // All inputs were null, so the aggregate must be null as well.
+      queryResult.thenResultIs(EXPECTED_COLUMN_TYPES, EXPECTED_NULL_RESULT);
+      return;
+    }
+    try {
+      queryResult.thenResultIs(EXPECTED_COLUMN_TYPES, EXPECTED_NULL_RESULT);
+    } catch (AssertionError expected) {
+      // The null row did not match, i.e. ANY_VALUE returned something non-null.
+      return;
+    }
+    throw new AssertionError("ANY_VALUE returned null when non-null values were available in the dataset");
+  }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6cf2dfccb44..ad2071aa666 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -4332,4 +4332,79 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
Assert.assertTrue(columnPresent, "Column " + newAddedColumn + " not
present in result set");
}
+
+  /**
+   * Smoke-tests ANY_VALUE under the given engine; cells are checked by JSON node type as asText() is never null.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testAnyValueFunctionality(boolean useMultiStageQueryEngine) throws Exception {
+    // Test 1: Basic ANY_VALUE functionality with GROUP BY
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = "SELECT Carrier, ANY_VALUE(Origin), COUNT(*) FROM mytable GROUP BY Carrier ORDER BY Carrier LIMIT 5";
+    JsonNode response = postQuery(query);
+    JsonNode rows = response.get("resultTable").get("rows");
+    assertTrue(rows.size() > 0, "Should have results");
+
+    for (JsonNode groupRow : rows) {
+      assertTrue(groupRow.get(0).isTextual(), "Carrier should not be null");
+      assertTrue(groupRow.get(1).isTextual(), "ANY_VALUE(Origin) should not be null");
+      assertTrue(groupRow.get(2).asInt() > 0, "COUNT should be greater than 0");
+    }
+
+    // Test 2: ANY_VALUE without GROUP BY - should return single values
+    query = "SELECT ANY_VALUE(Carrier), ANY_VALUE(Origin), COUNT(*) FROM mytable";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertEquals(rows.size(), 1, "Should have 1 row without GROUP BY");
+
+    JsonNode row = rows.get(0);
+    assertTrue(row.get(0).isTextual(), "ANY_VALUE(Carrier) should not be null");
+    assertTrue(row.get(1).isTextual(), "ANY_VALUE(Origin) should not be null");
+    assertTrue(row.get(2).asInt() > 0, "COUNT should be greater than 0");
+
+    // Test 3: ANY_VALUE with multiple GROUP BY columns
+    query = "SELECT Carrier, Origin, ANY_VALUE(Dest), COUNT(*) FROM mytable"
+        + " GROUP BY Carrier, Origin ORDER BY Carrier, Origin LIMIT 10";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertTrue(rows.size() > 0, "Should have results for multiple GROUP BY");
+
+    for (JsonNode groupRow : rows) {
+      assertTrue(groupRow.get(0).isTextual(), "Carrier should not be null");
+      assertTrue(groupRow.get(1).isTextual(), "Origin should not be null");
+      assertTrue(groupRow.get(2).isTextual(), "ANY_VALUE(Dest) should not be null");
+      assertTrue(groupRow.get(3).asInt() > 0, "COUNT should be greater than 0");
+    }
+
+    // Test 4: ANY_VALUE with different data types
+    query = "SELECT ANY_VALUE(Carrier) as StringValue, ANY_VALUE(AirlineID) as IntValue,"
+        + " ANY_VALUE(FlightNum) as IntValue2, ANY_VALUE(ArrDelay) as DoubleValue FROM mytable";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertEquals(rows.size(), 1, "Should have 1 row for data types test");
+
+    row = rows.get(0);
+    assertTrue(row.get(0).isTextual(), "String ANY_VALUE should not be null");
+    assertTrue(row.get(1).isNumber() && row.get(1).asInt() >= 0, "Int ANY_VALUE should be valid");
+    assertTrue(row.get(2).isNumber() && row.get(2).asInt() >= 0, "Int ANY_VALUE should be valid");
+    // ArrDelay can be negative, so only require a numeric (non-null) JSON value
+    assertTrue(row.get(3).isNumber(), "Double ANY_VALUE should not be null");
+
+    // Test 5: ANY_VALUE in complex query with multiple aggregations
+    query = "SELECT Origin, ANY_VALUE(Carrier) as SampleCarrier, ANY_VALUE(Dest) as SampleDest,"
+        + " COUNT(*) as FlightCount, AVG(ArrDelay) as AvgDelay FROM mytable"
+        + " GROUP BY Origin ORDER BY FlightCount DESC LIMIT 5";
+    response = postQuery(query);
+    rows = response.get("resultTable").get("rows");
+    assertTrue(rows.size() > 0, "Should have results for complex query");
+
+    for (JsonNode groupRow : rows) {
+      assertTrue(groupRow.get(0).isTextual(), "Origin should not be null");
+      assertTrue(groupRow.get(1).isTextual(), "ANY_VALUE(Carrier) should not be null");
+      assertTrue(groupRow.get(2).isTextual(), "ANY_VALUE(Dest) should not be null");
+      assertTrue(groupRow.get(3).asInt() > 0, "FlightCount should be positive");
+      // AvgDelay can be negative, so only require a numeric (non-null) JSON value
+      assertTrue(groupRow.get(4).isNumber(), "AvgDelay should not be null");
+    }
+  }
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 6d130d89321..a7e4a72a825 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -66,6 +66,7 @@ public enum AggregationFunctionType {
// TODO: Support MV types after next release (see
https://github.com/apache/pinot/pull/17109)
AVG("avg", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
MODE("mode", SqlTypeName.OTHER, SqlTypeName.DOUBLE),
+ ANYVALUE("anyValue", ReturnTypes.ARG0, OperandTypes.ANY, SqlTypeName.OTHER),
FIRSTWITHTIME("firstWithTime", ReturnTypes.ARG0,
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY,
SqlTypeFamily.CHARACTER), SqlTypeName.OTHER),
LASTWITHTIME("lastWithTime", ReturnTypes.ARG0,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]