This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ecac6c9376 Add ArrayAgg aggregation function (#11822)
ecac6c9376 is described below
commit ecac6c9376e44d355b9363a0a9b6c2cd59d142ea
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Oct 21 09:07:36 2023 -0700
Add ArrayAgg aggregation function (#11822)
---
.../org/apache/pinot/common/utils/DataSchema.java | 51 ++++-
.../apache/pinot/core/common/ObjectSerDeUtils.java | 168 ++++++++++++++++-
.../blocks/results/AggregationResultsBlock.java | 16 +-
.../function/AggregationFunctionFactory.java | 63 ++++++-
.../array/ArrayAggDistinctDoubleFunction.java | 70 +++++++
.../array/ArrayAggDistinctFloatFunction.java | 70 +++++++
.../array/ArrayAggDistinctIntFunction.java | 74 ++++++++
.../array/ArrayAggDistinctLongFunction.java | 70 +++++++
.../array/ArrayAggDistinctStringFunction.java | 69 +++++++
.../function/array/ArrayAggDoubleFunction.java | 70 +++++++
.../function/array/ArrayAggFloatFunction.java | 71 +++++++
.../function/array/ArrayAggIntFunction.java | 67 +++++++
.../function/array/ArrayAggLongFunction.java | 69 +++++++
.../function/array/ArrayAggStringFunction.java | 69 +++++++
.../function/array/BaseArrayAggDoubleFunction.java | 98 ++++++++++
.../function/array/BaseArrayAggFloatFunction.java | 98 ++++++++++
.../function/array/BaseArrayAggFunction.java | 141 ++++++++++++++
.../function/array/BaseArrayAggIntFunction.java | 103 ++++++++++
.../function/array/BaseArrayAggLongFunction.java | 101 ++++++++++
.../function/array/BaseArrayAggStringFunction.java | 98 ++++++++++
.../pinot/core/common/ObjectSerDeUtilsTest.java | 60 ++++++
.../apache/pinot/queries/ArrayAggQueriesTest.java | 209 +++++++++++++++++++++
.../pinot/integration/tests/custom/ArrayTest.java | 174 +++++++++++++++++
.../query/runtime/operator/utils/TypeUtils.java | 28 ++-
.../pinot/segment/spi/AggregationFunctionType.java | 17 +-
25 files changed, 2104 insertions(+), 20 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index e9a7e858fe..9bd8dacef1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -23,7 +23,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -52,7 +55,9 @@ public class DataSchema {
private final ColumnDataType[] _columnDataTypes;
private ColumnDataType[] _storedColumnDataTypes;
- /** Used by both Broker and Server to generate results for EXPLAIN PLAN
queries. */
+ /**
+ * Used by both Broker and Server to generate results for EXPLAIN PLAN
queries.
+ */
public static final DataSchema EXPLAIN_RESULT_SCHEMA =
new DataSchema(new String[]{"Operator", "Operator_Id", "Parent_Id"}, new
ColumnDataType[]{
ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT
@@ -425,19 +430,19 @@ public class DataSchema {
case BYTES:
return ((ByteArray) value).getBytes();
case INT_ARRAY:
- return (int[]) value;
+ return toIntArray(value);
case LONG_ARRAY:
return toLongArray(value);
case FLOAT_ARRAY:
- return (float[]) value;
+ return toFloatArray(value);
case DOUBLE_ARRAY:
return toDoubleArray(value);
case STRING_ARRAY:
- return (String[]) value;
+ return toStringArray(value);
case BOOLEAN_ARRAY:
- return toBooleanArray((int[]) value);
+ return toBooleanArray(toIntArray(value));
case TIMESTAMP_ARRAY:
- return toTimestampArray((long[]) value);
+ return toTimestampArray(toLongArray(value));
case BYTES_ARRAY:
return (byte[][]) value;
case UNKNOWN: // fall through
@@ -513,11 +518,31 @@ public class DataSchema {
}
}
+ /**
+  * Converts a result value into an {@code int[]}.
+  *
+  * <p>An {@code int[]} is returned as-is. For an {@link IntArrayList} the returned array holds exactly
+  * {@code size()} values: {@code elements()} exposes the backing array, which can be longer than the list, so it
+  * is only handed out directly when its length equals the list size; otherwise a trimmed copy is returned.
+  *
+  * @param value the value to convert
+  * @return the values as an {@code int[]}
+  * @throws IllegalStateException if the value is neither an {@code int[]} nor an {@link IntArrayList}
+  */
+ private static int[] toIntArray(Object value) {
+   if (value instanceof int[]) {
+     return (int[]) value;
+   } else if (value instanceof IntArrayList) {
+     // For ArrayAggregationFunction
+     IntArrayList intArrayList = (IntArrayList) value;
+     int[] elements = intArrayList.elements();
+     // Avoid leaking unused capacity slots of the backing array as trailing values
+     return elements.length == intArrayList.size() ? elements : intArrayList.toIntArray();
+   }
+   throw new IllegalStateException(String.format("Cannot convert: '%s' to int[]", value));
+ }
+
+ /**
+  * Converts a result value into a {@code float[]}.
+  *
+  * <p>A {@code float[]} is returned as-is. For a {@link FloatArrayList} the returned array holds exactly
+  * {@code size()} values: {@code elements()} exposes the backing array, which can be longer than the list, so it
+  * is only handed out directly when its length equals the list size; otherwise a trimmed copy is returned.
+  *
+  * @param value the value to convert
+  * @return the values as a {@code float[]}
+  * @throws IllegalStateException if the value is neither a {@code float[]} nor a {@link FloatArrayList}
+  */
+ private static float[] toFloatArray(Object value) {
+   if (value instanceof float[]) {
+     return (float[]) value;
+   } else if (value instanceof FloatArrayList) {
+     // For ArrayAggregationFunction
+     FloatArrayList floatArrayList = (FloatArrayList) value;
+     float[] elements = floatArrayList.elements();
+     // Avoid leaking unused capacity slots of the backing array as trailing values
+     return elements.length == floatArrayList.size() ? elements : floatArrayList.toFloatArray();
+   }
+   throw new IllegalStateException(String.format("Cannot convert: '%s' to float[]", value));
+ }
+
private static double[] toDoubleArray(Object value) {
if (value instanceof double[]) {
return (double[]) value;
} else if (value instanceof DoubleArrayList) {
- // For HistogramAggregationFunction
+ // For HistogramAggregationFunction and ArrayAggregationFunction
return ((DoubleArrayList) value).elements();
} else if (value instanceof int[]) {
int[] intValues = (int[]) value;
@@ -550,7 +575,7 @@ public class DataSchema {
if (value instanceof long[]) {
return (long[]) value;
} else if (value instanceof LongArrayList) {
- // For FunnelCountAggregationFunction
+ // For FunnelCountAggregationFunction and ArrayAggregationFunction
return ((LongArrayList) value).elements();
} else {
int[] intValues = (int[]) value;
@@ -563,6 +588,16 @@ public class DataSchema {
}
}
+ /**
+  * Returns the given value as a {@code String[]}.
+  *
+  * @param value a {@code String[]} (returned as-is) or an {@link ObjectArrayList} (copied into a new array)
+  * @return the values as a {@code String[]}
+  * @throws IllegalStateException if the value has any other type
+  */
+ private static String[] toStringArray(Object value) {
+   if (value instanceof String[]) {
+     return (String[]) value;
+   }
+   if (!(value instanceof ObjectArrayList)) {
+     throw new IllegalStateException(String.format("Cannot convert: '%s' to String[]", value));
+   }
+   // For ArrayAggregationFunction
+   ObjectArrayList<String> stringList = (ObjectArrayList<String>) value;
+   return stringList.toArray(new String[0]);
+ }
+
private static boolean[] toBooleanArray(int[] intArray) {
int length = intArray.length;
boolean[] booleanArray = new boolean[length];
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 45f1ebd489..0359d99609 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -34,19 +34,23 @@ import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.doubles.DoubleSet;
import it.unimi.dsi.fastutil.floats.Float2LongMap;
import it.unimi.dsi.fastutil.floats.Float2LongOpenHashMap;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.floats.FloatIterator;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatSet;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.Long2LongMap;
import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongIterator;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectSet;
import java.io.IOException;
@@ -144,7 +148,11 @@ public class ObjectSerDeUtils {
FrequentStringsSketch(38),
FrequentLongsSketch(39),
HyperLogLogPlus(40),
- CompressedProbabilisticCounting(41);
+ CompressedProbabilisticCounting(41),
+ IntArrayList(42),
+ LongArrayList(43),
+ FloatArrayList(44),
+ StringArrayList(45);
private final int _value;
@@ -165,8 +173,25 @@ public class ObjectSerDeUtils {
return ObjectType.Double;
} else if (value instanceof BigDecimal) {
return ObjectType.BigDecimal;
+ } else if (value instanceof IntArrayList) {
+ return ObjectType.IntArrayList;
+ } else if (value instanceof LongArrayList) {
+ return ObjectType.LongArrayList;
+ } else if (value instanceof FloatArrayList) {
+ return ObjectType.FloatArrayList;
} else if (value instanceof DoubleArrayList) {
return ObjectType.DoubleArrayList;
+ } else if (value instanceof ObjectArrayList) {
+ ObjectArrayList objectArrayList = (ObjectArrayList) value;
+ if (!objectArrayList.isEmpty()) {
+ Object next = objectArrayList.get(0);
+ if (next instanceof String) {
+ return ObjectType.StringArrayList;
+ }
+ throw new IllegalArgumentException(
+ "Unsupported type of value: " + next.getClass().getSimpleName());
+ }
+ return ObjectType.StringArrayList;
} else if (value instanceof AvgPair) {
return ObjectType.AvgPair;
} else if (value instanceof MinMaxRangePair) {
@@ -329,6 +354,99 @@ public class ObjectSerDeUtils {
}
};
+ /**
+  * Ser/de for {@link IntArrayList}. Layout: value count (int) followed by that many int values.
+  */
+ public static final ObjectSerDe<IntArrayList> INT_ARRAY_LIST_SER_DE = new ObjectSerDe<IntArrayList>() {
+
+   @Override
+   public byte[] serialize(IntArrayList intArrayList) {
+     int numValues = intArrayList.size();
+     byte[] serialized = new byte[Integer.BYTES + numValues * Integer.BYTES];
+     ByteBuffer buffer = ByteBuffer.wrap(serialized);
+     buffer.putInt(numValues);
+     // Only the first numValues entries are written, never unused capacity
+     int index = 0;
+     while (index < numValues) {
+       buffer.putInt(intArrayList.getInt(index++));
+     }
+     return serialized;
+   }
+
+   @Override
+   public IntArrayList deserialize(byte[] bytes) {
+     return deserialize(ByteBuffer.wrap(bytes));
+   }
+
+   @Override
+   public IntArrayList deserialize(ByteBuffer byteBuffer) {
+     int remaining = byteBuffer.getInt();
+     IntArrayList values = new IntArrayList(remaining);
+     // Relative reads keep advancing the caller's buffer position past this object
+     while (remaining-- > 0) {
+       values.add(byteBuffer.getInt());
+     }
+     return values;
+   }
+ };
+
+ /**
+  * Ser/de for {@link LongArrayList}. Layout: value count (int) followed by that many long values.
+  */
+ public static final ObjectSerDe<LongArrayList> LONG_ARRAY_LIST_SER_DE = new ObjectSerDe<LongArrayList>() {
+
+   @Override
+   public byte[] serialize(LongArrayList longArrayList) {
+     int numValues = longArrayList.size();
+     byte[] serialized = new byte[Integer.BYTES + numValues * Long.BYTES];
+     ByteBuffer buffer = ByteBuffer.wrap(serialized);
+     buffer.putInt(numValues);
+     // Only the first numValues entries are written, never unused capacity
+     int index = 0;
+     while (index < numValues) {
+       buffer.putLong(longArrayList.getLong(index++));
+     }
+     return serialized;
+   }
+
+   @Override
+   public LongArrayList deserialize(byte[] bytes) {
+     return deserialize(ByteBuffer.wrap(bytes));
+   }
+
+   @Override
+   public LongArrayList deserialize(ByteBuffer byteBuffer) {
+     int remaining = byteBuffer.getInt();
+     LongArrayList values = new LongArrayList(remaining);
+     // Relative reads keep advancing the caller's buffer position past this object
+     while (remaining-- > 0) {
+       values.add(byteBuffer.getLong());
+     }
+     return values;
+   }
+ };
+
+ /**
+  * Ser/de for {@link FloatArrayList}. Layout: value count (int) followed by that many float values.
+  */
+ public static final ObjectSerDe<FloatArrayList> FLOAT_ARRAY_LIST_SER_DE = new ObjectSerDe<FloatArrayList>() {
+
+   @Override
+   public byte[] serialize(FloatArrayList floatArrayList) {
+     int numValues = floatArrayList.size();
+     byte[] serialized = new byte[Integer.BYTES + numValues * Float.BYTES];
+     ByteBuffer buffer = ByteBuffer.wrap(serialized);
+     buffer.putInt(numValues);
+     // Only the first numValues entries are written, never unused capacity
+     int index = 0;
+     while (index < numValues) {
+       buffer.putFloat(floatArrayList.getFloat(index++));
+     }
+     return serialized;
+   }
+
+   @Override
+   public FloatArrayList deserialize(byte[] bytes) {
+     return deserialize(ByteBuffer.wrap(bytes));
+   }
+
+   @Override
+   public FloatArrayList deserialize(ByteBuffer byteBuffer) {
+     int remaining = byteBuffer.getInt();
+     FloatArrayList values = new FloatArrayList(remaining);
+     // Relative reads keep advancing the caller's buffer position past this object
+     while (remaining-- > 0) {
+       values.add(byteBuffer.getFloat());
+     }
+     return values;
+   }
+ };
+
public static final ObjectSerDe<DoubleArrayList> DOUBLE_ARRAY_LIST_SER_DE =
new ObjectSerDe<DoubleArrayList>() {
@Override
@@ -360,6 +478,50 @@ public class ObjectSerDeUtils {
}
};
+ /**
+  * Ser/de for an {@link ObjectArrayList} of strings. Layout: element count (int), then for each element its
+  * UTF-8 byte length (int) followed by the encoded bytes.
+  */
+ public static final ObjectSerDe<ObjectArrayList> STRING_ARRAY_LIST_SER_DE =
+     new ObjectSerDe<ObjectArrayList>() {
+       @Override
+       public byte[] serialize(ObjectArrayList stringArrayList) {
+         int numValues = stringArrayList.size();
+         byte[][] encodedValues = new byte[numValues][];
+         // Count header plus one length prefix per value; accumulated as a long so the 2GB check cannot overflow
+         long totalBytes = (1 + (long) numValues) * Integer.BYTES;
+         for (int i = 0; i < numValues; i++) {
+           byte[] encoded = stringArrayList.get(i).toString().getBytes(UTF_8);
+           encodedValues[i] = encoded;
+           totalBytes += encoded.length;
+         }
+         Preconditions.checkState(totalBytes <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
+         ByteBuffer buffer = ByteBuffer.wrap(new byte[(int) totalBytes]);
+         buffer.putInt(numValues);
+         for (int i = 0; i < numValues; i++) {
+           buffer.putInt(encodedValues[i].length).put(encodedValues[i]);
+         }
+         return buffer.array();
+       }
+
+       @Override
+       public ObjectArrayList deserialize(byte[] bytes) {
+         return deserialize(ByteBuffer.wrap(bytes));
+       }
+
+       @Override
+       public ObjectArrayList deserialize(ByteBuffer byteBuffer) {
+         int remaining = byteBuffer.getInt();
+         ObjectArrayList values = new ObjectArrayList(remaining);
+         // Relative reads keep advancing the caller's buffer position past this object
+         while (remaining-- > 0) {
+           byte[] encoded = new byte[byteBuffer.getInt()];
+           byteBuffer.get(encoded);
+           values.add(new String(encoded, UTF_8));
+         }
+         return values;
+       }
+     };
+
public static final ObjectSerDe<AvgPair> AVG_PAIR_SER_DE = new
ObjectSerDe<AvgPair>() {
@Override
@@ -1435,6 +1597,10 @@ public class ObjectSerDeUtils {
FREQUENT_LONGS_SKETCH_SER_DE,
HYPER_LOG_LOG_PLUS_SER_DE,
DATA_SKETCH_CPC_SER_DE,
+ INT_ARRAY_LIST_SER_DE,
+ LONG_ARRAY_LIST_SER_DE,
+ FLOAT_ARRAY_LIST_SER_DE,
+ STRING_ARRAY_LIST_SER_DE,
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 6801d90305..cf13255f8b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -19,7 +19,10 @@
package org.apache.pinot.core.operator.blocks.results;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collections;
@@ -191,12 +194,21 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
case BYTES:
dataTableBuilder.setColumn(index, (ByteArray) result);
break;
- case DOUBLE_ARRAY:
- dataTableBuilder.setColumn(index, ((DoubleArrayList)
result).elements());
+ case INT_ARRAY:
+ dataTableBuilder.setColumn(index, ((IntArrayList) result).elements());
break;
case LONG_ARRAY:
dataTableBuilder.setColumn(index, ((LongArrayList) result).elements());
break;
+ case FLOAT_ARRAY:
+ dataTableBuilder.setColumn(index, ((FloatArrayList)
result).elements());
+ break;
+ case DOUBLE_ARRAY:
+ dataTableBuilder.setColumn(index, ((DoubleArrayList)
result).elements());
+ break;
+ case STRING_ARRAY:
+ dataTableBuilder.setColumn(index, ((ObjectArrayList<String>)
result).toArray(new String[0]));
+ break;
default:
throw new IllegalStateException("Illegal column data type in final
result: " + columnDataType);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 0480141f17..987342ac1a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -23,6 +23,16 @@ import java.util.List;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctDoubleFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctFloatFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctIntFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctLongFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctStringFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDoubleFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggFloatFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggIntFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggLongFunction;
+import
org.apache.pinot.core.query.aggregation.function.array.ArrayAggStringFunction;
import
org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -226,6 +236,58 @@ public class AggregationFunctionFactory {
throw new IllegalArgumentException("Unsupported data type for
FIRST_WITH_TIME: " + dataType);
}
}
+ case ARRAYAGG: {
+   // Usage: arrayAgg(dataColumn, 'dataType', ['isDistinct']).
+   // Exactly 2 or 3 arguments are accepted so that extra trailing arguments are rejected instead of being
+   // silently ignored.
+   Preconditions.checkArgument(numArguments == 2 || numArguments == 3,
+       "ARRAY_AGG expects 2 or 3 arguments, got: %s. The function can be used as "
+           + "arrayAgg(dataColumn, 'dataType', ['isDistinct'])", numArguments);
+   ExpressionContext dataTypeExp = arguments.get(1);
+   Preconditions.checkArgument(dataTypeExp.getType() == ExpressionContext.Type.LITERAL,
+       "ARRAY_AGG expects the 2nd argument to be literal, got: %s. The function can be used as "
+           + "arrayAgg(dataColumn, 'dataType', ['isDistinct'])", dataTypeExp.getType());
+   // DataType.valueOf throws IllegalArgumentException when the literal does not name a data type
+   DataType dataType = DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
+   // The optional 3rd argument switches to the de-duplicating implementations; defaults to false
+   boolean isDistinct = false;
+   if (numArguments == 3) {
+     ExpressionContext isDistinctExp = arguments.get(2);
+     Preconditions.checkArgument(isDistinctExp.getType() == ExpressionContext.Type.LITERAL,
+         "ARRAY_AGG expects the 3rd argument to be literal, got: %s. The function can be used as "
+             + "arrayAgg(dataColumn, 'dataType', ['isDistinct'])", isDistinctExp.getType());
+     isDistinct = isDistinctExp.getLiteral().getBooleanValue();
+   }
+   if (isDistinct) {
+     switch (dataType) {
+       // BOOLEAN shares the INT implementation
+       case BOOLEAN:
+       case INT:
+         return new ArrayAggDistinctIntFunction(firstArgument, dataType, nullHandlingEnabled);
+       // TIMESTAMP shares the LONG implementation
+       case LONG:
+       case TIMESTAMP:
+         return new ArrayAggDistinctLongFunction(firstArgument, dataType, nullHandlingEnabled);
+       case FLOAT:
+         return new ArrayAggDistinctFloatFunction(firstArgument, nullHandlingEnabled);
+       case DOUBLE:
+         return new ArrayAggDistinctDoubleFunction(firstArgument, nullHandlingEnabled);
+       case STRING:
+         return new ArrayAggDistinctStringFunction(firstArgument, nullHandlingEnabled);
+       default:
+         throw new IllegalArgumentException("Unsupported data type for ARRAY_AGG: " + dataType);
+     }
+   }
+   switch (dataType) {
+     // BOOLEAN shares the INT implementation
+     case BOOLEAN:
+     case INT:
+       return new ArrayAggIntFunction(firstArgument, dataType, nullHandlingEnabled);
+     // TIMESTAMP shares the LONG implementation
+     case LONG:
+     case TIMESTAMP:
+       return new ArrayAggLongFunction(firstArgument, dataType, nullHandlingEnabled);
+     case FLOAT:
+       return new ArrayAggFloatFunction(firstArgument, nullHandlingEnabled);
+     case DOUBLE:
+       return new ArrayAggDoubleFunction(firstArgument, nullHandlingEnabled);
+     case STRING:
+       return new ArrayAggStringFunction(firstArgument, nullHandlingEnabled);
+     default:
+       throw new IllegalArgumentException("Unsupported data type for ARRAY_AGG: " + dataType);
+   }
+ }
case LASTWITHTIME: {
Preconditions.checkArgument(numArguments == 3,
"LAST_WITH_TIME expects 3 arguments, got: %s. The function can
be used as "
@@ -368,7 +430,6 @@ public class AggregationFunctionFactory {
return new DistinctCountCPCSketchAggregationFunction(arguments);
case DISTINCTCOUNTRAWCPCSKETCH:
return new DistinctCountRawCPCSketchAggregationFunction(arguments);
-
default:
throw new IllegalArgumentException("Unsupported aggregation
function type: " + functionType);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctDoubleFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctDoubleFunction.java
new file mode 100644
index 0000000000..1493ad82aa
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctDoubleFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Distinct flavor of the double array aggregation: values are collected into a {@link DoubleOpenHashSet}, so each
+ * distinct value is kept once.
+ */
+public class ArrayAggDistinctDoubleFunction extends BaseArrayAggDoubleFunction<DoubleOpenHashSet> {
+  public ArrayAggDistinctDoubleFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Collects the first {@code length} double values of the block into a new set and stores it in the holder.
+   * NOTE(review): setValue replaces whatever the holder contained before - confirm the caller accounts for that.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    DoubleOpenHashSet distinctValues = new DoubleOpenHashSet(length);
+    double[] docValues = blockValSet.getDoubleValuesSV();
+    int docId = 0;
+    while (docId < length) {
+      distinctValues.add(docValues[docId++]);
+    }
+    aggregationResultHolder.setValue(distinctValues);
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but positions contained in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    DoubleOpenHashSet distinctValues = new DoubleOpenHashSet(length);
+    double[] docValues = blockValSet.getDoubleValuesSV();
+    for (int docId = 0; docId < length; docId++) {
+      if (nullBitmap.contains(docId)) {
+        continue;
+      }
+      distinctValues.add(docValues[docId]);
+    }
+    aggregationResultHolder.setValue(distinctValues);
+  }
+
+  /**
+   * Adds {@code value} to the set stored for {@code groupKey}, creating and registering the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, double value) {
+    ObjectGroupByResultHolder holder = (ObjectGroupByResultHolder) groupByResultHolder;
+    DoubleOpenHashSet existing = holder.getResult(groupKey);
+    if (existing != null) {
+      existing.add(value);
+      return;
+    }
+    DoubleOpenHashSet created = new DoubleOpenHashSet();
+    created.add(value);
+    holder.setValueForKey(groupKey, created);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctFloatFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctFloatFunction.java
new file mode 100644
index 0000000000..d9ad31ca35
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctFloatFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Distinct flavor of the float array aggregation: values are collected into a {@link FloatOpenHashSet}, so each
+ * distinct value is kept once.
+ */
+public class ArrayAggDistinctFloatFunction extends BaseArrayAggFloatFunction<FloatOpenHashSet> {
+  public ArrayAggDistinctFloatFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Collects the first {@code length} float values of the block into a new set and stores it in the holder.
+   * NOTE(review): setValue replaces whatever the holder contained before - confirm the caller accounts for that.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    FloatOpenHashSet distinctValues = new FloatOpenHashSet(length);
+    float[] docValues = blockValSet.getFloatValuesSV();
+    int docId = 0;
+    while (docId < length) {
+      distinctValues.add(docValues[docId++]);
+    }
+    aggregationResultHolder.setValue(distinctValues);
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but positions contained in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    FloatOpenHashSet distinctValues = new FloatOpenHashSet(length);
+    float[] docValues = blockValSet.getFloatValuesSV();
+    for (int docId = 0; docId < length; docId++) {
+      if (nullBitmap.contains(docId)) {
+        continue;
+      }
+      distinctValues.add(docValues[docId]);
+    }
+    aggregationResultHolder.setValue(distinctValues);
+  }
+
+  /**
+   * Adds {@code value} to the set stored for {@code groupKey}, creating and registering the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, float value) {
+    ObjectGroupByResultHolder holder = (ObjectGroupByResultHolder) groupByResultHolder;
+    FloatOpenHashSet existing = holder.getResult(groupKey);
+    if (existing != null) {
+      existing.add(value);
+      return;
+    }
+    FloatOpenHashSet created = new FloatOpenHashSet();
+    created.add(value);
+    holder.setValueForKey(groupKey, created);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctIntFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctIntFunction.java
new file mode 100644
index 0000000000..3aa757e311
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctIntFunction.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Distinct flavor of the int array aggregation: values are collected into an {@link IntOpenHashSet}, so each
+ * distinct value is kept once.
+ */
+public class ArrayAggDistinctIntFunction extends BaseArrayAggIntFunction<IntOpenHashSet> {
+  public ArrayAggDistinctIntFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Collects the first {@code length} int values of the block into a new set and stores it in the holder.
+   * NOTE(review): setValue replaces whatever the holder contained before - confirm the caller accounts for that.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    ObjectAggregationResultHolder resultHolder = (ObjectAggregationResultHolder) aggregationResultHolder;
+    int[] value = blockValSet.getIntValuesSV();
+    // Presized like aggregateArrayWithNull to avoid rehashing while filling
+    IntOpenHashSet valueArray = new IntOpenHashSet(length);
+    for (int i = 0; i < length; i++) {
+      valueArray.add(value[i]);
+    }
+    resultHolder.setValue(valueArray);
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but positions contained in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    ObjectAggregationResultHolder resultHolder = (ObjectAggregationResultHolder) aggregationResultHolder;
+    int[] value = blockValSet.getIntValuesSV();
+    IntOpenHashSet valueArray = new IntOpenHashSet(length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(value[i]);
+      }
+    }
+    resultHolder.setValue(valueArray);
+  }
+
+  /**
+   * Adds {@code value} to the set stored for {@code groupKey}, creating and registering the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, int value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    IntOpenHashSet groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      // IntOpenHashSet(int) takes an expected size, not an element: passing the value there would create an
+      // empty set (and throw for negative values), so create the set first and then add the value.
+      groupValue = new IntOpenHashSet();
+      groupValue.add(value);
+      resultHolder.setValueForKey(groupKey, groupValue);
+    } else {
+      groupValue.add(value);
+    }
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctLongFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctLongFunction.java
new file mode 100644
index 0000000000..34ba9fd91b
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctLongFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Array aggregation over LONG values that keeps each distinct value once, using a {@link LongOpenHashSet} as
+ * the intermediate result.
+ */
+public class ArrayAggDistinctLongFunction extends BaseArrayAggLongFunction<LongOpenHashSet> {
+  public ArrayAggDistinctLongFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds the first {@code length} LONG values of the block to the distinct set kept in the result holder.
+   * The set already stored in the holder (if any) is reused so values from previous invocations are preserved.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongOpenHashSet valueSet = aggregationResultHolder.getResult();
+    if (valueSet == null) {
+      valueSet = new LongOpenHashSet(length);
+      aggregationResultHolder.setValue(valueSet);
+    }
+    for (int i = 0; i < length; i++) {
+      valueSet.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray} but skips every index contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongOpenHashSet valueSet = aggregationResultHolder.getResult();
+    if (valueSet == null) {
+      valueSet = new LongOpenHashSet(length);
+      aggregationResultHolder.setValue(valueSet);
+    }
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueSet.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds {@code value} to the distinct set of group {@code groupKey}, creating the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, long value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    LongOpenHashSet groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new LongOpenHashSet();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctStringFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctStringFunction.java
new file mode 100644
index 0000000000..947fd59ba1
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctStringFunction.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.Arrays;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Array aggregation over STRING values that keeps each distinct value once, using an
+ * {@link ObjectOpenHashSet} as the intermediate result.
+ */
+public class ArrayAggDistinctStringFunction extends BaseArrayAggStringFunction<ObjectOpenHashSet<String>> {
+  public ArrayAggDistinctStringFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds the first {@code length} STRING values of the block to the distinct set kept in the result holder.
+   * The set already stored in the holder (if any) is reused so values from previous invocations are preserved.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectOpenHashSet<String> valueSet = aggregationResultHolder.getResult();
+    if (valueSet == null) {
+      valueSet = new ObjectOpenHashSet<>(length);
+      aggregationResultHolder.setValue(valueSet);
+    }
+    // Only the first 'length' entries of the value array are consumed.
+    valueSet.addAll(Arrays.asList(values).subList(0, length));
+  }
+
+  /**
+   * Same as {@link #aggregateArray} but skips every index contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectOpenHashSet<String> valueSet = aggregationResultHolder.getResult();
+    if (valueSet == null) {
+      valueSet = new ObjectOpenHashSet<>(length);
+      aggregationResultHolder.setValue(valueSet);
+    }
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueSet.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds {@code value} to the distinct set of group {@code groupKey}, creating the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, String value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    ObjectOpenHashSet<String> groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new ObjectOpenHashSet<>();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDoubleFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDoubleFunction.java
new file mode 100644
index 0000000000..20c22592c6
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDoubleFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Array aggregation over DOUBLE values that keeps every value (duplicates included), using a
+ * {@link DoubleArrayList} as the intermediate result.
+ */
+public class ArrayAggDoubleFunction extends BaseArrayAggDoubleFunction<DoubleArrayList> {
+  public ArrayAggDoubleFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} DOUBLE values of the block to the list kept in the result holder.
+   * The list already stored in the holder (if any) is reused so values from previous invocations are preserved.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    double[] values = blockValSet.getDoubleValuesSV();
+    DoubleArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new DoubleArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      valueList.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray} but skips every index contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    double[] values = blockValSet.getDoubleValuesSV();
+    DoubleArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new DoubleArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueList.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list of group {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, double value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    DoubleArrayList groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new DoubleArrayList();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggFloatFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggFloatFunction.java
new file mode 100644
index 0000000000..9c43cd1138
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggFloatFunction.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Array aggregation over FLOAT values that keeps every value (duplicates included), using a
+ * {@link FloatArrayList} as the intermediate result.
+ */
+public class ArrayAggFloatFunction extends BaseArrayAggFloatFunction<FloatArrayList> {
+  public ArrayAggFloatFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} FLOAT values of the block to the list kept in the result holder.
+   * The list already stored in the holder (if any) is reused so values from previous invocations are preserved.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    float[] values = blockValSet.getFloatValuesSV();
+    FloatArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new FloatArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      valueList.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray} but skips every index contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    float[] values = blockValSet.getFloatValuesSV();
+    FloatArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new FloatArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueList.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list of group {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, float value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    FloatArrayList groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new FloatArrayList();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggIntFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggIntFunction.java
new file mode 100644
index 0000000000..2e6764b190
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggIntFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Array aggregation over INT values that keeps every value (duplicates included), using an
+ * {@link IntArrayList} as the intermediate result.
+ */
+public class ArrayAggIntFunction extends BaseArrayAggIntFunction<IntArrayList> {
+  public ArrayAggIntFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} INT values of the block to the list kept in the result holder.
+   * The list already stored in the holder (if any) is reused so values from previous invocations are preserved.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    int[] values = blockValSet.getIntValuesSV();
+    IntArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new IntArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      valueList.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray} but skips every index contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    int[] values = blockValSet.getIntValuesSV();
+    IntArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new IntArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueList.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list of group {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, int value) {
+    IntArrayList groupValue = groupByResultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      // IntArrayList(int) takes an initial capacity, not an element: build an empty list and append the
+      // value explicitly so the first value of the group is not lost (and negative values do not throw).
+      groupValue = new IntArrayList();
+      groupByResultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggLongFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggLongFunction.java
new file mode 100644
index 0000000000..95fd5954e0
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggLongFunction.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Array aggregation over LONG values that keeps every value (duplicates included), using a
+ * {@link LongArrayList} as the intermediate result.
+ */
+public class ArrayAggLongFunction extends BaseArrayAggLongFunction<LongArrayList> {
+  public ArrayAggLongFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} LONG values of the block to the list kept in the result holder.
+   * The list already stored in the holder (if any) is reused so values from previous invocations are preserved.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new LongArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      valueList.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray} but skips every index contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongArrayList valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new LongArrayList(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueList.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list of group {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, long value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    LongArrayList groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new LongArrayList();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggStringFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggStringFunction.java
new file mode 100644
index 0000000000..ad13868373
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggStringFunction.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.util.Arrays;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Array aggregation over STRING values that keeps every value (duplicates included), using an
+ * {@link ObjectArrayList} as the intermediate result.
+ */
+public class ArrayAggStringFunction extends BaseArrayAggStringFunction<ObjectArrayList<String>> {
+  public ArrayAggStringFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} STRING values of the block to the list kept in the result holder.
+   * The list already stored in the holder (if any) is reused so values from previous invocations are preserved.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectArrayList<String> valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new ObjectArrayList<>(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    // Only the first 'length' entries of the value array are consumed.
+    valueList.addAll(Arrays.asList(values).subList(0, length));
+  }
+
+  /**
+   * Same as {@link #aggregateArray} but skips every index contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectArrayList<String> valueList = aggregationResultHolder.getResult();
+    if (valueList == null) {
+      valueList = new ObjectArrayList<>(length);
+      aggregationResultHolder.setValue(valueList);
+    }
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueList.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list of group {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, String value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    ObjectArrayList<String> groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new ObjectArrayList<>();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggDoubleFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggDoubleFunction.java
new file mode 100644
index 0000000000..e9cb01dfe8
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggDoubleFunction.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.doubles.AbstractDoubleCollection;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public abstract class BaseArrayAggDoubleFunction<I extends
AbstractDoubleCollection>
+ extends BaseArrayAggFunction<I, DoubleArrayList> {
+ public BaseArrayAggDoubleFunction(ExpressionContext expression, boolean
nullHandlingEnabled) {
+ super(expression, FieldSpec.DataType.DOUBLE, nullHandlingEnabled);
+ }
+
+ abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int
groupKey, double value);
+
+ @Override
+ protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet) {
+ double[] values = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ setGroupByResult(groupByResultHolder, groupKeyArray[i], values[i]);
+ }
+ }
+
+ @Override
+ protected void aggregateArrayGroupBySVWithNull(int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
RoaringBitmap nullBitmap) {
+ double[] values = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ if (!nullBitmap.contains(i)) {
+ setGroupByResult(groupByResultHolder, groupKeyArray[i], values[i]);
+ }
+ }
+ }
+
+ @Override
+ protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet) {
+ double[] values = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupByResultHolder, groupKey, values[i]);
+ }
+ }
+ }
+
+ @Override
+ protected void aggregateArrayGroupByMVWithNull(int length, int[][]
groupKeysArray,
+ GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
RoaringBitmap nullBitmap) {
+ double[] values = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ if (!nullBitmap.contains(i)) {
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupByResultHolder, groupKey, values[i]);
+ }
+ }
+ }
+ }
+
+ @Override
+ public I merge(I intermediateResult1, I intermediateResult2) {
+ if (intermediateResult1 == null) {
+ return intermediateResult2;
+ }
+ if (intermediateResult2 == null) {
+ return intermediateResult1;
+ }
+ intermediateResult1.addAll(intermediateResult2);
+ return intermediateResult1;
+ }
+
+ @Override
+ public DoubleArrayList extractFinalResult(I doubleArrayList) {
+ return new DoubleArrayList(doubleArrayList);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFloatFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFloatFunction.java
new file mode 100644
index 0000000000..32314064bd
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFloatFunction.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.floats.AbstractFloatCollection;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Shared group-by, merge and final-result logic for array aggregations over FLOAT values.
+ *
+ * @param <I> intermediate collection type that accumulates the values
+ */
+public abstract class BaseArrayAggFloatFunction<I extends AbstractFloatCollection>
+    extends BaseArrayAggFunction<I, FloatArrayList> {
+  public BaseArrayAggFloatFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, FieldSpec.DataType.FLOAT, nullHandlingEnabled);
+  }
+
+  /** Records a single FLOAT value for the group identified by {@code groupKey}. */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, float value);
+
+  /** Records each of the first {@code length} values under its single group key. */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[i], values[i]);
+    }
+  }
+
+  /** Same as the single-key variant, ignoring every index present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        setGroupByResult(groupByResultHolder, groupKeyArray[i], values[i]);
+      }
+    }
+  }
+
+  /** Records each of the first {@code length} values under every group key listed for its index. */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupByResultHolder, groupKey, values[i]);
+      }
+    }
+  }
+
+  /** Same as the multi-key variant, ignoring every index present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      // The null check depends only on the index, so it is evaluated once per index rather than once per
+      // group key.
+      if (!nullBitmap.contains(i)) {
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupByResultHolder, groupKey, values[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. A {@code null} side yields the other side as-is; otherwise the second
+   * collection is added into the first, which is returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    if (intermediateResult1 == null) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2 == null) {
+      return intermediateResult1;
+    }
+    intermediateResult1.addAll(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  /** Copies the intermediate collection into a new {@link FloatArrayList}. */
+  @Override
+  public FloatArrayList extractFinalResult(I floatArrayList) {
+    return new FloatArrayList(floatArrayList);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFunction.java
new file mode 100644
index 0000000000..7a90d7ef30
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFunction.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import
org.apache.pinot.core.query.aggregation.function.BaseSingleInputAggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Base class of the ARRAYAGG aggregation functions, which aggregate the values of a single input expression.
+ *
+ * <p>This class holds everything that does not depend on the value type: creation of the result holders, the
+ * result column types, and the decision between the null-aware and the plain aggregation path. Subclasses
+ * provide the typed loops through the abstract {@code aggregateArray*} hooks.
+ *
+ * <p>A null-aware hook ({@code ...WithNull}) is only invoked when null handling is enabled and the block has a
+ * non-empty null bitmap, so implementations may rely on the bitmap argument being non-null.
+ *
+ * @param <I> intermediate result type kept in the result holders
+ * @param <F> final result type
+ */
+public abstract class BaseArrayAggFunction<I, F extends Comparable> extends BaseSingleInputAggregationFunction<I, F> {
+
+  /** When {@code true}, rows flagged in the block's null bitmap are routed to the null-aware hooks. */
+  protected final boolean _nullHandlingEnabled;
+  /** Final result column type, as returned by {@code ColumnDataType.fromDataTypeMV(dataType)}. */
+  private final DataSchema.ColumnDataType _resultColumnType;
+
+  /**
+   * @param expression the input expression to aggregate
+   * @param dataType data type of the aggregated values; determines the final result column type
+   * @param nullHandlingEnabled whether null rows should be excluded via the null-aware hooks
+   */
+  public BaseArrayAggFunction(ExpressionContext expression, FieldSpec.DataType dataType, boolean nullHandlingEnabled) {
+    super(expression);
+    _nullHandlingEnabled = nullHandlingEnabled;
+    _resultColumnType = DataSchema.ColumnDataType.fromDataTypeMV(dataType);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.ARRAYAGG;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return _resultColumnType;
+  }
+
+  /**
+   * Aggregates a block without grouping, using the null-aware hook only when null handling is enabled and the
+   * block actually contains nulls.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap != null && !nullBitmap.isEmpty()) {
+        aggregateArrayWithNull(length, aggregationResultHolder, blockValSet, nullBitmap);
+        return;
+      }
+    }
+    aggregateArray(length, aggregationResultHolder, blockValSet);
+  }
+
+  /** Aggregates the first {@code length} values of the block into the holder, with no null filtering. */
+  protected abstract void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet);
+
+  /** Same as {@link #aggregateArray}, for a block with a non-null, non-empty {@code nullBitmap}. */
+  protected abstract void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap);
+
+  /**
+   * Aggregates a block grouped by a single-valued key.
+   *
+   * <p>The null-aware hook is chosen only when null handling is enabled and the null bitmap is non-empty; in
+   * every other case the plain hook runs. This mirrors {@link #aggregate} and {@link #aggregateGroupByMV} and
+   * guarantees the null-aware hook never receives a {@code null} bitmap.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap != null && !nullBitmap.isEmpty()) {
+        aggregateArrayGroupBySVWithNull(length, groupKeyArray, groupByResultHolder, blockValSet, nullBitmap);
+        return;
+      }
+    }
+    aggregateArrayGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSet);
+  }
+
+  /** Adds each row's value to the group named by {@code groupKeyArray[i]}, with no null filtering. */
+  protected abstract void aggregateArrayGroupBySV(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet);
+
+  /** Same as {@link #aggregateArrayGroupBySV}, for a block with a non-null, non-empty {@code nullBitmap}. */
+  protected abstract void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap);
+
+  /**
+   * Aggregates a block grouped by a multi-valued key, using the null-aware hook only when null handling is
+   * enabled and the block actually contains nulls.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap != null && !nullBitmap.isEmpty()) {
+        aggregateArrayGroupByMVWithNull(length, groupKeysArray, groupByResultHolder, blockValSet, nullBitmap);
+        return;
+      }
+    }
+    aggregateArrayGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSet);
+  }
+
+  /** Adds each row's value to every group listed in {@code groupKeysArray[i]}, with no null filtering. */
+  protected abstract void aggregateArrayGroupByMV(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet);
+
+  /** Same as {@link #aggregateArrayGroupByMV}, for a block with a non-null, non-empty {@code nullBitmap}. */
+  protected abstract void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap);
+
+  @Override
+  public I extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public I extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggIntFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggIntFunction.java
new file mode 100644
index 0000000000..60af69fd36
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggIntFunction.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.ints.AbstractIntCollection;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Group-by loops, merge and final-result extraction shared by the ARRAYAGG functions that read int values.
+ *
+ * <p>How a value is stored for a group is left to the subclass through {@code setGroupByResult}.
+ *
+ * @param <I> intermediate result type, an {@link AbstractIntCollection}
+ */
+public abstract class BaseArrayAggIntFunction<I extends AbstractIntCollection>
+    extends BaseArrayAggFunction<I, IntArrayList> {
+
+  public BaseArrayAggIntFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /** Records {@code value} for {@code groupKey} in the given holder. */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, int value);
+
+  /** Hands every row's value to its single group key. */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    int docIdx = 0;
+    while (docIdx < length) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], intValues[docIdx]);
+      docIdx++;
+    }
+  }
+
+  /** Hands every row's value to its single group key, leaving out rows present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      if (nullBitmap.contains(docIdx)) {
+        continue;
+      }
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], intValues[docIdx]);
+    }
+  }
+
+  /** Hands every row's value to each of the row's group keys. */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      int[] keysOfRow = groupKeysArray[docIdx];
+      int rowValue = intValues[docIdx];
+      for (int keyIdx = 0; keyIdx < keysOfRow.length; keyIdx++) {
+        setGroupByResult(groupByResultHolder, keysOfRow[keyIdx], rowValue);
+      }
+    }
+  }
+
+  /** Hands every row's value to each of the row's group keys, leaving out rows present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      if (nullBitmap.contains(docIdx)) {
+        continue;
+      }
+      int[] keysOfRow = groupKeysArray[docIdx];
+      int rowValue = intValues[docIdx];
+      for (int keyIdx = 0; keyIdx < keysOfRow.length; keyIdx++) {
+        setGroupByResult(groupByResultHolder, keysOfRow[keyIdx], rowValue);
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. A {@code null} or empty first result yields the second unchanged; a
+   * {@code null} or empty second result yields the first unchanged; otherwise the second is added into the
+   * first, which is returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    boolean firstHasValues = intermediateResult1 != null && !intermediateResult1.isEmpty();
+    if (!firstHasValues) {
+      return intermediateResult2;
+    }
+    boolean secondHasValues = intermediateResult2 != null && !intermediateResult2.isEmpty();
+    if (secondHasValues) {
+      intermediateResult1.addAll(intermediateResult2);
+    }
+    return intermediateResult1;
+  }
+
+  /** Builds the final result as a new {@link IntArrayList} constructed from the intermediate collection. */
+  @Override
+  public IntArrayList extractFinalResult(I intArrayList) {
+    IntArrayList finalResult = new IntArrayList(intArrayList);
+    return finalResult;
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggLongFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggLongFunction.java
new file mode 100644
index 0000000000..edb037f06d
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggLongFunction.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.longs.AbstractLongCollection;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Group-by loops, merge and final-result extraction shared by the ARRAYAGG functions that read long values.
+ *
+ * <p>How a value is stored for a group is left to the subclass through {@code setGroupByResult}.
+ *
+ * @param <I> intermediate result type, an {@link AbstractLongCollection}
+ */
+public abstract class BaseArrayAggLongFunction<I extends AbstractLongCollection>
+    extends BaseArrayAggFunction<I, LongArrayList> {
+
+  public BaseArrayAggLongFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /** Records {@code value} for {@code groupKey} in the given holder. */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, long value);
+
+  /** Hands every row's value to its single group key. */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    int docIdx = 0;
+    while (docIdx < length) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], longValues[docIdx]);
+      docIdx++;
+    }
+  }
+
+  /** Hands every row's value to its single group key, leaving out rows present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      if (nullBitmap.contains(docIdx)) {
+        continue;
+      }
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], longValues[docIdx]);
+    }
+  }
+
+  /** Hands every row's value to each of the row's group keys. */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      int[] keysOfRow = groupKeysArray[docIdx];
+      for (int keyIdx = 0; keyIdx < keysOfRow.length; keyIdx++) {
+        setGroupByResult(groupByResultHolder, keysOfRow[keyIdx], longValues[docIdx]);
+      }
+    }
+  }
+
+  /** Hands every row's value to each of the row's group keys, leaving out rows present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      if (nullBitmap.contains(docIdx)) {
+        continue;
+      }
+      int[] keysOfRow = groupKeysArray[docIdx];
+      for (int keyIdx = 0; keyIdx < keysOfRow.length; keyIdx++) {
+        setGroupByResult(groupByResultHolder, keysOfRow[keyIdx], longValues[docIdx]);
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. A {@code null} or empty first result yields the second unchanged; a
+   * {@code null} or empty second result yields the first unchanged; otherwise the second is added into the
+   * first, which is returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    boolean firstHasValues = intermediateResult1 != null && !intermediateResult1.isEmpty();
+    if (!firstHasValues) {
+      return intermediateResult2;
+    }
+    boolean secondHasValues = intermediateResult2 != null && !intermediateResult2.isEmpty();
+    if (secondHasValues) {
+      intermediateResult1.addAll(intermediateResult2);
+    }
+    return intermediateResult1;
+  }
+
+  /** Builds the final result as a new {@link LongArrayList} constructed from the intermediate collection. */
+  @Override
+  public LongArrayList extractFinalResult(I arrayList) {
+    LongArrayList finalResult = new LongArrayList(arrayList);
+    return finalResult;
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
new file mode 100644
index 0000000000..5f5b98f7c8
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Group-by loops, merge and final-result extraction shared by the ARRAYAGG functions that read string values.
+ * The data type passed to the base class is always {@code STRING}.
+ *
+ * <p>How a value is stored for a group is left to the subclass through {@code setGroupByResult}.
+ *
+ * @param <I> intermediate result type, an {@link AbstractObjectCollection} of strings
+ */
+public abstract class BaseArrayAggStringFunction<I extends AbstractObjectCollection<String>>
+    extends BaseArrayAggFunction<I, ObjectArrayList<String>> {
+
+  public BaseArrayAggStringFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, FieldSpec.DataType.STRING, nullHandlingEnabled);
+  }
+
+  /** Records {@code value} for {@code groupKey} in the given holder. */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, String value);
+
+  /** Hands every row's value to its single group key. */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    int docIdx = 0;
+    while (docIdx < length) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], stringValues[docIdx]);
+      docIdx++;
+    }
+  }
+
+  /** Hands every row's value to its single group key, leaving out rows present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      if (nullBitmap.contains(docIdx)) {
+        continue;
+      }
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], stringValues[docIdx]);
+    }
+  }
+
+  /** Hands every row's value to each of the row's group keys. */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      int[] keysOfRow = groupKeysArray[docIdx];
+      for (int keyIdx = 0; keyIdx < keysOfRow.length; keyIdx++) {
+        setGroupByResult(groupByResultHolder, keysOfRow[keyIdx], stringValues[docIdx]);
+      }
+    }
+  }
+
+  /** Hands every row's value to each of the row's group keys, leaving out rows present in {@code nullBitmap}. */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      if (nullBitmap.contains(docIdx)) {
+        continue;
+      }
+      int[] keysOfRow = groupKeysArray[docIdx];
+      for (int keyIdx = 0; keyIdx < keysOfRow.length; keyIdx++) {
+        setGroupByResult(groupByResultHolder, keysOfRow[keyIdx], stringValues[docIdx]);
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. When both are present, the second is added into the first (which is
+   * mutated and returned); when one is {@code null}, the other is returned as is.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    if (intermediateResult1 != null && intermediateResult2 != null) {
+      intermediateResult1.addAll(intermediateResult2);
+      return intermediateResult1;
+    }
+    return intermediateResult1 != null ? intermediateResult1 : intermediateResult2;
+  }
+
+  /** Builds the final result as a new {@link ObjectArrayList} constructed from the intermediate collection. */
+  @Override
+  public ObjectArrayList<String> extractFinalResult(I stringArrayList) {
+    ObjectArrayList<String> finalResult = new ObjectArrayList<>(stringArrayList);
+    return finalResult;
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8116a29373..dd2ad2e75a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -23,10 +23,14 @@ import com.tdunning.math.stats.TDigest;
import it.unimi.dsi.fastutil.doubles.Double2LongOpenHashMap;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.Float2LongOpenHashMap;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -392,4 +396,60 @@ public class ObjectSerDeUtilsTest {
assertEquals(actual.getEstimate(), sketch.getEstimate(), ERROR_MESSAGE);
}
}
+
+  /** Serializes and deserializes randomly sized {@link IntArrayList}s and checks they come back equal. */
+  @Test
+  public void testIntArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      IntArrayList expected = new IntArrayList(numValues);
+      while (expected.size() < numValues) {
+        expected.add(RANDOM.nextInt());
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(expected);
+      IntArrayList actual = ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.IntArrayList);
+      assertEquals(actual, expected, ERROR_MESSAGE);
+    }
+  }
+
+  /** Serializes and deserializes randomly sized {@link LongArrayList}s and checks they come back equal. */
+  @Test
+  public void testLongArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      LongArrayList expected = new LongArrayList(numValues);
+      while (expected.size() < numValues) {
+        expected.add(RANDOM.nextLong());
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(expected);
+      LongArrayList actual = ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.LongArrayList);
+      assertEquals(actual, expected, ERROR_MESSAGE);
+    }
+  }
+
+  /** Serializes and deserializes randomly sized {@link FloatArrayList}s and checks they come back equal. */
+  @Test
+  public void testFloatArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      FloatArrayList expected = new FloatArrayList(numValues);
+      while (expected.size() < numValues) {
+        expected.add(RANDOM.nextFloat());
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(expected);
+      FloatArrayList actual = ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.FloatArrayList);
+      assertEquals(actual, expected, ERROR_MESSAGE);
+    }
+  }
+
+  /**
+   * Serializes and deserializes randomly sized lists of random strings (each up to 19 characters long) and
+   * checks they come back equal.
+   */
+  @Test
+  public void testStringArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      ObjectArrayList<String> expected = new ObjectArrayList<>(numValues);
+      while (expected.size() < numValues) {
+        int stringLength = RANDOM.nextInt(20);
+        expected.add(RandomStringUtils.random(stringLength));
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(expected);
+      ObjectArrayList<String> actual =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.StringArrayList);
+      assertEquals(actual, expected, ERROR_MESSAGE);
+    }
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ArrayAggQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ArrayAggQueriesTest.java
new file mode 100644
index 0000000000..5dc8bc0a11
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ArrayAggQueriesTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Query tests for the ArrayAgg aggregation function, with and without the distinct flag.
+ *
+ * <p>A single segment of {@code NUM_RECORDS} rows is built in which all five columns of a row carry the same
+ * random number in different data types. The segment is registered twice in the segment list used for the
+ * inter-segment checks.
+ */
+@SuppressWarnings("unchecked")
+public class ArrayAggQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ArrayAggQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_RECORDS = 2000;
+  private static final int MAX_VALUE = 1000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  // Hash codes of the generated int values; its size is the expected distinct count of the int column.
+  private Set<Integer> _values;
+  // Expected distinct counts, ordered as int, long, float, double, string.
+  private int[] _expectedCardinalityResults;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /**
+   * Generates the random rows, records the hash codes of the generated values per column type to derive the
+   * expected distinct counts, then builds and loads the segment.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    int setCapacity = HashUtil.getHashMapCapacity(MAX_VALUE);
+    _values = new HashSet<>(setCapacity);
+    Set<Integer> longHashes = new HashSet<>(setCapacity);
+    Set<Integer> floatHashes = new HashSet<>(setCapacity);
+    Set<Integer> doubleHashes = new HashSet<>(setCapacity);
+    Set<Integer> stringHashes = new HashSet<>(setCapacity);
+
+    List<GenericRow> rows = new ArrayList<>(NUM_RECORDS);
+    for (int rowId = 0; rowId < NUM_RECORDS; rowId++) {
+      int value = RANDOM.nextInt(MAX_VALUE);
+      String valueAsString = Integer.toString(value);
+
+      GenericRow row = new GenericRow();
+      row.putValue(INT_COLUMN, value);
+      row.putValue(LONG_COLUMN, (long) value);
+      row.putValue(FLOAT_COLUMN, (float) value);
+      row.putValue(DOUBLE_COLUMN, (double) value);
+      row.putValue(STRING_COLUMN, valueAsString);
+      rows.add(row);
+
+      _values.add(Integer.hashCode(value));
+      longHashes.add(Long.hashCode(value));
+      floatHashes.add(Float.hashCode(value));
+      doubleHashes.add(Double.hashCode(value));
+      stringHashes.add(valueAsString.hashCode());
+    }
+    _expectedCardinalityResults = new int[]{
+        _values.size(), longHashes.size(), floatHashes.size(), doubleHashes.size(), stringHashes.size()
+    };
+
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    generatorConfig.setTableName(RAW_TABLE_NAME);
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    generatorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    creationDriver.init(generatorConfig, new GenericRowRecordReader(rows));
+    creationDriver.build();
+
+    ImmutableSegment loadedSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = loadedSegment;
+    _indexSegments = Arrays.asList(loadedSegment, loadedSegment);
+  }
+
+  /** Without the distinct flag, every row contributes one element to each of the five result arrays. */
+  @Test
+  public void testArrayAggNonDistinct() {
+    String query =
+        "SELECT ArrayAgg(intColumn, 'INT'), ArrayAgg(longColumn, 'LONG'), ArrayAgg(floatColumn, 'FLOAT'), "
+            + "ArrayAgg(doubleColumn, 'DOUBLE'), ArrayAgg(stringColumn, 'STRING')"
+            + " FROM testTable";
+
+    // Inner segment
+    AggregationOperator operator = getOperator(query);
+    AggregationResultsBlock block = operator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
+        5 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> innerResults = block.getResults();
+    assertNotNull(innerResults);
+    for (int columnIdx = 0; columnIdx < 5; columnIdx++) {
+      List columnResult = (List) innerResults.get(columnIdx);
+      assertEquals(columnResult.size(), NUM_RECORDS);
+    }
+
+    // Inter segments
+    // NOTE(review): the factor 4 presumably comes from the base test querying the two-entry segment list on two
+    // servers; confirm against BaseQueriesTest.
+    Object[] row = getBrokerResponse(query).getResultTable().getRows().get(0);
+    assertEquals(row.length, 5);
+    assertEquals(((int[]) row[0]).length, 4 * NUM_RECORDS);
+    assertEquals(((long[]) row[1]).length, 4 * NUM_RECORDS);
+    assertEquals(((float[]) row[2]).length, 4 * NUM_RECORDS);
+    assertEquals(((double[]) row[3]).length, 4 * NUM_RECORDS);
+    assertEquals(((String[]) row[4]).length, 4 * NUM_RECORDS);
+  }
+
+  /** With the distinct flag, each result holds exactly as many elements as there are distinct generated values. */
+  @Test
+  public void testArrayAggDistinct() {
+    String query =
+        "SELECT ArrayAgg(intColumn, 'INT', true), ArrayAgg(longColumn, 'LONG', true), "
+            + "ArrayAgg(floatColumn, 'FLOAT', true), ArrayAgg(doubleColumn, 'DOUBLE', true), "
+            + "ArrayAgg(stringColumn, 'STRING', true)"
+            + " FROM testTable";
+
+    // Inner segment
+    AggregationOperator operator = getOperator(query);
+    AggregationResultsBlock block = operator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
+        5 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> innerResults = block.getResults();
+    assertNotNull(innerResults);
+    for (int columnIdx = 0; columnIdx < 5; columnIdx++) {
+      Set columnResult = (Set) innerResults.get(columnIdx);
+      assertEquals(columnResult.size(), _expectedCardinalityResults[columnIdx]);
+    }
+
+    // Inter segments
+    Object[] row = getBrokerResponse(query).getResultTable().getRows().get(0);
+    assertEquals(row.length, 5);
+    assertEquals(((int[]) row[0]).length, _expectedCardinalityResults[0]);
+    assertEquals(((long[]) row[1]).length, _expectedCardinalityResults[1]);
+    assertEquals(((float[]) row[2]).length, _expectedCardinalityResults[2]);
+    assertEquals(((double[]) row[3]).length, _expectedCardinalityResults[3]);
+    assertEquals(((String[]) row[4]).length, _expectedCardinalityResults[4]);
+  }
+
+  /** Releases the loaded segment and removes the temporary index directory. */
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
new file mode 100644
index 0000000000..0cf7ebdbf8
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest") // Integration test: arrayAgg queries against a table built from a generated Avro file.
+public class ArrayTest extends CustomDataQueryClusterIntegrationTest {
+
+ private static final String DEFAULT_TABLE_NAME = "ArrayTest";
+ private static final String BOOLEAN_COLUMN = "boolCol"; // column names below are also hard-coded inside the query strings
+ private static final String INT_COLUMN = "intCol";
+ private static final String LONG_COLUMN = "longCol";
+ private static final String FLOAT_COLUMN = "floatCol";
+ private static final String DOUBLE_COLUMN = "doubleCol";
+ private static final String STRING_COLUMN = "stringCol";
+ private static final String TIMESTAMP_COLUMN = "timestampCol";
+
+ @Override
+ protected long getCountStarResult() {
+ return 1000; // also the number of Avro records written by createAvroFile() and the LIMIT used in the queries
+ }
+
+ @Test(dataProvider = "useBothQueryEngines") // Non-distinct arrayAgg: every checked array should hold one element per record.
+ public void testQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query =
+ String.format("SELECT "
+ + "arrayAgg(boolCol, 'BOOLEAN'), "
+ + "arrayAgg(intCol, 'INT'), "
+ + "arrayAgg(longCol, 'LONG'), "
+ // NOTE: FLOAT array is auto converted to DOUBLE array, so the multi-stage query asks for 'DOUBLE' on floatCol
+ + (useMultiStageQueryEngine ? "arrayAgg(floatCol, 'DOUBLE'), " :
"arrayAgg(floatCol, 'FLOAT'), ")
+ + "arrayAgg(doubleCol, 'DOUBLE'), "
+ + "arrayAgg(stringCol, 'STRING'), "
+ + "arrayAgg(timestampCol, 'TIMESTAMP') "
+ + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+ System.out.println(jsonNode); // NOTE(review): prints the whole response to stdout; looks like leftover debugging
+ Assert.assertEquals(jsonNode.get("resultTable").get("rows").size(), 1); // aggregation without GROUP BY: a single row
+ Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).size(),
7); // one cell per arrayAgg call
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).size(),
getCountStarResult());
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).size(),
getCountStarResult());
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(2).size(),
getCountStarResult());
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(3).size(),
getCountStarResult());
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(4).size(),
getCountStarResult()); // NOTE(review): cells 5 (stringCol) and 6 (timestampCol) are selected but never size-checked
+ }
+
+ @Test(dataProvider = "useBothQueryEngines") // arrayAgg with a third argument of true: checked arrays should hold only distinct values.
+ public void testQueryWithDistinct(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query =
+ String.format("SELECT "
+ + "arrayAgg(boolCol, 'BOOLEAN', true), "
+ + "arrayAgg(intCol, 'INT', true), "
+ + "arrayAgg(longCol, 'LONG', true), "
+ // NOTE: FLOAT array is auto converted to DOUBLE array, so the multi-stage query asks for 'DOUBLE' on floatCol
+ + (useMultiStageQueryEngine ? "arrayAgg(floatCol, 'DOUBLE', true),
"
+ : "arrayAgg(floatCol, 'FLOAT', true), ")
+ + "arrayAgg(doubleCol, 'DOUBLE', true), "
+ + "arrayAgg(stringCol, 'STRING', true), "
+ + "arrayAgg(timestampCol, 'TIMESTAMP', true) "
+ + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+ System.out.println(jsonNode); // NOTE(review): prints the whole response to stdout; looks like leftover debugging
+ Assert.assertEquals(jsonNode.get("resultTable").get("rows").size(), 1); // aggregation without GROUP BY: a single row
+ Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).size(),
7); // one cell per arrayAgg call
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).size(),
2); // boolCol: assumes both true and false occur among the generated records
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).size(),
getCountStarResult() / 10); // createAvroFile() generates only getCountStarResult() / 10 distinct records; assumes their random values do not collide
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(2).size(),
getCountStarResult() / 10);
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(3).size(),
getCountStarResult() / 10);
+
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(4).size(),
getCountStarResult() / 10); // NOTE(review): cells 5 (stringCol) and 6 (timestampCol) are selected but never size-checked
+ }
+
+ @Override
+ public String getTableName() {
+ return DEFAULT_TABLE_NAME;
+ }
+
+ @Override
+ public Schema createSchema() { // Pinot schema: one single-value dimension per column under test
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
+ .addSingleValueDimension(BOOLEAN_COLUMN, FieldSpec.DataType.BOOLEAN)
+ .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
+ .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
+ .addSingleValueDimension(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE)
+ .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(TIMESTAMP_COLUMN,
FieldSpec.DataType.TIMESTAMP)
+ .build();
+ }
+
+ @Override
+ public File createAvroFile()
+ throws Exception {
+ // create avro schema: one field per Pinot column, in the same order as createSchema()
+ org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+ avroSchema.setFields(ImmutableList.of(
+ new org.apache.avro.Schema.Field(BOOLEAN_COLUMN,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN),
+ null, null),
+ new org.apache.avro.Schema.Field(INT_COLUMN,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT),
+ null, null),
+ new org.apache.avro.Schema.Field(LONG_COLUMN,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+ null, null),
+ new org.apache.avro.Schema.Field(FLOAT_COLUMN,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT),
+ null, null),
+ new org.apache.avro.Schema.Field(DOUBLE_COLUMN,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE),
+ null, null),
+ new org.apache.avro.Schema.Field(STRING_COLUMN,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
+ null, null),
+ new org.apache.avro.Schema.Field(TIMESTAMP_COLUMN,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+ null, null) // timestampCol is written as an Avro LONG
+ ));
+
+ // create avro file named data.avro under _tempDir
+ File avroFile = new File(_tempDir, "data.avro");
+ Cache<Integer, GenericData.Record> recordCache =
CacheBuilder.newBuilder().build(); // memoizes generated records by key so the same record can be appended again
+ try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
+ for (int i = 0; i < getCountStarResult(); i++) {
+ // add avro record to file; the key i % (count / 10) cycles through count / 10 values, so each record is appended repeatedly
+ fileWriter.append(recordCache.get((int) (i % (getCountStarResult() /
10)), () -> {
+ // create avro record with random values; this loader runs only when the key is not cached yet
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(BOOLEAN_COLUMN, RANDOM.nextBoolean());
+ record.put(INT_COLUMN, RANDOM.nextInt());
+ record.put(LONG_COLUMN, RANDOM.nextLong());
+ record.put(FLOAT_COLUMN, RANDOM.nextFloat());
+ record.put(DOUBLE_COLUMN, RANDOM.nextDouble());
+ record.put(STRING_COLUMN,
RandomStringUtils.random(RANDOM.nextInt(100))); // string length is drawn from RANDOM.nextInt(100), so it varies per record
+ record.put(TIMESTAMP_COLUMN, RANDOM.nextLong());
+ return record;
+ }
+ ));
+ }
+ }
+ return avroFile;
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
index 4a609c737e..a50d48b31a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
@@ -19,7 +19,10 @@
package org.apache.pinot.query.runtime.operator.utils;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -45,20 +48,41 @@ public class TypeUtils {
// For AggregationFunctions that return serialized custom object, e.g.
DistinctCountRawHLLAggregationFunction
case STRING:
return value.toString();
+ case INT_ARRAY:
+ if (value instanceof IntArrayList) {
+ // For ArrayAggregationFunction
+ return ((IntArrayList) value).elements();
+ } else {
+ return value;
+ }
case LONG_ARRAY:
if (value instanceof LongArrayList) {
- // For FunnelCountAggregationFunction
+ // For FunnelCountAggregationFunction and ArrayAggregationFunction
return ((LongArrayList) value).elements();
} else {
return value;
}
+ case FLOAT_ARRAY:
+ if (value instanceof FloatArrayList) {
+ // For ArrayAggregationFunction
+ return ((FloatArrayList) value).elements();
+ } else {
+ return value;
+ }
case DOUBLE_ARRAY:
if (value instanceof DoubleArrayList) {
- // For HistogramAggregationFunction
+ // For HistogramAggregationFunction and ArrayAggregationFunction
return ((DoubleArrayList) value).elements();
} else {
return value;
}
+ case STRING_ARRAY:
+ if (value instanceof ObjectArrayList) {
+ // For ArrayAggregationFunction
+ return ((ObjectArrayList<String>) value).toArray(new String[0]);
+ } else {
+ return value;
+ }
// TODO: Add more conversions
default:
return value;
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 5a7c127e54..928670eeba 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -308,6 +308,11 @@ public enum AggregationFunctionType {
OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER,
SqlTypeFamily.ANY), ordinal -> ordinal > 3),
ReturnTypes.ARG1, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ // Array aggregate functions
+ ARRAYAGG("arrayAgg", null, SqlKind.ARRAY_AGG,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.STRING, SqlTypeFamily.BOOLEAN),
+ ordinal -> ordinal > 1), ReturnTypes.TO_ARRAY,
ReturnTypes.explicit(SqlTypeName.OTHER)),
+
// funnel aggregate functions
// TODO: revisit support for funnel count in V2
FUNNELCOUNT("funnelCount");
@@ -357,12 +362,12 @@ public enum AggregationFunctionType {
* Constructor to use for aggregation functions which are supported in both
v1 and multistage engines with
* different behavior comparing to Calcite and requires literal operand
inputs.
*
- * @param name name of the agg function
- * @param alternativeNames alternative name of the agg function.
- * @param sqlKind sql kind indicator, used by Calcite
- * @param sqlFunctionCategory function catalog, used by Calcite
- * @param operandTypeChecker input operand type signature, used by Calcite
- * @param finalReturnType final output type signature, used by Calcite
+ * @param name name of the agg function
+ * @param alternativeNames alternative name of the agg function.
+ * @param sqlKind sql kind indicator, used by Calcite
+ * @param sqlFunctionCategory function catalog, used by Calcite
+ * @param operandTypeChecker input operand type signature, used by
Calcite
+ * @param finalReturnType final output type signature, used by Calcite
* @param intermediateReturnType intermediate output type signature, used by
Pinot and Calcite
*/
AggregationFunctionType(String name, @Nullable List<String>
alternativeNames, @Nullable SqlKind sqlKind,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]