This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ecac6c9376 Add ArrayAgg aggregation function (#11822)
ecac6c9376 is described below

commit ecac6c9376e44d355b9363a0a9b6c2cd59d142ea
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Oct 21 09:07:36 2023 -0700

    Add ArrayAgg aggregation function (#11822)
---
 .../org/apache/pinot/common/utils/DataSchema.java  |  51 ++++-
 .../apache/pinot/core/common/ObjectSerDeUtils.java | 168 ++++++++++++++++-
 .../blocks/results/AggregationResultsBlock.java    |  16 +-
 .../function/AggregationFunctionFactory.java       |  63 ++++++-
 .../array/ArrayAggDistinctDoubleFunction.java      |  70 +++++++
 .../array/ArrayAggDistinctFloatFunction.java       |  70 +++++++
 .../array/ArrayAggDistinctIntFunction.java         |  74 ++++++++
 .../array/ArrayAggDistinctLongFunction.java        |  70 +++++++
 .../array/ArrayAggDistinctStringFunction.java      |  69 +++++++
 .../function/array/ArrayAggDoubleFunction.java     |  70 +++++++
 .../function/array/ArrayAggFloatFunction.java      |  71 +++++++
 .../function/array/ArrayAggIntFunction.java        |  67 +++++++
 .../function/array/ArrayAggLongFunction.java       |  69 +++++++
 .../function/array/ArrayAggStringFunction.java     |  69 +++++++
 .../function/array/BaseArrayAggDoubleFunction.java |  98 ++++++++++
 .../function/array/BaseArrayAggFloatFunction.java  |  98 ++++++++++
 .../function/array/BaseArrayAggFunction.java       | 141 ++++++++++++++
 .../function/array/BaseArrayAggIntFunction.java    | 103 ++++++++++
 .../function/array/BaseArrayAggLongFunction.java   | 101 ++++++++++
 .../function/array/BaseArrayAggStringFunction.java |  98 ++++++++++
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  60 ++++++
 .../apache/pinot/queries/ArrayAggQueriesTest.java  | 209 +++++++++++++++++++++
 .../pinot/integration/tests/custom/ArrayTest.java  | 174 +++++++++++++++++
 .../query/runtime/operator/utils/TypeUtils.java    |  28 ++-
 .../pinot/segment/spi/AggregationFunctionType.java |  17 +-
 25 files changed, 2104 insertions(+), 20 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index e9a7e858fe..9bd8dacef1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -23,7 +23,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -52,7 +55,9 @@ public class DataSchema {
   private final ColumnDataType[] _columnDataTypes;
   private ColumnDataType[] _storedColumnDataTypes;
 
-  /** Used by both Broker and Server to generate results for EXPLAIN PLAN 
queries. */
+  /**
+   * Used by both Broker and Server to generate results for EXPLAIN PLAN 
queries.
+   */
   public static final DataSchema EXPLAIN_RESULT_SCHEMA =
       new DataSchema(new String[]{"Operator", "Operator_Id", "Parent_Id"}, new 
ColumnDataType[]{
           ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT
@@ -425,19 +430,19 @@ public class DataSchema {
         case BYTES:
           return ((ByteArray) value).getBytes();
         case INT_ARRAY:
-          return (int[]) value;
+          return toIntArray(value);
         case LONG_ARRAY:
           return toLongArray(value);
         case FLOAT_ARRAY:
-          return (float[]) value;
+          return toFloatArray(value);
         case DOUBLE_ARRAY:
           return toDoubleArray(value);
         case STRING_ARRAY:
-          return (String[]) value;
+          return toStringArray(value);
         case BOOLEAN_ARRAY:
-          return toBooleanArray((int[]) value);
+          return toBooleanArray(toIntArray(value));
         case TIMESTAMP_ARRAY:
-          return toTimestampArray((long[]) value);
+          return toTimestampArray(toLongArray(value));
         case BYTES_ARRAY:
           return (byte[][]) value;
         case UNKNOWN: // fall through
@@ -513,11 +518,31 @@ public class DataSchema {
       }
     }
 
+    /**
+     * Converts an INT_ARRAY column value into an {@code int[]}.
+     *
+     * <p>A plain {@code int[]} is returned as-is. An {@link IntArrayList} (used by the array aggregation functions)
+     * is converted to an array holding exactly its elements.
+     *
+     * @param value the value to convert
+     * @return the values as an {@code int[]}
+     * @throws IllegalStateException if {@code value} is neither an {@code int[]} nor an {@link IntArrayList}
+     */
+    private static int[] toIntArray(Object value) {
+      if (value instanceof int[]) {
+        return (int[]) value;
+      } else if (value instanceof IntArrayList) {
+        // For ArrayAggregationFunction
+        IntArrayList intArrayList = (IntArrayList) value;
+        int[] elements = intArrayList.elements();
+        // elements() exposes the backing array, which may be longer than size(). Hand it out only when it is exact,
+        // otherwise copy so that unused trailing slots do not show up as extra values.
+        return elements.length == intArrayList.size() ? elements : intArrayList.toIntArray();
+      }
+      throw new IllegalStateException(String.format("Cannot convert: '%s' to int[]", value));
+    }
+
+    /**
+     * Converts a FLOAT_ARRAY column value into a {@code float[]}.
+     *
+     * <p>A plain {@code float[]} is returned as-is. A {@link FloatArrayList} (used by the array aggregation
+     * functions) is converted to an array holding exactly its elements.
+     *
+     * @param value the value to convert
+     * @return the values as a {@code float[]}
+     * @throws IllegalStateException if {@code value} is neither a {@code float[]} nor a {@link FloatArrayList}
+     */
+    private static float[] toFloatArray(Object value) {
+      if (value instanceof float[]) {
+        return (float[]) value;
+      } else if (value instanceof FloatArrayList) {
+        // For ArrayAggregationFunction
+        FloatArrayList floatArrayList = (FloatArrayList) value;
+        float[] elements = floatArrayList.elements();
+        // elements() exposes the backing array, which may be longer than size(). Hand it out only when it is exact,
+        // otherwise copy so that unused trailing slots do not show up as extra values.
+        return elements.length == floatArrayList.size() ? elements : floatArrayList.toFloatArray();
+      }
+      throw new IllegalStateException(String.format("Cannot convert: '%s' to float[]", value));
+    }
+
     private static double[] toDoubleArray(Object value) {
       if (value instanceof double[]) {
         return (double[]) value;
       } else if (value instanceof DoubleArrayList) {
-        // For HistogramAggregationFunction
+        // For HistogramAggregationFunction and ArrayAggregationFunction
         return ((DoubleArrayList) value).elements();
       } else if (value instanceof int[]) {
         int[] intValues = (int[]) value;
@@ -550,7 +575,7 @@ public class DataSchema {
       if (value instanceof long[]) {
         return (long[]) value;
       } else if (value instanceof LongArrayList) {
-        // For FunnelCountAggregationFunction
+        // For FunnelCountAggregationFunction and ArrayAggregationFunction
         return ((LongArrayList) value).elements();
       } else {
         int[] intValues = (int[]) value;
@@ -563,6 +588,16 @@ public class DataSchema {
       }
     }
 
+    /**
+     * Converts a STRING_ARRAY column value into a {@code String[]}.
+     *
+     * @param value either a {@code String[]} or an {@link ObjectArrayList} of strings
+     * @return the values as a {@code String[]}
+     * @throws IllegalStateException if {@code value} is of any other type
+     */
+    private static String[] toStringArray(Object value) {
+      if (value instanceof ObjectArrayList) {
+        // For ArrayAggregationFunction
+        ObjectArrayList<String> stringList = (ObjectArrayList<String>) value;
+        return stringList.toArray(new String[0]);
+      }
+      if (!(value instanceof String[])) {
+        throw new IllegalStateException(String.format("Cannot convert: '%s' to String[]", value));
+      }
+      return (String[]) value;
+    }
+
     private static boolean[] toBooleanArray(int[] intArray) {
       int length = intArray.length;
       boolean[] booleanArray = new boolean[length];
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 45f1ebd489..0359d99609 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -34,19 +34,23 @@ import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
 import it.unimi.dsi.fastutil.doubles.DoubleSet;
 import it.unimi.dsi.fastutil.floats.Float2LongMap;
 import it.unimi.dsi.fastutil.floats.Float2LongOpenHashMap;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
 import it.unimi.dsi.fastutil.floats.FloatIterator;
 import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.floats.FloatSet;
 import it.unimi.dsi.fastutil.ints.Int2LongMap;
 import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.longs.Long2LongMap;
 import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongIterator;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import it.unimi.dsi.fastutil.objects.ObjectSet;
 import java.io.IOException;
@@ -144,7 +148,11 @@ public class ObjectSerDeUtils {
     FrequentStringsSketch(38),
     FrequentLongsSketch(39),
     HyperLogLogPlus(40),
-    CompressedProbabilisticCounting(41);
+    CompressedProbabilisticCounting(41),
+    IntArrayList(42),
+    LongArrayList(43),
+    FloatArrayList(44),
+    StringArrayList(45);
 
     private final int _value;
 
@@ -165,8 +173,25 @@ public class ObjectSerDeUtils {
         return ObjectType.Double;
       } else if (value instanceof BigDecimal) {
         return ObjectType.BigDecimal;
+      } else if (value instanceof IntArrayList) {
+        return ObjectType.IntArrayList;
+      } else if (value instanceof LongArrayList) {
+        return ObjectType.LongArrayList;
+      } else if (value instanceof FloatArrayList) {
+        return ObjectType.FloatArrayList;
       } else if (value instanceof DoubleArrayList) {
         return ObjectType.DoubleArrayList;
+      } else if (value instanceof ObjectArrayList) {
+        ObjectArrayList objectArrayList = (ObjectArrayList) value;
+        if (!objectArrayList.isEmpty()) {
+          Object next = objectArrayList.get(0);
+          if (next instanceof String) {
+            return ObjectType.StringArrayList;
+          }
+          throw new IllegalArgumentException(
+              "Unsupported type of value: " + next.getClass().getSimpleName());
+        }
+        return ObjectType.StringArrayList;
       } else if (value instanceof AvgPair) {
         return ObjectType.AvgPair;
       } else if (value instanceof MinMaxRangePair) {
@@ -329,6 +354,99 @@ public class ObjectSerDeUtils {
     }
   };
 
+  /**
+   * SerDe for {@link IntArrayList}.
+   *
+   * <p>Serialized form: the number of values as an int, followed by each value as an int.
+   */
+  public static final ObjectSerDe<IntArrayList> INT_ARRAY_LIST_SER_DE = new ObjectSerDe<IntArrayList>() {
+
+    @Override
+    public byte[] serialize(IntArrayList intArrayList) {
+      int size = intArrayList.size();
+      // Size the buffer in long arithmetic: size * Integer.BYTES silently wraps around in int for very large lists
+      long bufferSize = (1 + (long) size) * Integer.BYTES;
+      Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
+      byte[] bytes = new byte[(int) bufferSize];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      // Only the first 'size' entries of the backing array are written
+      int[] values = intArrayList.elements();
+      for (int i = 0; i < size; i++) {
+        byteBuffer.putInt(values[i]);
+      }
+      return bytes;
+    }
+
+    @Override
+    public IntArrayList deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public IntArrayList deserialize(ByteBuffer byteBuffer) {
+      // Reads the count first, then that many int values from the buffer's current position
+      int numValues = byteBuffer.getInt();
+      IntArrayList intArrayList = new IntArrayList(numValues);
+      for (int i = 0; i < numValues; i++) {
+        intArrayList.add(byteBuffer.getInt());
+      }
+      return intArrayList;
+    }
+  };
+
+  /**
+   * SerDe for {@link LongArrayList}.
+   *
+   * <p>Serialized form: the number of values as an int, followed by each value as a long.
+   */
+  public static final ObjectSerDe<LongArrayList> LONG_ARRAY_LIST_SER_DE = new ObjectSerDe<LongArrayList>() {
+
+    @Override
+    public byte[] serialize(LongArrayList longArrayList) {
+      int size = longArrayList.size();
+      // Size the buffer in long arithmetic: size * Long.BYTES silently wraps around in int for very large lists
+      long bufferSize = Integer.BYTES + (long) size * Long.BYTES;
+      Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
+      byte[] bytes = new byte[(int) bufferSize];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      // Only the first 'size' entries of the backing array are written
+      long[] values = longArrayList.elements();
+      for (int i = 0; i < size; i++) {
+        byteBuffer.putLong(values[i]);
+      }
+      return bytes;
+    }
+
+    @Override
+    public LongArrayList deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public LongArrayList deserialize(ByteBuffer byteBuffer) {
+      // Reads the count first, then that many long values from the buffer's current position
+      int numValues = byteBuffer.getInt();
+      LongArrayList longArrayList = new LongArrayList(numValues);
+      for (int i = 0; i < numValues; i++) {
+        longArrayList.add(byteBuffer.getLong());
+      }
+      return longArrayList;
+    }
+  };
+
+  /**
+   * SerDe for {@link FloatArrayList}.
+   *
+   * <p>Serialized form: the number of values as an int, followed by each value as a float.
+   */
+  public static final ObjectSerDe<FloatArrayList> FLOAT_ARRAY_LIST_SER_DE = new ObjectSerDe<FloatArrayList>() {
+
+    @Override
+    public byte[] serialize(FloatArrayList floatArrayList) {
+      int size = floatArrayList.size();
+      // Size the buffer in long arithmetic: size * Float.BYTES silently wraps around in int for very large lists
+      long bufferSize = Integer.BYTES + (long) size * Float.BYTES;
+      Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
+      byte[] bytes = new byte[(int) bufferSize];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      // Only the first 'size' entries of the backing array are written
+      float[] values = floatArrayList.elements();
+      for (int i = 0; i < size; i++) {
+        byteBuffer.putFloat(values[i]);
+      }
+      return bytes;
+    }
+
+    @Override
+    public FloatArrayList deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public FloatArrayList deserialize(ByteBuffer byteBuffer) {
+      // Reads the count first, then that many float values from the buffer's current position
+      int numValues = byteBuffer.getInt();
+      FloatArrayList floatArrayList = new FloatArrayList(numValues);
+      for (int i = 0; i < numValues; i++) {
+        floatArrayList.add(byteBuffer.getFloat());
+      }
+      return floatArrayList;
+    }
+  };
+
   public static final ObjectSerDe<DoubleArrayList> DOUBLE_ARRAY_LIST_SER_DE = 
new ObjectSerDe<DoubleArrayList>() {
 
     @Override
@@ -360,6 +478,50 @@ public class ObjectSerDeUtils {
     }
   };
 
+  /**
+   * SerDe for an {@link ObjectArrayList} whose elements are written as strings.
+   *
+   * <p>Serialized form: the number of values as an int, then for each value the length of its UTF-8 encoding as an
+   * int followed by the encoded bytes.
+   */
+  public static final ObjectSerDe<ObjectArrayList> STRING_ARRAY_LIST_SER_DE =
+      new ObjectSerDe<ObjectArrayList>() {
+        @Override
+        public byte[] serialize(ObjectArrayList stringArrayList) {
+          int numValues = stringArrayList.size();
+          byte[][] encodedValues = new byte[numValues][];
+          // Fixed part: one int for the count plus one length-prefix int per value
+          long totalBytes = (1 + (long) numValues) * Integer.BYTES;
+          for (int i = 0; i < numValues; i++) {
+            byte[] encoded = stringArrayList.get(i).toString().getBytes(UTF_8);
+            encodedValues[i] = encoded;
+            totalBytes += encoded.length;
+          }
+          Preconditions.checkState(totalBytes <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
+          ByteBuffer byteBuffer = ByteBuffer.allocate((int) totalBytes);
+          byteBuffer.putInt(numValues);
+          for (byte[] encoded : encodedValues) {
+            byteBuffer.putInt(encoded.length);
+            byteBuffer.put(encoded);
+          }
+          return byteBuffer.array();
+        }
+
+        @Override
+        public ObjectArrayList deserialize(byte[] bytes) {
+          return deserialize(ByteBuffer.wrap(bytes));
+        }
+
+        @Override
+        public ObjectArrayList deserialize(ByteBuffer byteBuffer) {
+          int numValues = byteBuffer.getInt();
+          ObjectArrayList values = new ObjectArrayList(numValues);
+          for (int read = 0; read < numValues; read++) {
+            // Each value is length-prefixed
+            byte[] encoded = new byte[byteBuffer.getInt()];
+            byteBuffer.get(encoded);
+            values.add(new String(encoded, UTF_8));
+          }
+          return values;
+        }
+      };
+
   public static final ObjectSerDe<AvgPair> AVG_PAIR_SER_DE = new 
ObjectSerDe<AvgPair>() {
 
     @Override
@@ -1435,6 +1597,10 @@ public class ObjectSerDeUtils {
       FREQUENT_LONGS_SKETCH_SER_DE,
       HYPER_LOG_LOG_PLUS_SER_DE,
       DATA_SKETCH_CPC_SER_DE,
+      INT_ARRAY_LIST_SER_DE,
+      LONG_ARRAY_LIST_SER_DE,
+      FLOAT_ARRAY_LIST_SER_DE,
+      STRING_ARRAY_LIST_SER_DE,
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 6801d90305..cf13255f8b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -19,7 +19,10 @@
 package org.apache.pinot.core.operator.blocks.results;
 
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Collections;
@@ -191,12 +194,21 @@ public class AggregationResultsBlock extends 
BaseResultsBlock {
       case BYTES:
         dataTableBuilder.setColumn(index, (ByteArray) result);
         break;
-      case DOUBLE_ARRAY:
-        dataTableBuilder.setColumn(index, ((DoubleArrayList) 
result).elements());
+      case INT_ARRAY:
+        dataTableBuilder.setColumn(index, ((IntArrayList) result).elements());
         break;
       case LONG_ARRAY:
         dataTableBuilder.setColumn(index, ((LongArrayList) result).elements());
         break;
+      case FLOAT_ARRAY:
+        dataTableBuilder.setColumn(index, ((FloatArrayList) 
result).elements());
+        break;
+      case DOUBLE_ARRAY:
+        dataTableBuilder.setColumn(index, ((DoubleArrayList) 
result).elements());
+        break;
+      case STRING_ARRAY:
+        dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) 
result).toArray(new String[0]));
+        break;
       default:
         throw new IllegalStateException("Illegal column data type in final 
result: " + columnDataType);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 0480141f17..987342ac1a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -23,6 +23,16 @@ import java.util.List;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctDoubleFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctFloatFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctIntFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctLongFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDistinctStringFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggDoubleFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggFloatFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggIntFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggLongFunction;
+import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggStringFunction;
 import 
org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -226,6 +236,58 @@ public class AggregationFunctionFactory {
                 throw new IllegalArgumentException("Unsupported data type for 
FIRST_WITH_TIME: " + dataType);
             }
           }
+          case ARRAYAGG: {
+            Preconditions.checkArgument(numArguments >= 2,
+                "ARRAY_AGG expects 2 or 3 arguments, got: %s. The function can 
be used as "
+                    + "arrayAgg(dataColumn, 'dataType', ['isDistinct'])", 
numArguments);
+            ExpressionContext dataTypeExp = arguments.get(1);
+            Preconditions.checkArgument(dataTypeExp.getType() == 
ExpressionContext.Type.LITERAL,
+                "ARRAY_AGG expects the 2nd argument to be literal, got: %s. 
The function can be used as "
+                    + "arrayAgg(dataColumn, 'dataType', ['isDistinct'])", 
dataTypeExp.getType());
+            DataType dataType = 
DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
+            boolean isDistinct = false;
+            if (numArguments == 3) {
+              ExpressionContext isDistinctExp = arguments.get(2);
+              Preconditions.checkArgument(isDistinctExp.getType() == 
ExpressionContext.Type.LITERAL,
+                  "ARRAY_AGG expects the 3rd argument to be literal, got: %s. 
The function can be used as "
+                      + "arrayAgg(dataColumn, 'dataType', ['isDistinct'])", 
isDistinctExp.getType());
+              isDistinct = isDistinctExp.getLiteral().getBooleanValue();
+            }
+            if (isDistinct) {
+              switch (dataType) {
+                case BOOLEAN:
+                case INT:
+                  return new ArrayAggDistinctIntFunction(firstArgument, 
dataType, nullHandlingEnabled);
+                case LONG:
+                case TIMESTAMP:
+                  return new ArrayAggDistinctLongFunction(firstArgument, 
dataType, nullHandlingEnabled);
+                case FLOAT:
+                  return new ArrayAggDistinctFloatFunction(firstArgument, 
nullHandlingEnabled);
+                case DOUBLE:
+                  return new ArrayAggDistinctDoubleFunction(firstArgument, 
nullHandlingEnabled);
+                case STRING:
+                  return new ArrayAggDistinctStringFunction(firstArgument, 
nullHandlingEnabled);
+                default:
+                  throw new IllegalArgumentException("Unsupported data type 
for ARRAY_AGG: " + dataType);
+              }
+            }
+            switch (dataType) {
+              case BOOLEAN:
+              case INT:
+                return new ArrayAggIntFunction(firstArgument, dataType, 
nullHandlingEnabled);
+              case LONG:
+              case TIMESTAMP:
+                return new ArrayAggLongFunction(firstArgument, dataType, 
nullHandlingEnabled);
+              case FLOAT:
+                return new ArrayAggFloatFunction(firstArgument, 
nullHandlingEnabled);
+              case DOUBLE:
+                return new ArrayAggDoubleFunction(firstArgument, 
nullHandlingEnabled);
+              case STRING:
+                return new ArrayAggStringFunction(firstArgument, 
nullHandlingEnabled);
+              default:
+                throw new IllegalArgumentException("Unsupported data type for 
ARRAY_AGG: " + dataType);
+            }
+          }
           case LASTWITHTIME: {
             Preconditions.checkArgument(numArguments == 3,
                 "LAST_WITH_TIME expects 3 arguments, got: %s. The function can 
be used as "
@@ -368,7 +430,6 @@ public class AggregationFunctionFactory {
             return new DistinctCountCPCSketchAggregationFunction(arguments);
           case DISTINCTCOUNTRAWCPCSKETCH:
             return new DistinctCountRawCPCSketchAggregationFunction(arguments);
-
           default:
             throw new IllegalArgumentException("Unsupported aggregation 
function type: " + functionType);
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctDoubleFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctDoubleFunction.java
new file mode 100644
index 0000000000..1493ad82aa
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctDoubleFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ARRAY_AGG variant that collects the distinct double values of an expression into a {@link DoubleOpenHashSet}.
+ */
+public class ArrayAggDistinctDoubleFunction extends BaseArrayAggDoubleFunction<DoubleOpenHashSet> {
+  public ArrayAggDistinctDoubleFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds the first {@code length} double values of the block to the set kept in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    DoubleOpenHashSet valueArray = getOrCreateValueSet(aggregationResultHolder, length);
+    double[] value = blockValSet.getDoubleValuesSV();
+    for (int i = 0; i < length; i++) {
+      valueArray.add(value[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips every position that is contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    DoubleOpenHashSet valueArray = getOrCreateValueSet(aggregationResultHolder, length);
+    double[] value = blockValSet.getDoubleValuesSV();
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(value[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds {@code value} to the set stored for {@code groupKey}, creating the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, double value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    DoubleOpenHashSet valueArray = resultHolder.getResult(groupKey);
+    if (valueArray == null) {
+      valueArray = new DoubleOpenHashSet();
+      resultHolder.setValueForKey(groupKey, valueArray);
+    }
+    valueArray.add(value);
+  }
+
+  /**
+   * Returns the set already stored in the holder, or stores and returns a new one sized for {@code expected} values.
+   *
+   * <p>Reusing the stored set keeps values from earlier calls instead of replacing them with a fresh set each time.
+   * NOTE(review): assumes the same holder is passed across successive calls - confirm against BaseArrayAggFunction.
+   */
+  private static DoubleOpenHashSet getOrCreateValueSet(AggregationResultHolder aggregationResultHolder, int expected) {
+    DoubleOpenHashSet valueArray = aggregationResultHolder.getResult();
+    if (valueArray == null) {
+      valueArray = new DoubleOpenHashSet(expected);
+      aggregationResultHolder.setValue(valueArray);
+    }
+    return valueArray;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctFloatFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctFloatFunction.java
new file mode 100644
index 0000000000..d9ad31ca35
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctFloatFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ARRAY_AGG variant that collects the distinct float values of an expression into a {@link FloatOpenHashSet}.
+ */
+public class ArrayAggDistinctFloatFunction extends BaseArrayAggFloatFunction<FloatOpenHashSet> {
+  public ArrayAggDistinctFloatFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds the first {@code length} float values of the block to the set kept in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    FloatOpenHashSet valueArray = getOrCreateValueSet(aggregationResultHolder, length);
+    float[] value = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      valueArray.add(value[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips every position that is contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    FloatOpenHashSet valueArray = getOrCreateValueSet(aggregationResultHolder, length);
+    float[] value = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(value[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds {@code value} to the set stored for {@code groupKey}, creating the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, float value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    FloatOpenHashSet valueArray = resultHolder.getResult(groupKey);
+    if (valueArray == null) {
+      valueArray = new FloatOpenHashSet();
+      resultHolder.setValueForKey(groupKey, valueArray);
+    }
+    valueArray.add(value);
+  }
+
+  /**
+   * Returns the set already stored in the holder, or stores and returns a new one sized for {@code expected} values.
+   *
+   * <p>Reusing the stored set keeps values from earlier calls instead of replacing them with a fresh set each time.
+   * NOTE(review): assumes the same holder is passed across successive calls - confirm against BaseArrayAggFunction.
+   */
+  private static FloatOpenHashSet getOrCreateValueSet(AggregationResultHolder aggregationResultHolder, int expected) {
+    FloatOpenHashSet valueArray = aggregationResultHolder.getResult();
+    if (valueArray == null) {
+      valueArray = new FloatOpenHashSet(expected);
+      aggregationResultHolder.setValue(valueArray);
+    }
+    return valueArray;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctIntFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctIntFunction.java
new file mode 100644
index 0000000000..3aa757e311
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctIntFunction.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over int-typed values that keeps only distinct values, collected in an
+ * {@link IntOpenHashSet}.
+ */
+public class ArrayAggDistinctIntFunction extends BaseArrayAggIntFunction<IntOpenHashSet> {
+  public ArrayAggDistinctIntFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds the first {@code length} int values of the block to the set stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    int[] values = blockValSet.getIntValuesSV();
+    IntOpenHashSet valueSet = getOrCreateValueSet(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      valueSet.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    int[] values = blockValSet.getIntValuesSV();
+    IntOpenHashSet valueSet = getOrCreateValueSet(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueSet.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds {@code value} to the set held for {@code groupKey}, creating the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, int value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    IntOpenHashSet groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      // IntOpenHashSet(int) takes an expected size, not an element, so the value has to be added explicitly.
+      groupValue = new IntOpenHashSet();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the set already stored in the holder, or stores and returns a new one sized for {@code length} values.
+   * Reusing the stored set lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static IntOpenHashSet getOrCreateValueSet(AggregationResultHolder aggregationResultHolder, int length) {
+    ObjectAggregationResultHolder resultHolder = (ObjectAggregationResultHolder) aggregationResultHolder;
+    IntOpenHashSet valueSet = resultHolder.getResult();
+    if (valueSet == null) {
+      valueSet = new IntOpenHashSet(length);
+      resultHolder.setValue(valueSet);
+    }
+    return valueSet;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctLongFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctLongFunction.java
new file mode 100644
index 0000000000..34ba9fd91b
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctLongFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over long-typed values that keeps only distinct values, collected in a
+ * {@link LongOpenHashSet}.
+ */
+public class ArrayAggDistinctLongFunction extends BaseArrayAggLongFunction<LongOpenHashSet> {
+  public ArrayAggDistinctLongFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds the first {@code length} long values of the block to the set stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongOpenHashSet valueSet = getOrCreateValueSet(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      valueSet.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongOpenHashSet valueSet = getOrCreateValueSet(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueSet.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds {@code value} to the set held for {@code groupKey}, creating the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, long value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    LongOpenHashSet groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new LongOpenHashSet();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the set already stored in the holder, or stores and returns a new one sized for {@code length} values.
+   * Reusing the stored set lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static LongOpenHashSet getOrCreateValueSet(AggregationResultHolder aggregationResultHolder, int length) {
+    LongOpenHashSet valueSet = aggregationResultHolder.getResult();
+    if (valueSet == null) {
+      valueSet = new LongOpenHashSet(length);
+      aggregationResultHolder.setValue(valueSet);
+    }
+    return valueSet;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctStringFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctStringFunction.java
new file mode 100644
index 0000000000..947fd59ba1
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDistinctStringFunction.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.Arrays;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over String values that keeps only distinct values, collected in an
+ * {@link ObjectOpenHashSet}.
+ */
+public class ArrayAggDistinctStringFunction extends BaseArrayAggStringFunction<ObjectOpenHashSet<String>> {
+  public ArrayAggDistinctStringFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds the first {@code length} String values of the block to the set stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectOpenHashSet<String> valueSet = getOrCreateValueSet(aggregationResultHolder, length);
+    valueSet.addAll(Arrays.asList(values).subList(0, length));
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectOpenHashSet<String> valueSet = getOrCreateValueSet(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueSet.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Adds {@code value} to the set held for {@code groupKey}, creating the set on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, String value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    ObjectOpenHashSet<String> groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new ObjectOpenHashSet<>();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the set already stored in the holder, or stores and returns a new one sized for {@code length} values.
+   * Reusing the stored set lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static ObjectOpenHashSet<String> getOrCreateValueSet(AggregationResultHolder aggregationResultHolder,
+      int length) {
+    ObjectOpenHashSet<String> valueSet = aggregationResultHolder.getResult();
+    if (valueSet == null) {
+      valueSet = new ObjectOpenHashSet<>(length);
+      aggregationResultHolder.setValue(valueSet);
+    }
+    return valueSet;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDoubleFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDoubleFunction.java
new file mode 100644
index 0000000000..20c22592c6
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggDoubleFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over double values, collected (duplicates included) in a {@link DoubleArrayList}.
+ */
+public class ArrayAggDoubleFunction extends BaseArrayAggDoubleFunction<DoubleArrayList> {
+  public ArrayAggDoubleFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} double values of the block to the list stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    double[] values = blockValSet.getDoubleValuesSV();
+    DoubleArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      valueArray.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    double[] values = blockValSet.getDoubleValuesSV();
+    DoubleArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list held for {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, double value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    DoubleArrayList groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new DoubleArrayList();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the list already stored in the holder, or stores and returns a new one with room for {@code length}
+   * values. Reusing the stored list lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static DoubleArrayList getOrCreateValueArray(AggregationResultHolder aggregationResultHolder, int length) {
+    DoubleArrayList valueArray = aggregationResultHolder.getResult();
+    if (valueArray == null) {
+      valueArray = new DoubleArrayList(length);
+      aggregationResultHolder.setValue(valueArray);
+    }
+    return valueArray;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggFloatFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggFloatFunction.java
new file mode 100644
index 0000000000..9c43cd1138
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggFloatFunction.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over float values, collected (duplicates included) in a {@link FloatArrayList}.
+ */
+public class ArrayAggFloatFunction extends BaseArrayAggFloatFunction<FloatArrayList> {
+  public ArrayAggFloatFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} float values of the block to the list stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    float[] values = blockValSet.getFloatValuesSV();
+    FloatArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      valueArray.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    float[] values = blockValSet.getFloatValuesSV();
+    FloatArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list held for {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, float value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    FloatArrayList groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new FloatArrayList();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the list already stored in the holder, or stores and returns a new one with room for {@code length}
+   * values. Reusing the stored list lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static FloatArrayList getOrCreateValueArray(AggregationResultHolder aggregationResultHolder, int length) {
+    FloatArrayList valueArray = aggregationResultHolder.getResult();
+    if (valueArray == null) {
+      valueArray = new FloatArrayList(length);
+      aggregationResultHolder.setValue(valueArray);
+    }
+    return valueArray;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggIntFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggIntFunction.java
new file mode 100644
index 0000000000..2e6764b190
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggIntFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over int-typed values, collected (duplicates included) in an {@link IntArrayList}.
+ */
+public class ArrayAggIntFunction extends BaseArrayAggIntFunction<IntArrayList> {
+  public ArrayAggIntFunction(ExpressionContext expression, FieldSpec.DataType dataType, boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} int values of the block to the list stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    int[] values = blockValSet.getIntValuesSV();
+    IntArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      valueArray.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    int[] values = blockValSet.getIntValuesSV();
+    IntArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list held for {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, int value) {
+    IntArrayList groupValue = groupByResultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      // IntArrayList(int) takes an initial capacity, not an element, so the value has to be added explicitly.
+      groupValue = new IntArrayList();
+      groupByResultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the list already stored in the holder, or stores and returns a new one with room for {@code length}
+   * values. Reusing the stored list lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static IntArrayList getOrCreateValueArray(AggregationResultHolder aggregationResultHolder, int length) {
+    IntArrayList valueArray = aggregationResultHolder.getResult();
+    if (valueArray == null) {
+      valueArray = new IntArrayList(length);
+      aggregationResultHolder.setValue(valueArray);
+    }
+    return valueArray;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggLongFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggLongFunction.java
new file mode 100644
index 0000000000..95fd5954e0
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggLongFunction.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over long-typed values, collected (duplicates included) in a {@link LongArrayList}.
+ */
+public class ArrayAggLongFunction extends BaseArrayAggLongFunction<LongArrayList> {
+  public ArrayAggLongFunction(ExpressionContext expression, FieldSpec.DataType dataType, boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} long values of the block to the list stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      valueArray.add(values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] values = blockValSet.getLongValuesSV();
+    LongArrayList valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list held for {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, long value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    LongArrayList groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new LongArrayList();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the list already stored in the holder, or stores and returns a new one with room for {@code length}
+   * values. Reusing the stored list lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static LongArrayList getOrCreateValueArray(AggregationResultHolder aggregationResultHolder, int length) {
+    LongArrayList valueArray = aggregationResultHolder.getResult();
+    if (valueArray == null) {
+      valueArray = new LongArrayList(length);
+      aggregationResultHolder.setValue(valueArray);
+    }
+    return valueArray;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggStringFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggStringFunction.java
new file mode 100644
index 0000000000..ad13868373
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/ArrayAggStringFunction.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.util.Arrays;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * ArrayAgg aggregation over String values, collected (duplicates included) in an {@link ObjectArrayList}.
+ */
+public class ArrayAggStringFunction extends BaseArrayAggStringFunction<ObjectArrayList<String>> {
+  public ArrayAggStringFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
+  }
+
+  /**
+   * Appends the first {@code length} String values of the block to the list stored in the result holder.
+   */
+  @Override
+  protected void aggregateArray(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectArrayList<String> valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    valueArray.addAll(Arrays.asList(values).subList(0, length));
+  }
+
+  /**
+   * Same as {@link #aggregateArray}, but skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] values = blockValSet.getStringValuesSV();
+    ObjectArrayList<String> valueArray = getOrCreateValueArray(aggregationResultHolder, length);
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        valueArray.add(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends {@code value} to the list held for {@code groupKey}, creating the list on first use.
+   */
+  @Override
+  protected void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, String value) {
+    ObjectGroupByResultHolder resultHolder = (ObjectGroupByResultHolder) groupByResultHolder;
+    ObjectArrayList<String> groupValue = resultHolder.getResult(groupKey);
+    if (groupValue == null) {
+      groupValue = new ObjectArrayList<>();
+      resultHolder.setValueForKey(groupKey, groupValue);
+    }
+    groupValue.add(value);
+  }
+
+  /**
+   * Returns the list already stored in the holder, or stores and returns a new one with room for {@code length}
+   * values. Reusing the stored list lets repeated calls accumulate instead of replacing earlier values.
+   */
+  private static ObjectArrayList<String> getOrCreateValueArray(AggregationResultHolder aggregationResultHolder,
+      int length) {
+    ObjectArrayList<String> valueArray = aggregationResultHolder.getResult();
+    if (valueArray == null) {
+      valueArray = new ObjectArrayList<>(length);
+      aggregationResultHolder.setValue(valueArray);
+    }
+    return valueArray;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggDoubleFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggDoubleFunction.java
new file mode 100644
index 0000000000..e9cb01dfe8
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggDoubleFunction.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.doubles.AbstractDoubleCollection;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Base class for ArrayAgg functions over double values. Intermediate results are kept in an
+ * {@link AbstractDoubleCollection}; the final result is a {@link DoubleArrayList}.
+ *
+ * @param <I> type of the intermediate result collection
+ */
+public abstract class BaseArrayAggDoubleFunction<I extends AbstractDoubleCollection>
+    extends BaseArrayAggFunction<I, DoubleArrayList> {
+  public BaseArrayAggDoubleFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, FieldSpec.DataType.DOUBLE, nullHandlingEnabled);
+  }
+
+  /**
+   * Adds {@code value} to the result held for {@code groupKey}; implemented by subclasses.
+   */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, double value);
+
+  /**
+   * Feeds each of the first {@code length} values to the single group key at the same position.
+   */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    for (int row = 0; row < length; row++) {
+      int groupKey = groupKeyArray[row];
+      setGroupByResult(groupByResultHolder, groupKey, doubleValues[row]);
+    }
+  }
+
+  /**
+   * Single-group-key variant that skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    for (int row = 0; row < length; row++) {
+      if (nullBitmap.contains(row)) {
+        continue;
+      }
+      int groupKey = groupKeyArray[row];
+      setGroupByResult(groupByResultHolder, groupKey, doubleValues[row]);
+    }
+  }
+
+  /**
+   * Feeds each of the first {@code length} values to every group key listed for its position.
+   */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    for (int row = 0; row < length; row++) {
+      int[] groupKeys = groupKeysArray[row];
+      for (int groupKey : groupKeys) {
+        setGroupByResult(groupByResultHolder, groupKey, doubleValues[row]);
+      }
+    }
+  }
+
+  /**
+   * Multiple-group-key variant that skips the positions contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    for (int row = 0; row < length; row++) {
+      if (nullBitmap.contains(row)) {
+        continue;
+      }
+      int[] groupKeys = groupKeysArray[row];
+      for (int groupKey : groupKeys) {
+        setGroupByResult(groupByResultHolder, groupKey, doubleValues[row]);
+      }
+    }
+  }
+
+  /**
+   * Merges two intermediate results. A {@code null} argument yields the other one unchanged; otherwise the second
+   * collection is added into the first, which is modified in place and returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    if (intermediateResult1 == null) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2 != null) {
+      intermediateResult1.addAll(intermediateResult2);
+    }
+    return intermediateResult1;
+  }
+
+  /**
+   * Copies the intermediate collection into a new {@link DoubleArrayList}.
+   */
+  @Override
+  public DoubleArrayList extractFinalResult(I doubleArrayList) {
+    return new DoubleArrayList(doubleArrayList);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFloatFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFloatFunction.java
new file mode 100644
index 0000000000..32314064bd
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFloatFunction.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.floats.AbstractFloatCollection;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Group-by and merge logic shared by array aggregation functions that read their input as floats.
+ *
+ * <p>Subclasses choose the intermediate collection type and implement {@link #setGroupByResult} to store a value.
+ *
+ * @param <I> intermediate float collection type
+ */
+public abstract class BaseArrayAggFloatFunction<I extends AbstractFloatCollection>
+    extends BaseArrayAggFunction<I, FloatArrayList> {
+  public BaseArrayAggFloatFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, FieldSpec.DataType.FLOAT, nullHandlingEnabled);
+  }
+
+  /**
+   * Subclass hook invoked once per (group key, value) pair produced by the group-by methods of this class.
+   */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, float value);
+
+  /**
+   * Passes each row's float value to {@link #setGroupByResult} under that row's single group key.
+   */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[i], values[i]);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArrayGroupBySV}, skipping rows whose index is contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      if (!nullBitmap.contains(i)) {
+        setGroupByResult(groupByResultHolder, groupKeyArray[i], values[i]);
+      }
+    }
+  }
+
+  /**
+   * Passes each row's float value to {@link #setGroupByResult} once for every group key listed for that row.
+   */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupByResultHolder, groupKey, values[i]);
+      }
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateArrayGroupByMV}, skipping rows whose index is contained in {@code nullBitmap}.
+   */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    float[] values = blockValSet.getFloatValuesSV();
+    for (int i = 0; i < length; i++) {
+      // The null check depends only on the row, so do it once per row rather than once per group key.
+      if (!nullBitmap.contains(i)) {
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupByResultHolder, groupKey, values[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. When either side is {@code null} the other side is returned as-is;
+   * otherwise all elements of {@code intermediateResult2} are added to {@code intermediateResult1}, which is
+   * mutated and returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    if (intermediateResult1 == null) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2 == null) {
+      return intermediateResult1;
+    }
+    intermediateResult1.addAll(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  /**
+   * Builds the final result as a new {@link FloatArrayList} constructed from the intermediate collection.
+   */
+  @Override
+  public FloatArrayList extractFinalResult(I floatArrayList) {
+    return new FloatArrayList(floatArrayList);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFunction.java
new file mode 100644
index 0000000000..7a90d7ef30
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggFunction.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import 
org.apache.pinot.core.query.aggregation.function.BaseSingleInputAggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Common scaffolding for array aggregation functions over a single input expression.
+ *
+ * <p>Each {@code aggregate*} entry point looks up the {@link BlockValSet} for the input expression, decides
+ * whether the null-aware path applies (null handling enabled and a non-empty null bitmap present), and delegates
+ * to the matching type-specific {@code aggregateArray*} hook implemented by subclasses.
+ *
+ * @param <I> intermediate result type kept in the result holders
+ * @param <F> final result type
+ */
+public abstract class BaseArrayAggFunction<I, F extends Comparable> extends BaseSingleInputAggregationFunction<I, F> {
+
+  protected final boolean _nullHandlingEnabled;
+  private final DataSchema.ColumnDataType _resultColumnType;
+
+  /**
+   * @param expression input expression to aggregate
+   * @param dataType data type used to derive the final result column type via
+   *     {@code DataSchema.ColumnDataType.fromDataTypeMV}
+   * @param nullHandlingEnabled whether null bitmaps should be consulted during aggregation
+   */
+  public BaseArrayAggFunction(ExpressionContext expression, FieldSpec.DataType dataType, boolean nullHandlingEnabled) {
+    super(expression);
+    _nullHandlingEnabled = nullHandlingEnabled;
+    _resultColumnType = DataSchema.ColumnDataType.fromDataTypeMV(dataType);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.ARRAYAGG;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return _resultColumnType;
+  }
+
+  /**
+   * Non-group-by aggregation. Uses {@link #aggregateArrayWithNull} when null handling is enabled and the input has
+   * a non-empty null bitmap, otherwise {@link #aggregateArray}.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap != null && !nullBitmap.isEmpty()) {
+        aggregateArrayWithNull(length, aggregationResultHolder, blockValSet, nullBitmap);
+        return;
+      }
+    }
+    aggregateArray(length, aggregationResultHolder, blockValSet);
+  }
+
+  /** Type-specific aggregation of {@code length} rows when no null bitmap applies. */
+  protected abstract void aggregateArray(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet);
+
+  /** Type-specific aggregation of {@code length} rows given a non-empty {@code nullBitmap}. */
+  protected abstract void aggregateArrayWithNull(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, RoaringBitmap nullBitmap);
+
+  /**
+   * Group-by aggregation with one group key per row. Uses {@link #aggregateArrayGroupBySVWithNull} when null
+   * handling is enabled and the input has a non-empty null bitmap, otherwise {@link #aggregateArrayGroupBySV}.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap != null && !nullBitmap.isEmpty()) {
+        // Only the null-aware hook may receive the bitmap; it is guaranteed non-null and non-empty here.
+        aggregateArrayGroupBySVWithNull(length, groupKeyArray, groupByResultHolder, blockValSet, nullBitmap);
+        return;
+      }
+    }
+    aggregateArrayGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSet);
+  }
+
+  /** Type-specific single-key group-by aggregation when no null bitmap applies. */
+  protected abstract void aggregateArrayGroupBySV(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet);
+
+  /** Type-specific single-key group-by aggregation given a non-empty {@code nullBitmap}. */
+  protected abstract void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap);
+
+  /**
+   * Group-by aggregation with multiple group keys per row. Uses {@link #aggregateArrayGroupByMVWithNull} when null
+   * handling is enabled and the input has a non-empty null bitmap, otherwise {@link #aggregateArrayGroupByMV}.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValSet.getNullBitmap();
+      if (nullBitmap != null && !nullBitmap.isEmpty()) {
+        aggregateArrayGroupByMVWithNull(length, groupKeysArray, groupByResultHolder, blockValSet, nullBitmap);
+        return;
+      }
+    }
+    aggregateArrayGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSet);
+  }
+
+  /** Type-specific multi-key group-by aggregation when no null bitmap applies. */
+  protected abstract void aggregateArrayGroupByMV(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet);
+
+  /** Type-specific multi-key group-by aggregation given a non-empty {@code nullBitmap}. */
+  protected abstract void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap);
+
+  @Override
+  public I extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public I extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggIntFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggIntFunction.java
new file mode 100644
index 0000000000..60af69fd36
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggIntFunction.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.ints.AbstractIntCollection;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Group-by and merge plumbing shared by array aggregation functions that read their input as ints.
+ *
+ * <p>Subclasses pick the intermediate collection type and implement {@link #setGroupByResult} to store a value.
+ *
+ * @param <I> intermediate int collection type
+ */
+public abstract class BaseArrayAggIntFunction<I extends AbstractIntCollection>
+    extends BaseArrayAggFunction<I, IntArrayList> {
+  public BaseArrayAggIntFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Subclass hook invoked once per (group key, value) pair produced by the group-by methods of this class.
+   */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, int value);
+
+  /**
+   * Hands every row's int value to {@link #setGroupByResult} under that row's single group key.
+   */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    int row = 0;
+    while (row < length) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[row], intValues[row]);
+      row++;
+    }
+  }
+
+  /**
+   * Like {@link #aggregateArrayGroupBySV}, but rows whose index is in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    for (int row = 0; row < length; row++) {
+      if (nullBitmap.contains(row)) {
+        continue;
+      }
+      setGroupByResult(groupByResultHolder, groupKeyArray[row], intValues[row]);
+    }
+  }
+
+  /**
+   * Hands every row's int value to {@link #setGroupByResult} once per group key listed for that row.
+   */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    for (int row = 0; row < length; row++) {
+      int[] keysForRow = groupKeysArray[row];
+      int rowValue = intValues[row];
+      for (int k = 0; k < keysForRow.length; k++) {
+        setGroupByResult(groupByResultHolder, keysForRow[k], rowValue);
+      }
+    }
+  }
+
+  /**
+   * Like {@link #aggregateArrayGroupByMV}, but rows whose index is in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    for (int row = 0; row < length; row++) {
+      if (nullBitmap.contains(row)) {
+        continue;
+      }
+      int[] keysForRow = groupKeysArray[row];
+      int rowValue = intValues[row];
+      for (int k = 0; k < keysForRow.length; k++) {
+        setGroupByResult(groupByResultHolder, keysForRow[k], rowValue);
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. If the first is {@code null} or empty the second is returned unchanged;
+   * otherwise, if the second is {@code null} or empty the first is returned unchanged; otherwise the second is
+   * added into the first, which is mutated and returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    boolean firstHasValues = intermediateResult1 != null && !intermediateResult1.isEmpty();
+    if (!firstHasValues) {
+      return intermediateResult2;
+    }
+    boolean secondHasValues = intermediateResult2 != null && !intermediateResult2.isEmpty();
+    if (!secondHasValues) {
+      return intermediateResult1;
+    }
+    intermediateResult1.addAll(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  /**
+   * Builds the final result as a new {@link IntArrayList} constructed from the intermediate collection.
+   */
+  @Override
+  public IntArrayList extractFinalResult(I intArrayList) {
+    IntArrayList finalResult = new IntArrayList(intArrayList);
+    return finalResult;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggLongFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggLongFunction.java
new file mode 100644
index 0000000000..edb037f06d
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggLongFunction.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.longs.AbstractLongCollection;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Group-by and merge plumbing shared by array aggregation functions that read their input as longs.
+ *
+ * <p>Subclasses pick the intermediate collection type and implement {@link #setGroupByResult} to store a value.
+ *
+ * @param <I> intermediate long collection type
+ */
+public abstract class BaseArrayAggLongFunction<I extends AbstractLongCollection>
+    extends BaseArrayAggFunction<I, LongArrayList> {
+  public BaseArrayAggLongFunction(ExpressionContext expression, FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    super(expression, dataType, nullHandlingEnabled);
+  }
+
+  /**
+   * Subclass hook invoked once per (group key, value) pair produced by the group-by methods of this class.
+   */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, long value);
+
+  /**
+   * Hands every row's long value to {@link #setGroupByResult} under that row's single group key.
+   */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    for (int docIdx = 0; docIdx != length && docIdx < length; docIdx++) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], longValues[docIdx]);
+    }
+  }
+
+  /**
+   * Like {@link #aggregateArrayGroupBySV}, but rows whose index is in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      boolean isNullRow = nullBitmap.contains(docIdx);
+      if (isNullRow) {
+        continue;
+      }
+      setGroupByResult(groupByResultHolder, groupKeyArray[docIdx], longValues[docIdx]);
+    }
+  }
+
+  /**
+   * Hands every row's long value to {@link #setGroupByResult} once per group key listed for that row.
+   */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      int[] keysForDoc = groupKeysArray[docIdx];
+      for (int k = 0; k < keysForDoc.length; k++) {
+        setGroupByResult(groupByResultHolder, keysForDoc[k], longValues[docIdx]);
+      }
+    }
+  }
+
+  /**
+   * Like {@link #aggregateArrayGroupByMV}, but rows whose index is in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    for (int docIdx = 0; docIdx < length; docIdx++) {
+      boolean isNullRow = nullBitmap.contains(docIdx);
+      if (isNullRow) {
+        continue;
+      }
+      int[] keysForDoc = groupKeysArray[docIdx];
+      for (int k = 0; k < keysForDoc.length; k++) {
+        setGroupByResult(groupByResultHolder, keysForDoc[k], longValues[docIdx]);
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. If the first is {@code null} or empty the second is returned unchanged;
+   * otherwise, if the second is {@code null} or empty the first is returned unchanged; otherwise the second is
+   * added into the first, which is mutated and returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    boolean firstHasValues = intermediateResult1 != null && !intermediateResult1.isEmpty();
+    if (!firstHasValues) {
+      return intermediateResult2;
+    }
+    boolean secondHasValues = intermediateResult2 != null && !intermediateResult2.isEmpty();
+    if (!secondHasValues) {
+      return intermediateResult1;
+    }
+    intermediateResult1.addAll(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  /**
+   * Builds the final result as a new {@link LongArrayList} constructed from the intermediate collection.
+   */
+  @Override
+  public LongArrayList extractFinalResult(I arrayList) {
+    LongArrayList finalResult = new LongArrayList(arrayList);
+    return finalResult;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
new file mode 100644
index 0000000000..5f5b98f7c8
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.array;
+
+import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Group-by and merge plumbing shared by array aggregation functions that read their input as strings.
+ *
+ * <p>Subclasses pick the intermediate collection type and implement {@link #setGroupByResult} to store a value.
+ *
+ * @param <I> intermediate string collection type
+ */
+public abstract class BaseArrayAggStringFunction<I extends AbstractObjectCollection<String>>
+    extends BaseArrayAggFunction<I, ObjectArrayList<String>> {
+  public BaseArrayAggStringFunction(ExpressionContext expression, boolean nullHandlingEnabled) {
+    super(expression, FieldSpec.DataType.STRING, nullHandlingEnabled);
+  }
+
+  /**
+   * Subclass hook invoked once per (group key, value) pair produced by the group-by methods of this class.
+   */
+  abstract void setGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey, String value);
+
+  /**
+   * Hands every row's string value to {@link #setGroupByResult} under that row's single group key.
+   */
+  @Override
+  protected void aggregateArrayGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    int row = 0;
+    while (row < length) {
+      setGroupByResult(groupByResultHolder, groupKeyArray[row], stringValues[row]);
+      row++;
+    }
+  }
+
+  /**
+   * Like {@link #aggregateArrayGroupBySV}, but rows whose index is in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayGroupBySVWithNull(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    for (int row = 0; row < length; row++) {
+      if (nullBitmap.contains(row)) {
+        continue;
+      }
+      setGroupByResult(groupByResultHolder, groupKeyArray[row], stringValues[row]);
+    }
+  }
+
+  /**
+   * Hands every row's string value to {@link #setGroupByResult} once per group key listed for that row.
+   */
+  @Override
+  protected void aggregateArrayGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    for (int row = 0; row < length; row++) {
+      int[] keysForRow = groupKeysArray[row];
+      for (int k = 0; k < keysForRow.length; k++) {
+        setGroupByResult(groupByResultHolder, keysForRow[k], stringValues[row]);
+      }
+    }
+  }
+
+  /**
+   * Like {@link #aggregateArrayGroupByMV}, but rows whose index is in {@code nullBitmap} are skipped.
+   */
+  @Override
+  protected void aggregateArrayGroupByMVWithNull(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, RoaringBitmap nullBitmap) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    for (int row = 0; row < length; row++) {
+      if (nullBitmap.contains(row)) {
+        continue;
+      }
+      int[] keysForRow = groupKeysArray[row];
+      for (int k = 0; k < keysForRow.length; k++) {
+        setGroupByResult(groupByResultHolder, keysForRow[k], stringValues[row]);
+      }
+    }
+  }
+
+  /**
+   * Combines two intermediate results. When either side is {@code null} the other side is returned as-is;
+   * otherwise all elements of {@code intermediateResult2} are added to {@code intermediateResult1}, which is
+   * mutated and returned.
+   */
+  @Override
+  public I merge(I intermediateResult1, I intermediateResult2) {
+    if (intermediateResult1 != null && intermediateResult2 != null) {
+      intermediateResult1.addAll(intermediateResult2);
+      return intermediateResult1;
+    }
+    return intermediateResult1 == null ? intermediateResult2 : intermediateResult1;
+  }
+
+  /**
+   * Builds the final result as a new {@link ObjectArrayList} constructed from the intermediate collection.
+   */
+  @Override
+  public ObjectArrayList<String> extractFinalResult(I stringArrayList) {
+    ObjectArrayList<String> finalResult = new ObjectArrayList<>(stringArrayList);
+    return finalResult;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8116a29373..dd2ad2e75a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -23,10 +23,14 @@ import com.tdunning.math.stats.TDigest;
 import it.unimi.dsi.fastutil.doubles.Double2LongOpenHashMap;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import it.unimi.dsi.fastutil.floats.Float2LongOpenHashMap;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
 import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -392,4 +396,60 @@ public class ObjectSerDeUtilsTest {
       assertEquals(actual.getEstimate(), sketch.getEstimate(), ERROR_MESSAGE);
     }
   }
+
+  /**
+   * Round-trips randomly sized lists of random ints through serialize/deserialize and expects an equal list back.
+   */
+  @Test
+  public void testIntArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      IntArrayList original = new IntArrayList(numValues);
+      int remaining = numValues;
+      while (remaining-- > 0) {
+        original.add(RANDOM.nextInt());
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      IntArrayList roundTripped =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.IntArrayList);
+      assertEquals(roundTripped, original, ERROR_MESSAGE);
+    }
+  }
+
+  /**
+   * Round-trips randomly sized lists of random longs through serialize/deserialize and expects an equal list back.
+   */
+  @Test
+  public void testLongArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      LongArrayList original = new LongArrayList(numValues);
+      int remaining = numValues;
+      while (remaining-- > 0) {
+        original.add(RANDOM.nextLong());
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      LongArrayList roundTripped =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.LongArrayList);
+      assertEquals(roundTripped, original, ERROR_MESSAGE);
+    }
+  }
+
+  /**
+   * Round-trips randomly sized lists of random floats through serialize/deserialize and expects an equal list back.
+   */
+  @Test
+  public void testFloatArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      FloatArrayList original = new FloatArrayList(numValues);
+      int remaining = numValues;
+      while (remaining-- > 0) {
+        original.add(RANDOM.nextFloat());
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      FloatArrayList roundTripped =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.FloatArrayList);
+      assertEquals(roundTripped, original, ERROR_MESSAGE);
+    }
+  }
+
+  /**
+   * Round-trips randomly sized lists of random strings (each up to 19 characters) through serialize/deserialize
+   * and expects an equal list back.
+   */
+  @Test
+  public void testStringArrayList() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = RANDOM.nextInt(100);
+      ObjectArrayList<String> original = new ObjectArrayList<>(numValues);
+      int remaining = numValues;
+      while (remaining-- > 0) {
+        original.add(RandomStringUtils.random(RANDOM.nextInt(20)));
+      }
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      ObjectArrayList<String> roundTripped =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.StringArrayList);
+      assertEquals(roundTripped, original, ERROR_MESSAGE);
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ArrayAggQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ArrayAggQueriesTest.java
new file mode 100644
index 0000000000..5dc8bc0a11
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ArrayAggQueriesTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Queries test for ArrayAgg queries.
+ *
+ * <p>Every record stores one random value in [0, {@code MAX_VALUE}) in all five columns (as INT, LONG, FLOAT, DOUBLE
+ * and STRING). The tests only check the sizes of the aggregated results: {@code NUM_RECORDS} entries per segment
+ * without the distinct flag, and the number of distinct generated values with it.
+ */
+public class ArrayAggQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ArrayAggQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_RECORDS = 2000;
+  private static final int MAX_VALUE = 1000;
+  // Number of aggregated columns in every test query
+  private static final int NUM_COLUMNS = 5;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  // Expected number of distinct values per column, in the order INT, LONG, FLOAT, DOUBLE, STRING
+  private int[] _expectedCardinalityResults;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /**
+   * Generates {@code NUM_RECORDS} random records, records the expected distinct count of every column, then builds
+   * and loads the segment the queries run against.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    // Track the distinct values themselves rather than their hash codes, so that a hash collision can never lower
+    // the expected cardinality
+    int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE);
+    Set<Integer> intValues = new HashSet<>(hashMapCapacity);
+    Set<Long> longValues = new HashSet<>(hashMapCapacity);
+    Set<Float> floatValues = new HashSet<>(hashMapCapacity);
+    Set<Double> doubleValues = new HashSet<>(hashMapCapacity);
+    Set<String> stringValues = new HashSet<>(hashMapCapacity);
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      int value = RANDOM.nextInt(MAX_VALUE);
+      long longValue = value;
+      float floatValue = value;
+      double doubleValue = value;
+      String stringValue = Integer.toString(value);
+
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, value);
+      record.putValue(LONG_COLUMN, longValue);
+      record.putValue(FLOAT_COLUMN, floatValue);
+      record.putValue(DOUBLE_COLUMN, doubleValue);
+      record.putValue(STRING_COLUMN, stringValue);
+      records.add(record);
+
+      intValues.add(value);
+      longValues.add(longValue);
+      floatValues.add(floatValue);
+      doubleValues.add(doubleValue);
+      stringValues.add(stringValue);
+    }
+    _expectedCardinalityResults = new int[]{
+        intValues.size(), longValues.size(), floatValues.size(), doubleValues.size(), stringValues.size()
+    };
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    // The same segment is registered twice for the inter-segment queries
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  /**
+   * Without the distinct flag every column must aggregate one entry per scanned record.
+   */
+  @Test
+  public void testArrayAggNonDistinct() {
+    String query =
+        "SELECT ArrayAgg(intColumn, 'INT'), ArrayAgg(longColumn, 'LONG'), ArrayAgg(floatColumn, 'FLOAT'), "
+            + "ArrayAgg(doubleColumn, 'DOUBLE'), ArrayAgg(stringColumn, 'STRING')"
+            + " FROM testTable";
+
+    // Inner segment
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        5 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getResults();
+    assertNotNull(aggregationResult);
+    for (int i = 0; i < NUM_COLUMNS; i++) {
+      assertEquals(((List<?>) aggregationResult.get(i)).size(), NUM_RECORDS);
+    }
+
+    // Inter segments
+    // NOTE(review): the 4x factor presumably comes from BaseQueriesTest executing the 2 registered segments on
+    // 2 simulated servers - confirm against BaseQueriesTest.getBrokerResponse
+    Object[] row = getBrokerResponse(query).getResultTable().getRows().get(0);
+    assertEquals(row.length, NUM_COLUMNS);
+    assertEquals(((int[]) row[0]).length, 4 * NUM_RECORDS);
+    assertEquals(((long[]) row[1]).length, 4 * NUM_RECORDS);
+    assertEquals(((float[]) row[2]).length, 4 * NUM_RECORDS);
+    assertEquals(((double[]) row[3]).length, 4 * NUM_RECORDS);
+    assertEquals(((String[]) row[4]).length, 4 * NUM_RECORDS);
+  }
+
+  /**
+   * With the distinct flag every column must aggregate exactly one entry per distinct generated value, both within
+   * one segment and after merging the results of all segments.
+   */
+  @Test
+  public void testArrayAggDistinct() {
+    String query =
+        "SELECT ArrayAgg(intColumn, 'INT', true), ArrayAgg(longColumn, 'LONG', true), "
+            + "ArrayAgg(floatColumn, 'FLOAT', true), ArrayAgg(doubleColumn, 'DOUBLE', true), "
+            + "ArrayAgg(stringColumn, 'STRING', true)"
+            + " FROM testTable";
+
+    // Inner segment
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        5 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getResults();
+    assertNotNull(aggregationResult);
+    for (int i = 0; i < NUM_COLUMNS; i++) {
+      assertEquals(((Set<?>) aggregationResult.get(i)).size(), _expectedCardinalityResults[i]);
+    }
+
+    // Inter segments
+    Object[] row = getBrokerResponse(query).getResultTable().getRows().get(0);
+    assertEquals(row.length, NUM_COLUMNS);
+    assertEquals(((int[]) row[0]).length, _expectedCardinalityResults[0]);
+    assertEquals(((long[]) row[1]).length, _expectedCardinalityResults[1]);
+    assertEquals(((float[]) row[2]).length, _expectedCardinalityResults[2]);
+    assertEquals(((double[]) row[3]).length, _expectedCardinalityResults[3]);
+    assertEquals(((String[]) row[4]).length, _expectedCardinalityResults[4]);
+  }
+
+  /**
+   * Releases the loaded segment and removes the on-disk index directory.
+   */
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
new file mode 100644
index 0000000000..0cf7ebdbf8
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class ArrayTest extends CustomDataQueryClusterIntegrationTest {
+
+  private static final String DEFAULT_TABLE_NAME = "ArrayTest";
+  private static final String BOOLEAN_COLUMN = "boolCol";
+  private static final String INT_COLUMN = "intCol";
+  private static final String LONG_COLUMN = "longCol";
+  private static final String FLOAT_COLUMN = "floatCol";
+  private static final String DOUBLE_COLUMN = "doubleCol";
+  private static final String STRING_COLUMN = "stringCol";
+  private static final String TIMESTAMP_COLUMN = "timestampCol";
+
+  @Override
+  protected long getCountStarResult() {
+    return 1000;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+            + "arrayAgg(boolCol, 'BOOLEAN'), "
+            + "arrayAgg(intCol, 'INT'), "
+            + "arrayAgg(longCol, 'LONG'), "
+            // NOTE: FLOAT array is auto converted to DOUBLE array
+            + (useMultiStageQueryEngine ? "arrayAgg(floatCol, 'DOUBLE'), " : 
"arrayAgg(floatCol, 'FLOAT'), ")
+            + "arrayAgg(doubleCol, 'DOUBLE'), "
+            + "arrayAgg(stringCol, 'STRING'), "
+            + "arrayAgg(timestampCol, 'TIMESTAMP') "
+            + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    System.out.println(jsonNode);
+    Assert.assertEquals(jsonNode.get("resultTable").get("rows").size(), 1);
+    Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).size(), 
7);
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).size(),
 getCountStarResult());
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).size(),
 getCountStarResult());
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(2).size(),
 getCountStarResult());
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(3).size(),
 getCountStarResult());
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(4).size(),
 getCountStarResult());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueryWithDistinct(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+            + "arrayAgg(boolCol, 'BOOLEAN', true), "
+            + "arrayAgg(intCol, 'INT', true), "
+            + "arrayAgg(longCol, 'LONG', true), "
+            // NOTE: FLOAT array is auto converted to DOUBLE array
+            + (useMultiStageQueryEngine ? "arrayAgg(floatCol, 'DOUBLE', true), 
"
+            : "arrayAgg(floatCol, 'FLOAT', true), ")
+            + "arrayAgg(doubleCol, 'DOUBLE', true), "
+            + "arrayAgg(stringCol, 'STRING', true), "
+            + "arrayAgg(timestampCol, 'TIMESTAMP', true) "
+            + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    System.out.println(jsonNode);
+    Assert.assertEquals(jsonNode.get("resultTable").get("rows").size(), 1);
+    Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).size(), 
7);
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).size(),
 2);
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).size(),
 getCountStarResult() / 10);
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(2).size(),
 getCountStarResult() / 10);
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(3).size(),
 getCountStarResult() / 10);
+    
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(4).size(),
 getCountStarResult() / 10);
+  }
+
+  @Override
+  public String getTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addSingleValueDimension(BOOLEAN_COLUMN, FieldSpec.DataType.BOOLEAN)
+        .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
+        .addSingleValueDimension(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(TIMESTAMP_COLUMN, 
FieldSpec.DataType.TIMESTAMP)
+        .build();
+  }
+
+  @Override
+  public File createAvroFile()
+      throws Exception {
+    // create avro schema
+    org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+        new org.apache.avro.Schema.Field(BOOLEAN_COLUMN,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN),
+            null, null),
+        new org.apache.avro.Schema.Field(INT_COLUMN, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT),
+            null, null),
+        new org.apache.avro.Schema.Field(LONG_COLUMN, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+            null, null),
+        new org.apache.avro.Schema.Field(FLOAT_COLUMN, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT),
+            null, null),
+        new org.apache.avro.Schema.Field(DOUBLE_COLUMN,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE),
+            null, null),
+        new org.apache.avro.Schema.Field(STRING_COLUMN,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
+            null, null),
+        new org.apache.avro.Schema.Field(TIMESTAMP_COLUMN,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+            null, null)
+    ));
+
+    // create avro file
+    File avroFile = new File(_tempDir, "data.avro");
+    Cache<Integer, GenericData.Record> recordCache = 
CacheBuilder.newBuilder().build();
+    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      fileWriter.create(avroSchema, avroFile);
+      for (int i = 0; i < getCountStarResult(); i++) {
+        // add avro record to file
+        fileWriter.append(recordCache.get((int) (i % (getCountStarResult() / 
10)), () -> {
+              // create avro record
+              GenericData.Record record = new GenericData.Record(avroSchema);
+              record.put(BOOLEAN_COLUMN, RANDOM.nextBoolean());
+              record.put(INT_COLUMN, RANDOM.nextInt());
+              record.put(LONG_COLUMN, RANDOM.nextLong());
+              record.put(FLOAT_COLUMN, RANDOM.nextFloat());
+              record.put(DOUBLE_COLUMN, RANDOM.nextDouble());
+              record.put(STRING_COLUMN, 
RandomStringUtils.random(RANDOM.nextInt(100)));
+              record.put(TIMESTAMP_COLUMN, RANDOM.nextLong());
+              return record;
+            }
+        ));
+      }
+    }
+    return avroFile;
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
index 4a609c737e..a50d48b31a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/TypeUtils.java
@@ -19,7 +19,10 @@
 package org.apache.pinot.query.runtime.operator.utils;
 
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 
 
@@ -45,20 +48,41 @@ public class TypeUtils {
       // For AggregationFunctions that return serialized custom object, e.g. 
DistinctCountRawHLLAggregationFunction
       case STRING:
         return value.toString();
+      case INT_ARRAY:
+        if (value instanceof IntArrayList) {
+          // For ArrayAggregationFunction
+          return ((IntArrayList) value).elements();
+        } else {
+          return value;
+        }
       case LONG_ARRAY:
         if (value instanceof LongArrayList) {
-          // For FunnelCountAggregationFunction
+          // For FunnelCountAggregationFunction and ArrayAggregationFunction
           return ((LongArrayList) value).elements();
         } else {
           return value;
         }
+      case FLOAT_ARRAY:
+        if (value instanceof FloatArrayList) {
+          // For ArrayAggregationFunction
+          return ((FloatArrayList) value).elements();
+        } else {
+          return value;
+        }
       case DOUBLE_ARRAY:
         if (value instanceof DoubleArrayList) {
-          // For HistogramAggregationFunction
+          // For HistogramAggregationFunction and ArrayAggregationFunction
           return ((DoubleArrayList) value).elements();
         } else {
           return value;
         }
+      case STRING_ARRAY:
+        if (value instanceof ObjectArrayList) {
+          // For ArrayAggregationFunction
+          return ((ObjectArrayList<String>) value).toArray(new String[0]);
+        } else {
+          return value;
+        }
       // TODO: Add more conversions
       default:
         return value;
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 5a7c127e54..928670eeba 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -308,6 +308,11 @@ public enum AggregationFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER, 
SqlTypeFamily.ANY), ordinal -> ordinal > 3),
       ReturnTypes.ARG1, ReturnTypes.explicit(SqlTypeName.OTHER)),
 
+  // Array aggregate functions
+  ARRAYAGG("arrayAgg", null, SqlKind.ARRAY_AGG, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.STRING, SqlTypeFamily.BOOLEAN),
+          ordinal -> ordinal > 1), ReturnTypes.TO_ARRAY, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
+
   // funnel aggregate functions
   // TODO: revisit support for funnel count in V2
   FUNNELCOUNT("funnelCount");
@@ -357,12 +362,12 @@ public enum AggregationFunctionType {
    * Constructor to use for aggregation functions which are supported in both 
v1 and multistage engines with
    * different behavior comparing to Calcite and requires literal operand 
inputs.
    *
-   * @param name name of the agg function
-   * @param alternativeNames alternative name of the agg function.
-   * @param sqlKind sql kind indicator, used by Calcite
-   * @param sqlFunctionCategory function catalog, used by Calcite
-   * @param operandTypeChecker input operand type signature, used by Calcite
-   * @param finalReturnType final output type signature, used by Calcite
+   * @param name                   name of the agg function
+   * @param alternativeNames       alternative name of the agg function.
+   * @param sqlKind                sql kind indicator, used by Calcite
+   * @param sqlFunctionCategory    function catalog, used by Calcite
+   * @param operandTypeChecker     input operand type signature, used by 
Calcite
+   * @param finalReturnType        final output type signature, used by Calcite
    * @param intermediateReturnType intermediate output type signature, used by 
Pinot and Calcite
    */
   AggregationFunctionType(String name, @Nullable List<String> 
alternativeNames, @Nullable SqlKind sqlKind,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to