This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 555e5769ed0 Add distinctCountSmartULL aggregation (set→ULL smart promotion) (#16605)
555e5769ed0 is described below

commit 555e5769ed0054634de7f5ebe07ea6bb04457dca
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Aug 16 09:51:58 2025 -0700

    Add distinctCountSmartULL aggregation (set→ULL smart promotion) (#16605)
    
    * core: add DistinctCountSmartULL aggregation
    
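    - Implement DistinctCountSmartULLAggregationFunction: values are collected exactly in a Set and
      promoted to an UltraLogLog (ULL) sketch once the distinct count exceeds the threshold
      (threshold and ULL precision p are function parameters)
    - Wire DISTINCTCOUNTSMARTULL into AggregationFunctionType and AggregationFunctionFactory
    - Add non-scan operator and planner support (dictionary-based paths)
    - Tests: query coverage and enum recognition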
    
    Also adds dictionary-based (non-scan) support for DISTINCTCOUNTULL and DISTINCTCOUNTRAWULL in the non-scan operator and planner.
    
    * core: introduce BaseDistinctCountSmartSketchAggregationFunction for shared raw-value
      handling; refactor DistinctCountSmartHLL/ULL to extend it; clean up duplicates and unused
      imports
---
 .../function/AggregationFunctionTypeTest.java      |   2 +
 .../query/NonScanBasedAggregationOperator.java     |  63 +-
 .../pinot/core/plan/AggregationPlanNode.java       |   3 +-
 .../function/AggregationFunctionFactory.java       |   2 +
 ...stinctCountSmartSketchAggregationFunction.java} | 628 +++++---------------
 .../DistinctCountSmartHLLAggregationFunction.java  | 643 +--------------------
 .../DistinctCountSmartULLAggregationFunction.java  | 496 ++++++++++++++++
 .../DistinctCountULLAggregationFunction.java       |  10 +-
 .../pinot/queries/DistinctCountQueriesTest.java    |  44 ++
 .../tests/MultiStageEngineIntegrationTest.java     |  34 +-
 .../pinot/segment/spi/AggregationFunctionType.java |   2 +
 11 files changed, 798 insertions(+), 1129 deletions(-)

diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index dc631eb346c..c069c5d152e 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -87,6 +87,8 @@ public class AggregationFunctionTypeTest {
         AggregationFunctionType.DISTINCTCOUNTULL);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTrAwUll"),
         AggregationFunctionType.DISTINCTCOUNTRAWULL);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTsMaRtUlL"),
+        AggregationFunctionType.DISTINCTCOUNTSMARTULL);
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 04f388cd464..42394a19283 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator.query;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.dynatrace.hash4j.distinctcount.UltraLogLog;
 import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
 import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -42,8 +43,11 @@ import 
org.apache.pinot.core.query.aggregation.function.DistinctCountOffHeapAggr
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartULLAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.DistinctCountULLAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
+import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -54,10 +58,11 @@ import org.apache.pinot.spi.utils.ByteArray;
  * Aggregation operator that utilizes dictionary or column metadata for 
serving aggregation queries to avoid scanning.
  * The scanless operator is selected in the plan maker, if the query is of 
aggregation type min, max, minmaxrange,
  * distinctcount, distinctcounthll, distinctcountrawhll, 
segmentpartitioneddistinctcount, distinctcountsmarthll,
- * distinctcounthllplus, distinctcountrawhllplus, and the column has a 
dictionary, or has column metadata with min and
- * max value defined. It also supports count(*) if the query has no filter.
- * We don't use this operator if the segment has star tree,
- * as the dictionary will have aggregated values for the metrics, and 
dimensions will have star node value.
+ * distinctcounthllplus, distinctcountrawhllplus, distinctcountull, distinctcountrawull, distinctcountsmartull,
+ * and the column has a dictionary, or has column metadata with min and max value defined. It also supports
+ * count(*) if the query has no filter.
+ * We don't use this operator if the segment has star tree, as the dictionary 
will have aggregated values for the
+ * metrics, and dimensions will have star node value.
  *
  * For min value, we use the first value from the dictionary, falling back to 
the column metadata min value if there
  * is no dictionary.
@@ -144,6 +149,18 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
           result = 
getDistinctCountSmartHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
               (DistinctCountSmartHLLAggregationFunction) aggregationFunction);
           break;
+        case DISTINCTCOUNTULL:
+          result = 
getDistinctCountULLResult(Objects.requireNonNull(dataSource.getDictionary()),
+              (DistinctCountULLAggregationFunction) aggregationFunction);
+          break;
+        case DISTINCTCOUNTSMARTULL:
+          result = 
getDistinctCountSmartULLResult(Objects.requireNonNull(dataSource.getDictionary()),
+              (DistinctCountSmartULLAggregationFunction) aggregationFunction);
+          break;
+        case DISTINCTCOUNTRAWULL:
+          result = 
getDistinctCountULLResult(Objects.requireNonNull(dataSource.getDictionary()),
+              (DistinctCountULLAggregationFunction) aggregationFunction);
+          break;
         default:
           throw new IllegalStateException(
               "Non-scan based aggregation operator does not support function 
type: " + aggregationFunction.getType());
@@ -234,6 +251,16 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
     return hll;
   }
 
+  private static UltraLogLog getDistinctValueULL(Dictionary dictionary, int p) 
{
+    UltraLogLog ull = UltraLogLog.create(p);
+    int length = dictionary.length();
+    for (int i = 0; i < length; i++) {
+      Object value = dictionary.get(i);
+      UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+    }
+    return ull;
+  }
+
   private static HyperLogLogPlus getDistinctValueHLLPlus(Dictionary 
dictionary, int p, int sp) {
     HyperLogLogPlus hllPlus = new HyperLogLogPlus(p, sp);
     int length = dictionary.length();
@@ -291,6 +318,34 @@ public class NonScanBasedAggregationOperator extends 
BaseOperator<AggregationRes
     }
   }
 
+  private static UltraLogLog getDistinctCountULLResult(Dictionary dictionary,
+      DistinctCountULLAggregationFunction function) {
+    if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+      // Treat BYTES value as serialized UltraLogLog and merge
+      try {
+        UltraLogLog ull = 
ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(dictionary.getBytesValue(0));
+        int length = dictionary.length();
+        for (int i = 1; i < length; i++) {
+          
ull.add(ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(dictionary.getBytesValue(i)));
+        }
+        return ull;
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging 
UltraLogLogs", e);
+      }
+    } else {
+      return getDistinctValueULL(dictionary, function.getP());
+    }
+  }
+
+  private static Object getDistinctCountSmartULLResult(Dictionary dictionary,
+      DistinctCountSmartULLAggregationFunction function) {
+    if (dictionary.length() > function.getThreshold()) {
+      return getDistinctValueULL(dictionary, function.getP());
+    } else {
+      return getDistinctValueSet(dictionary);
+    }
+  }
+
   @Override
   public String toExplainString() {
     return EXPLAIN_NAME;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 7db0aefd296..f60de0466cc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -52,7 +52,8 @@ public class AggregationPlanNode implements PlanNode {
       EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, 
DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTSUM,
           DISTINCTSUMMV, DISTINCTAVG, DISTINCTAVGMV, DISTINCTCOUNTOFFHEAP, 
DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV,
           DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, DISTINCTCOUNTHLLPLUS, 
DISTINCTCOUNTHLLPLUSMV,
-          DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTRAWHLLPLUSMV, 
SEGMENTPARTITIONEDDISTINCTCOUNT, DISTINCTCOUNTSMARTHLL);
+          DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTRAWHLLPLUSMV, 
DISTINCTCOUNTULL, DISTINCTCOUNTRAWULL,
+          SEGMENTPARTITIONEDDISTINCTCOUNT, DISTINCTCOUNTSMARTHLL, 
DISTINCTCOUNTSMARTULL);
 
   // DISTINCTCOUNT excluded because consuming segment metadata contains 
unknown cardinality when there is no dictionary
   private static final EnumSet<AggregationFunctionType> 
METADATA_BASED_FUNCTIONS =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index d05316b5480..092fe60c7f3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -376,6 +376,8 @@ public class AggregationFunctionFactory {
             return new DistinctCountRawHLLAggregationFunction(arguments);
           case DISTINCTCOUNTSMARTHLL:
             return new DistinctCountSmartHLLAggregationFunction(arguments);
+          case DISTINCTCOUNTSMARTULL:
+            return new DistinctCountSmartULLAggregationFunction(arguments);
           case FASTHLL:
             return new FastHLLAggregationFunction(arguments);
           case DISTINCTCOUNTTHETASKETCH:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctCountSmartSketchAggregationFunction.java
similarity index 52%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctCountSmartSketchAggregationFunction.java
index e93b48def80..0f12378b929 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctCountSmartSketchAggregationFunction.java
@@ -18,30 +18,22 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
-import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
 import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
-import it.unimi.dsi.fastutil.objects.ObjectSet;
-import java.util.List;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -50,239 +42,102 @@ import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code DistinctCountSmartHLLAggregationFunction} calculates the number 
of distinct values for a given expression
- * (both single-valued and multi-valued are supported).
+ * Base class that encapsulates the common raw-value handling for Smart 
distinct-count functions
+ * that switch from exact Set aggregation to a sketch when a threshold is 
exceeded.
  *
- * For aggregation-only queries, the distinct values are stored in a Set 
initially. Once the number of distinct values
- * exceeds a threshold, the Set will be converted into a HyperLogLog, and 
approximate result will be returned.
- *
- * The function takes an optional second argument for parameters:
- * - threshold: Threshold of the number of distinct values to trigger the 
conversion, 100_000 by default. Non-positive
- *              value means never convert.
- * - log2m: Log2m for the converted HyperLogLog, 12 by default.
- * Example of second argument: 'threshold=10;log2m=8'
+ * Subclasses provide the sketch-specific behavior (conversion and dictionary 
handling).
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class DistinctCountSmartHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<Object, Integer> {
+abstract class BaseDistinctCountSmartSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<Object, Integer> {
   // Use empty IntOpenHashSet as a placeholder for empty result
-  private static final IntSet EMPTY_PLACEHOLDER = new IntOpenHashSet();
-
-  private final int _threshold;
-  private final int _log2m;
+  protected static final IntSet EMPTY_PLACEHOLDER = new IntOpenHashSet();
 
-  public DistinctCountSmartHLLAggregationFunction(List<ExpressionContext> 
arguments) {
-    super(arguments.get(0));
-
-    if (arguments.size() > 1) {
-      Parameters parameters = new 
Parameters(arguments.get(1).getLiteral().getStringValue());
-      _threshold = parameters._threshold;
-      _log2m = parameters._log2m;
-    } else {
-      _threshold = Parameters.DEFAULT_THRESHOLD;
-      _log2m = Parameters.DEFAULT_LOG2M;
-    }
+  protected BaseDistinctCountSmartSketchAggregationFunction(ExpressionContext 
expression) {
+    super(expression);
   }
 
-  public int getThreshold() {
-    return _threshold;
-  }
+  protected abstract int getThreshold();
 
-  public int getLog2m() {
-    return _log2m;
-  }
+  protected abstract Object convertSetToSketch(Set valueSet, DataType 
storedType);
 
-  @Override
-  public AggregationFunctionType getType() {
-    return AggregationFunctionType.DISTINCTCOUNTSMARTHLL;
-  }
+  protected abstract Object convertToSketch(DictIdsWrapper dictIdsWrapper);
+
+  protected abstract IllegalStateException 
getIllegalDataTypeException(DataType dataType, boolean singleValue);
 
   @Override
-  public AggregationResultHolder createAggregationResultHolder() {
+  public final AggregationResultHolder createAggregationResultHolder() {
     return new ObjectAggregationResultHolder();
   }
 
   @Override
-  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+  public final GroupByResultHolder createGroupByResultHolder(int 
initialCapacity, int maxCapacity) {
     return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
   }
 
-  @Override
-  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    BlockValSet blockValSet = blockValSetMap.get(_expression);
-
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
-    Dictionary dictionary = blockValSet.getDictionary();
-    if (dictionary != null) {
-      RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, 
dictionary);
-      if (blockValSet.isSingleValue()) {
-        int[] dictIds = blockValSet.getDictionaryIdsSV();
-        dictIdBitmap.addN(dictIds, 0, length);
-      } else {
-        int[][] dictIds = blockValSet.getDictionaryIdsMV();
-        for (int i = 0; i < length; i++) {
-          dictIdBitmap.add(dictIds[i]);
-        }
-      }
-      return;
-    }
-
-    // For non-dictionary-encoded expression, store values into the value set 
or HLL
-    if (aggregationResultHolder.getResult() instanceof HyperLogLog) {
-      aggregateIntoHLL(length, aggregationResultHolder, blockValSet);
-    } else {
-      aggregateIntoSet(length, aggregationResultHolder, blockValSet);
-    }
-  }
-
-  private void aggregateIntoHLL(int length, AggregationResultHolder 
aggregationResultHolder, BlockValSet blockValSet) {
-    DataType valueType = blockValSet.getValueType();
-    DataType storedType = valueType.getStoredType();
-    HyperLogLog hll = aggregationResultHolder.getResult();
-    if (blockValSet.isSingleValue()) {
-      switch (storedType) {
-        case INT:
-          int[] intValues = blockValSet.getIntValuesSV();
-          for (int i = 0; i < length; i++) {
-            hll.offer(intValues[i]);
-          }
-          break;
-        case LONG:
-          long[] longValues = blockValSet.getLongValuesSV();
-          for (int i = 0; i < length; i++) {
-            hll.offer(longValues[i]);
-          }
-          break;
-        case FLOAT:
-          float[] floatValues = blockValSet.getFloatValuesSV();
-          for (int i = 0; i < length; i++) {
-            hll.offer(floatValues[i]);
-          }
-          break;
-        case DOUBLE:
-          double[] doubleValues = blockValSet.getDoubleValuesSV();
-          for (int i = 0; i < length; i++) {
-            hll.offer(doubleValues[i]);
-          }
-          break;
-        case STRING:
-          String[] stringValues = blockValSet.getStringValuesSV();
-          for (int i = 0; i < length; i++) {
-            hll.offer(stringValues[i]);
-          }
-          break;
-        case BYTES:
-          byte[][] bytesValues = blockValSet.getBytesValuesSV();
-          for (int i = 0; i < length; i++) {
-            hll.offer(bytesValues[i]);
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, true);
-      }
-    } else {
-      switch (storedType) {
-        case INT:
-          int[][] intValues = blockValSet.getIntValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (int value : intValues[i]) {
-              hll.offer(value);
-            }
-          }
-          break;
-        case LONG:
-          long[][] longValues = blockValSet.getLongValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (long value : longValues[i]) {
-              hll.offer(value);
-            }
-          }
-          break;
-        case FLOAT:
-          float[][] floatValues = blockValSet.getFloatValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (float value : floatValues[i]) {
-              hll.offer(value);
-            }
-          }
-          break;
-        case DOUBLE:
-          double[][] doubleValues = blockValSet.getDoubleValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (double value : doubleValues[i]) {
-              hll.offer(value);
-            }
-          }
-          break;
-        case STRING:
-          String[][] stringValues = blockValSet.getStringValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (String value : stringValues[i]) {
-              hll.offer(value);
-            }
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, false);
-      }
-    }
-  }
-
-  private void aggregateIntoSet(int length, AggregationResultHolder 
aggregationResultHolder, BlockValSet blockValSet) {
+  /**
+   * Common aggregation for non-dictionary-encoded expressions into a 
per-segment Set.
+   * Subclasses can call this from their aggregate(...) when the current 
result is not a sketch.
+   */
+  protected final void aggregateIntoSet(int length, AggregationResultHolder 
aggregationResultHolder,
+      BlockValSet blockValSet) {
     DataType valueType = blockValSet.getValueType();
     DataType storedType = valueType.getStoredType();
     Set valueSet = getValueSet(aggregationResultHolder, storedType);
     if (blockValSet.isSingleValue()) {
       switch (storedType) {
-        case INT:
+        case INT: {
           IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
           int[] intValues = blockValSet.getIntValuesSV();
           for (int i = 0; i < length; i++) {
             intSet.add(intValues[i]);
           }
           break;
-        case LONG:
+        }
+        case LONG: {
           LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
           long[] longValues = blockValSet.getLongValuesSV();
           for (int i = 0; i < length; i++) {
             longSet.add(longValues[i]);
           }
           break;
-        case FLOAT:
+        }
+        case FLOAT: {
           FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
           float[] floatValues = blockValSet.getFloatValuesSV();
           for (int i = 0; i < length; i++) {
             floatSet.add(floatValues[i]);
           }
           break;
-        case DOUBLE:
+        }
+        case DOUBLE: {
           DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
           double[] doubleValues = blockValSet.getDoubleValuesSV();
           for (int i = 0; i < length; i++) {
             doubleSet.add(doubleValues[i]);
           }
           break;
-        case STRING:
+        }
+        case STRING: {
           ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) 
valueSet;
           String[] stringValues = blockValSet.getStringValuesSV();
-          //noinspection ManualArrayToCollectionCopy
-          for (int i = 0; i < length; i++) {
-            stringSet.add(stringValues[i]);
-          }
+          stringSet.addAll(Arrays.asList(stringValues).subList(0, length));
           break;
-        case BYTES:
+        }
+        case BYTES: {
           ObjectOpenHashSet<ByteArray> bytesSet = 
(ObjectOpenHashSet<ByteArray>) valueSet;
           byte[][] bytesValues = blockValSet.getBytesValuesSV();
           for (int i = 0; i < length; i++) {
             bytesSet.add(new ByteArray(bytesValues[i]));
           }
           break;
+        }
         default:
           throw getIllegalDataTypeException(valueType, true);
       }
     } else {
       switch (storedType) {
-        case INT:
+        case INT: {
           IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
           int[][] intValues = blockValSet.getIntValuesMV();
           for (int i = 0; i < length; i++) {
@@ -291,7 +146,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case LONG:
+        }
+        case LONG: {
           LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
           long[][] longValues = blockValSet.getLongValuesMV();
           for (int i = 0; i < length; i++) {
@@ -300,7 +156,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case FLOAT:
+        }
+        case FLOAT: {
           FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
           float[][] floatValues = blockValSet.getFloatValuesMV();
           for (int i = 0; i < length; i++) {
@@ -309,7 +166,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case DOUBLE:
+        }
+        case DOUBLE: {
           DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
           double[][] doubleValues = blockValSet.getDoubleValuesMV();
           for (int i = 0; i < length; i++) {
@@ -318,58 +176,30 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case STRING:
+        }
+        case STRING: {
           ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) 
valueSet;
           String[][] stringValues = blockValSet.getStringValuesMV();
           for (int i = 0; i < length; i++) {
-            //noinspection ManualArrayToCollectionCopy
-            for (String value : stringValues[i]) {
-              //noinspection UseBulkOperation
-              stringSet.add(value);
-            }
+            Collections.addAll(stringSet, stringValues[i]);
           }
           break;
+        }
         default:
           throw getIllegalDataTypeException(valueType, false);
       }
     }
 
-    // Convert to HLL if the set size exceeds the threshold
-    if (valueSet.size() > _threshold) {
-      aggregationResultHolder.setValue(convertSetToHLL(valueSet, storedType));
-    }
-  }
-
-  protected HyperLogLog convertSetToHLL(Set valueSet, DataType storedType) {
-    if (storedType == DataType.BYTES) {
-      return convertByteArraySetToHLL((ObjectSet<ByteArray>) valueSet);
-    } else {
-      return convertNonByteArraySetToHLL(valueSet);
+    if (valueSet.size() > getThreshold()) {
+      aggregationResultHolder.setValue(convertSetToSketch(valueSet, 
storedType));
     }
   }
 
-  protected HyperLogLog convertByteArraySetToHLL(ObjectSet<ByteArray> 
valueSet) {
-    HyperLogLog hll = new HyperLogLog(_log2m);
-    for (ByteArray value : valueSet) {
-      hll.offer(value.getBytes());
-    }
-    return hll;
-  }
-
-  protected HyperLogLog convertNonByteArraySetToHLL(Set valueSet) {
-    HyperLogLog hll = new HyperLogLog(_log2m);
-    for (Object value : valueSet) {
-      hll.offer(value);
-    }
-    return hll;
-  }
-
   @Override
-  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+  public final void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
 
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
     Dictionary dictionary = blockValSet.getDictionary();
     if (dictionary != null) {
       if (blockValSet.isSingleValue()) {
@@ -386,56 +216,61 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
       return;
     }
 
-    // For non-dictionary-encoded expression, store values into the value set
     DataType valueType = blockValSet.getValueType();
     DataType storedType = valueType.getStoredType();
     if (blockValSet.isSingleValue()) {
       switch (storedType) {
-        case INT:
+        case INT: {
           int[] intValues = blockValSet.getIntValuesSV();
           for (int i = 0; i < length; i++) {
             ((IntOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.INT)).add(intValues[i]);
           }
           break;
-        case LONG:
+        }
+        case LONG: {
           long[] longValues = blockValSet.getLongValuesSV();
           for (int i = 0; i < length; i++) {
             ((LongOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.LONG)).add(longValues[i]);
           }
           break;
-        case FLOAT:
+        }
+        case FLOAT: {
           float[] floatValues = blockValSet.getFloatValuesSV();
           for (int i = 0; i < length; i++) {
             ((FloatOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.FLOAT)).add(floatValues[i]);
           }
           break;
-        case DOUBLE:
+        }
+        case DOUBLE: {
           double[] doubleValues = blockValSet.getDoubleValuesSV();
           for (int i = 0; i < length; i++) {
             ((DoubleOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.DOUBLE)).add(
                 doubleValues[i]);
           }
           break;
-        case STRING:
+        }
+        case STRING: {
           String[] stringValues = blockValSet.getStringValuesSV();
           for (int i = 0; i < length; i++) {
             ((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.STRING)).add(
                 stringValues[i]);
           }
           break;
-        case BYTES:
+        }
+        case BYTES: {
           byte[][] bytesValues = blockValSet.getBytesValuesSV();
           for (int i = 0; i < length; i++) {
             ((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.BYTES)).add(
                 new ByteArray(bytesValues[i]));
           }
           break;
+        }
         default:
           throw getIllegalDataTypeException(valueType, true);
       }
     } else {
       switch (storedType) {
-        case INT:
+        case INT: {
           int[][] intValues = blockValSet.getIntValuesMV();
           for (int i = 0; i < length; i++) {
             IntOpenHashSet intSet = (IntOpenHashSet) 
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
@@ -444,7 +279,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case LONG:
+        }
+        case LONG: {
           long[][] longValues = blockValSet.getLongValuesMV();
           for (int i = 0; i < length; i++) {
             LongOpenHashSet longSet =
@@ -454,7 +290,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case FLOAT:
+        }
+        case FLOAT: {
           float[][] floatValues = blockValSet.getFloatValuesMV();
           for (int i = 0; i < length; i++) {
             FloatOpenHashSet floatSet =
@@ -464,7 +301,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case DOUBLE:
+        }
+        case DOUBLE: {
           double[][] doubleValues = blockValSet.getDoubleValuesMV();
           for (int i = 0; i < length; i++) {
             DoubleOpenHashSet doubleSet =
@@ -474,18 +312,16 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case STRING:
+        }
+        case STRING: {
           String[][] stringValues = blockValSet.getStringValuesMV();
           for (int i = 0; i < length; i++) {
             ObjectOpenHashSet<String> stringSet =
                 (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.STRING);
-            //noinspection ManualArrayToCollectionCopy
-            for (String value : stringValues[i]) {
-              //noinspection UseBulkOperation
-              stringSet.add(value);
-            }
+            Collections.addAll(stringSet, stringValues[i]);
           }
           break;
+        }
         default:
           throw getIllegalDataTypeException(valueType, false);
       }
@@ -493,11 +329,10 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
   }
 
   @Override
-  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+  public final void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
 
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
     Dictionary dictionary = blockValSet.getDictionary();
     if (dictionary != null) {
       if (blockValSet.isSingleValue()) {
@@ -516,53 +351,58 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
       return;
     }
 
-    // For non-dictionary-encoded expression, store values into the value set
     DataType valueType = blockValSet.getValueType();
     DataType storedType = valueType.getStoredType();
     if (blockValSet.isSingleValue()) {
       switch (storedType) {
-        case INT:
+        case INT: {
           int[] intValues = blockValSet.getIntValuesSV();
           for (int i = 0; i < length; i++) {
             setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
intValues[i]);
           }
           break;
-        case LONG:
+        }
+        case LONG: {
           long[] longValues = blockValSet.getLongValuesSV();
           for (int i = 0; i < length; i++) {
             setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
longValues[i]);
           }
           break;
-        case FLOAT:
+        }
+        case FLOAT: {
           float[] floatValues = blockValSet.getFloatValuesSV();
           for (int i = 0; i < length; i++) {
             setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
floatValues[i]);
           }
           break;
-        case DOUBLE:
+        }
+        case DOUBLE: {
           double[] doubleValues = blockValSet.getDoubleValuesSV();
           for (int i = 0; i < length; i++) {
             setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
doubleValues[i]);
           }
           break;
-        case STRING:
+        }
+        case STRING: {
           String[] stringValues = blockValSet.getStringValuesSV();
           for (int i = 0; i < length; i++) {
             setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
stringValues[i]);
           }
           break;
-        case BYTES:
+        }
+        case BYTES: {
           byte[][] bytesValues = blockValSet.getBytesValuesSV();
           for (int i = 0; i < length; i++) {
             setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], new 
ByteArray(bytesValues[i]));
           }
           break;
+        }
         default:
           throw getIllegalDataTypeException(valueType, true);
       }
     } else {
       switch (storedType) {
-        case INT:
+        case INT: {
           int[][] intValues = blockValSet.getIntValuesMV();
           for (int i = 0; i < length; i++) {
             for (int groupKey : groupKeysArray[i]) {
@@ -573,7 +413,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case LONG:
+        }
+        case LONG: {
           long[][] longValues = blockValSet.getLongValuesMV();
           for (int i = 0; i < length; i++) {
             for (int groupKey : groupKeysArray[i]) {
@@ -584,7 +425,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case FLOAT:
+        }
+        case FLOAT: {
           float[][] floatValues = blockValSet.getFloatValuesMV();
           for (int i = 0; i < length; i++) {
             for (int groupKey : groupKeysArray[i]) {
@@ -595,7 +437,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case DOUBLE:
+        }
+        case DOUBLE: {
           double[][] doubleValues = blockValSet.getDoubleValuesMV();
           for (int i = 0; i < length; i++) {
             for (int groupKey : groupKeysArray[i]) {
@@ -607,20 +450,18 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
             }
           }
           break;
-        case STRING:
+        }
+        case STRING: {
           String[][] stringValues = blockValSet.getStringValuesMV();
           for (int i = 0; i < length; i++) {
             for (int groupKey : groupKeysArray[i]) {
               ObjectOpenHashSet<String> stringSet =
                   (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKey, DataType.STRING);
-              //noinspection ManualArrayToCollectionCopy
-              for (String value : stringValues[i]) {
-                //noinspection UseBulkOperation
-                stringSet.add(value);
-              }
+              Collections.addAll(stringSet, stringValues[i]);
             }
           }
           break;
+        }
         default:
           throw getIllegalDataTypeException(valueType, false);
       }
@@ -628,138 +469,39 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
   }
 
   @Override
-  public Object extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+  public final Object extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     Object result = aggregationResultHolder.getResult();
     if (result == null) {
       return EMPTY_PLACEHOLDER;
     }
 
     if (result instanceof DictIdsWrapper) {
-      // For dictionary-encoded expression, convert dictionary ids to values
       DictIdsWrapper dictIdsWrapper = (DictIdsWrapper) result;
-      if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(_threshold)) {
-        return convertToHLL(dictIdsWrapper);
+      if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(getThreshold())) {
+        return convertToSketch(dictIdsWrapper);
       } else {
         return convertToValueSet(dictIdsWrapper);
       }
     } else {
-      // For non-dictionary-encoded expression, directly return the value set
       return result;
     }
   }
 
   @Override
-  public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int 
groupKey) {
+  public final Set extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     Object result = groupByResultHolder.getResult(groupKey);
     if (result == null) {
       return EMPTY_PLACEHOLDER;
     }
 
     if (result instanceof DictIdsWrapper) {
-      // For dictionary-encoded expression, convert dictionary ids to values
       return convertToValueSet((DictIdsWrapper) result);
     } else {
-      // For non-dictionary-encoded expression, directly return the value set
       return (Set) result;
     }
   }
 
-  @Override
-  public Object merge(Object intermediateResult1, Object intermediateResult2) {
-    if (intermediateResult1 instanceof HyperLogLog) {
-      return mergeIntoHLL((HyperLogLog) intermediateResult1, 
intermediateResult2);
-    }
-    if (intermediateResult2 instanceof HyperLogLog) {
-      return mergeIntoHLL((HyperLogLog) intermediateResult2, 
intermediateResult1);
-    }
-
-    Set valueSet1 = (Set) intermediateResult1;
-    Set valueSet2 = (Set) intermediateResult2;
-    if (valueSet1.isEmpty()) {
-      return valueSet2;
-    }
-    if (valueSet2.isEmpty()) {
-      return valueSet1;
-    }
-    valueSet1.addAll(valueSet2);
-
-    // Convert to HLL if the set size exceeds the threshold
-    if (valueSet1.size() > _threshold) {
-      if (valueSet1 instanceof ObjectSet && valueSet1.iterator().next() 
instanceof ByteArray) {
-        return convertByteArraySetToHLL((ObjectSet<ByteArray>) valueSet1);
-      } else {
-        return convertNonByteArraySetToHLL(valueSet1);
-      }
-    } else {
-      return valueSet1;
-    }
-  }
-
-  private static HyperLogLog mergeIntoHLL(HyperLogLog hll, Object 
intermediateResult) {
-    if (intermediateResult instanceof HyperLogLog) {
-      try {
-        hll.addAll((HyperLogLog) intermediateResult);
-      } catch (CardinalityMergeException e) {
-        throw new RuntimeException("Caught exception while merging 
HyperLogLog", e);
-      }
-    } else {
-      Set valueSet = (Set) intermediateResult;
-      if (!valueSet.isEmpty()) {
-        if (valueSet instanceof ObjectSet && valueSet.iterator().next() 
instanceof ByteArray) {
-          for (Object value : valueSet) {
-            hll.offer(((ByteArray) value).getBytes());
-          }
-        } else {
-          for (Object value : valueSet) {
-            hll.offer(value);
-          }
-        }
-      }
-    }
-    return hll;
-  }
-
-  @Override
-  public ColumnDataType getIntermediateResultColumnType() {
-    return ColumnDataType.OBJECT;
-  }
-
-  @Override
-  public SerializedIntermediateResult serializeIntermediateResult(Object o) {
-    if (o instanceof HyperLogLog) {
-      return new 
SerializedIntermediateResult(ObjectSerDeUtils.ObjectType.HyperLogLog.getValue(),
-          ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize((HyperLogLog) o));
-    }
-    return BaseDistinctAggregateAggregationFunction.serializeSet((Set) o);
-  }
-
-  @Override
-  public Object deserializeIntermediateResult(CustomObject customObject) {
-    return ObjectSerDeUtils.deserialize(customObject);
-  }
-
-  @Override
-  public ColumnDataType getFinalResultColumnType() {
-    return ColumnDataType.INT;
-  }
-
-  @Override
-  public Integer extractFinalResult(Object intermediateResult) {
-    if (intermediateResult instanceof HyperLogLog) {
-      return (int) ((HyperLogLog) intermediateResult).cardinality();
-    } else {
-      return ((Set) intermediateResult).size();
-    }
-  }
-
-  @Override
-  public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) {
-    return finalResult1 + finalResult2;
-  }
-
-  /**
-   * Returns the dictionary id bitmap from the result holder or creates a new 
one if it does not exist.
-   */
+  /** Returns the dictionary id bitmap from the result holder or creates a new 
one if it does not exist. */
   protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder 
aggregationResultHolder,
       Dictionary dictionary) {
     DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
@@ -770,9 +512,7 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     return dictIdsWrapper._dictIdBitmap;
   }
 
-  /**
-   * Returns the value set from the result holder or creates a new one if it 
does not exist.
-   */
+  /** Returns the value set from the result holder or creates a new one if it 
does not exist. */
   protected static Set getValueSet(AggregationResultHolder 
aggregationResultHolder, DataType valueType) {
     Set valueSet = aggregationResultHolder.getResult();
     if (valueSet == null) {
@@ -782,10 +522,8 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     return valueSet;
   }
 
-  /**
-   * Helper method to create a value set for the given value type.
-   */
-  private static Set getValueSet(DataType valueType) {
+  /** Helper method to create a value set for the given value type. */
+  protected static Set getValueSet(DataType valueType) {
     switch (valueType) {
       case INT:
         return new IntOpenHashSet();
@@ -799,13 +537,12 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
       case BYTES:
         return new ObjectOpenHashSet();
       default:
-        throw new IllegalStateException("Illegal data type for DISTINCT_COUNT 
aggregation function: " + valueType);
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_SMART aggregation function: 
" + valueType);
     }
   }
 
-  /**
-   * Returns the dictionary id bitmap for the given group key or creates a new 
one if it does not exist.
-   */
+  /** Returns the dictionary id bitmap for the given group key or creates a 
new one if it does not exist. */
   protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder 
groupByResultHolder, int groupKey,
       Dictionary dictionary) {
     DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
@@ -816,9 +553,7 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     return dictIdsWrapper._dictIdBitmap;
   }
 
-  /**
-   * Returns the value set for the given group key or creates a new one if it 
does not exist.
-   */
+  /** Returns the value set for the given group key or creates a new one if it 
does not exist. */
   protected static Set getValueSet(GroupByResultHolder groupByResultHolder, 
int groupKey, DataType valueType) {
     Set valueSet = groupByResultHolder.getResult(groupKey);
     if (valueSet == null) {
@@ -828,198 +563,121 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     return valueSet;
   }
 
-  /**
-   * Helper method to set dictionary id for the given group keys into the 
result holder.
-   */
-  private static void setDictIdForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys,
+  /** Helper method to set dictionary id for the given group keys into the 
result holder. */
+  protected static void setDictIdForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys,
       Dictionary dictionary, int dictId) {
     for (int groupKey : groupKeys) {
       getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
     }
   }
 
-  /**
-   * Helper method to set INT value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, int value) {
+  /** Helper method to set INT value for the given group keys into the result 
holder. */
+  protected static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, int value) {
     for (int groupKey : groupKeys) {
       ((IntOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.INT)).add(value);
     }
   }
 
-  /**
-   * Helper method to set LONG value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, long value) {
+  /** Helper method to set LONG value for the given group keys into the result 
holder. */
+  protected static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, long value) {
     for (int groupKey : groupKeys) {
       ((LongOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.LONG)).add(value);
     }
   }
 
-  /**
-   * Helper method to set FLOAT value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, float value) {
+  /** Helper method to set FLOAT value for the given group keys into the 
result holder. */
+  protected static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, float value) {
     for (int groupKey : groupKeys) {
       ((FloatOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.FLOAT)).add(value);
     }
   }
 
-  /**
-   * Helper method to set DOUBLE value for the given group keys into the 
result holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, double value) {
+  /** Helper method to set DOUBLE value for the given group keys into the 
result holder. */
+  protected static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, double value) {
     for (int groupKey : groupKeys) {
       ((DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.DOUBLE)).add(value);
     }
   }
 
-  /**
-   * Helper method to set STRING value for the given group keys into the 
result holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, String value) {
+  /** Helper method to set STRING value for the given group keys into the 
result holder. */
+  protected static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, String value) {
     for (int groupKey : groupKeys) {
       ((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKey, 
DataType.STRING)).add(value);
     }
   }
 
-  /**
-   * Helper method to set BYTES value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, ByteArray value) {
+  /** Helper method to set BYTES value for the given group keys into the 
result holder. */
+  protected static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys,
+      ByteArray value) {
     for (int groupKey : groupKeys) {
       ((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder, 
groupKey, DataType.BYTES)).add(value);
     }
   }
 
-  /**
-   * Helper method to read dictionary and convert dictionary ids to a value 
set for dictionary-encoded expression.
-   */
-  private static Set convertToValueSet(DictIdsWrapper dictIdsWrapper) {
+  /** Helper method to read dictionary and convert dictionary ids to a value 
set for dictionary-encoded expression. */
+  protected static Set convertToValueSet(DictIdsWrapper dictIdsWrapper) {
     Dictionary dictionary = dictIdsWrapper._dictionary;
     RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
     int numValues = dictIdBitmap.getCardinality();
     PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
     DataType storedType = dictionary.getValueType();
     switch (storedType) {
-      case INT:
+      case INT: {
         IntOpenHashSet intSet = new IntOpenHashSet(numValues);
         while (iterator.hasNext()) {
           intSet.add(dictionary.getIntValue(iterator.next()));
         }
         return intSet;
-      case LONG:
+      }
+      case LONG: {
         LongOpenHashSet longSet = new LongOpenHashSet(numValues);
         while (iterator.hasNext()) {
           longSet.add(dictionary.getLongValue(iterator.next()));
         }
         return longSet;
-      case FLOAT:
+      }
+      case FLOAT: {
         FloatOpenHashSet floatSet = new FloatOpenHashSet(numValues);
         while (iterator.hasNext()) {
           floatSet.add(dictionary.getFloatValue(iterator.next()));
         }
         return floatSet;
-      case DOUBLE:
+      }
+      case DOUBLE: {
         DoubleOpenHashSet doubleSet = new DoubleOpenHashSet(numValues);
         while (iterator.hasNext()) {
           doubleSet.add(dictionary.getDoubleValue(iterator.next()));
         }
         return doubleSet;
-      case STRING:
+      }
+      case STRING: {
         ObjectOpenHashSet<String> stringSet = new 
ObjectOpenHashSet<>(numValues);
         while (iterator.hasNext()) {
           stringSet.add(dictionary.getStringValue(iterator.next()));
         }
         return stringSet;
-      case BYTES:
+      }
+      case BYTES: {
         ObjectOpenHashSet<ByteArray> bytesSet = new 
ObjectOpenHashSet<>(numValues);
         while (iterator.hasNext()) {
           bytesSet.add(new 
ByteArray(dictionary.getBytesValue(iterator.next())));
         }
         return bytesSet;
+      }
       default:
-        throw new IllegalStateException("Illegal data type for DISTINCT_COUNT 
aggregation function: " + storedType);
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_SMART aggregation function: 
" + storedType);
     }
   }
 
-  /**
-   * Helper method to read dictionary and convert dictionary ids to a 
HyperLogLog for dictionary-encoded expression.
-   */
-  private HyperLogLog convertToHLL(DictIdsWrapper dictIdsWrapper) {
-    HyperLogLog hyperLogLog = new HyperLogLog(_log2m);
-    Dictionary dictionary = dictIdsWrapper._dictionary;
-    RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
-    PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
-    while (iterator.hasNext()) {
-      hyperLogLog.offer(dictionary.get(iterator.next()));
-    }
-    return hyperLogLog;
-  }
-
-  private static IllegalStateException getIllegalDataTypeException(DataType 
dataType, boolean singleValue) {
-    return new IllegalStateException(
-        "Illegal data type for DISTINCT_COUNT_SMART_HLL aggregation function: 
" + dataType + (singleValue ? ""
-            : "_MV"));
-  }
-
-  private static final class DictIdsWrapper {
+  /** Wrapper of dictionary and dict-id bitmap used during aggregation. */
+  protected static final class DictIdsWrapper {
     final Dictionary _dictionary;
     final RoaringBitmap _dictIdBitmap;
 
-    private DictIdsWrapper(Dictionary dictionary) {
+    DictIdsWrapper(Dictionary dictionary) {
       _dictionary = dictionary;
       _dictIdBitmap = new RoaringBitmap();
     }
   }
-
-  /**
-   * Helper class to wrap the parameters.
-   */
-  private static class Parameters {
-    static final char PARAMETER_DELIMITER = ';';
-    static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
-
-    static final String THRESHOLD_KEY = "THRESHOLD";
-    // 100K values to trigger HLL conversion by default
-    static final int DEFAULT_THRESHOLD = 100_000;
-    @Deprecated
-    static final String DEPRECATED_THRESHOLD_KEY = "HLLCONVERSIONTHRESHOLD";
-
-    static final String LOG2M_KEY = "LOG2M";
-    // Use 12 by default to get good accuracy for DistinctCount
-    static final int DEFAULT_LOG2M = 12;
-    @Deprecated
-    static final String DEPRECATED_LOG2M_KEY = "HLLLOG2M";
-
-    int _threshold = DEFAULT_THRESHOLD;
-    int _log2m = DEFAULT_LOG2M;
-
-    Parameters(String parametersString) {
-      StringUtils.deleteWhitespace(parametersString);
-      String[] keyValuePairs = StringUtils.split(parametersString, 
PARAMETER_DELIMITER);
-      for (String keyValuePair : keyValuePairs) {
-        String[] keyAndValue = StringUtils.split(keyValuePair, 
PARAMETER_KEY_VALUE_SEPARATOR);
-        Preconditions.checkArgument(keyAndValue.length == 2, "Invalid 
parameter: %s", keyValuePair);
-        String key = keyAndValue[0];
-        String value = keyAndValue[1];
-        switch (key.toUpperCase()) {
-          case THRESHOLD_KEY:
-          case DEPRECATED_THRESHOLD_KEY:
-            _threshold = Integer.parseInt(value);
-            // Treat non-positive threshold as unlimited
-            if (_threshold <= 0) {
-              _threshold = Integer.MAX_VALUE;
-            }
-            break;
-          case LOG2M_KEY:
-          case DEPRECATED_LOG2M_KEY:
-            _log2m = Integer.parseInt(value);
-            break;
-          default:
-            throw new IllegalArgumentException("Invalid parameter key: " + 
key);
-        }
-      }
-    }
-  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
index e93b48def80..5656212398f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
@@ -21,12 +21,6 @@ package org.apache.pinot.core.query.aggregation.function;
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
-import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
-import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import it.unimi.dsi.fastutil.objects.ObjectSet;
 import java.util.List;
 import java.util.Map;
@@ -38,9 +32,6 @@ import 
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
-import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -63,9 +54,7 @@ import org.roaringbitmap.RoaringBitmap;
  * Example of second argument: 'threshold=10;log2m=8'
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class DistinctCountSmartHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<Object, Integer> {
-  // Use empty IntOpenHashSet as a placeholder for empty result
-  private static final IntSet EMPTY_PLACEHOLDER = new IntOpenHashSet();
+public class DistinctCountSmartHLLAggregationFunction extends 
BaseDistinctCountSmartSketchAggregationFunction {
 
   private final int _threshold;
   private final int _log2m;
@@ -96,15 +85,7 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     return AggregationFunctionType.DISTINCTCOUNTSMARTHLL;
   }
 
-  @Override
-  public AggregationResultHolder createAggregationResultHolder() {
-    return new ObjectAggregationResultHolder();
-  }
-
-  @Override
-  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
-    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
-  }
+  // Result holder creators are provided by the base class
 
   @Override
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
@@ -228,117 +209,7 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     }
   }
 
-  private void aggregateIntoSet(int length, AggregationResultHolder 
aggregationResultHolder, BlockValSet blockValSet) {
-    DataType valueType = blockValSet.getValueType();
-    DataType storedType = valueType.getStoredType();
-    Set valueSet = getValueSet(aggregationResultHolder, storedType);
-    if (blockValSet.isSingleValue()) {
-      switch (storedType) {
-        case INT:
-          IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
-          int[] intValues = blockValSet.getIntValuesSV();
-          for (int i = 0; i < length; i++) {
-            intSet.add(intValues[i]);
-          }
-          break;
-        case LONG:
-          LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
-          long[] longValues = blockValSet.getLongValuesSV();
-          for (int i = 0; i < length; i++) {
-            longSet.add(longValues[i]);
-          }
-          break;
-        case FLOAT:
-          FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
-          float[] floatValues = blockValSet.getFloatValuesSV();
-          for (int i = 0; i < length; i++) {
-            floatSet.add(floatValues[i]);
-          }
-          break;
-        case DOUBLE:
-          DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
-          double[] doubleValues = blockValSet.getDoubleValuesSV();
-          for (int i = 0; i < length; i++) {
-            doubleSet.add(doubleValues[i]);
-          }
-          break;
-        case STRING:
-          ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) 
valueSet;
-          String[] stringValues = blockValSet.getStringValuesSV();
-          //noinspection ManualArrayToCollectionCopy
-          for (int i = 0; i < length; i++) {
-            stringSet.add(stringValues[i]);
-          }
-          break;
-        case BYTES:
-          ObjectOpenHashSet<ByteArray> bytesSet = 
(ObjectOpenHashSet<ByteArray>) valueSet;
-          byte[][] bytesValues = blockValSet.getBytesValuesSV();
-          for (int i = 0; i < length; i++) {
-            bytesSet.add(new ByteArray(bytesValues[i]));
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, true);
-      }
-    } else {
-      switch (storedType) {
-        case INT:
-          IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
-          int[][] intValues = blockValSet.getIntValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (int value : intValues[i]) {
-              intSet.add(value);
-            }
-          }
-          break;
-        case LONG:
-          LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
-          long[][] longValues = blockValSet.getLongValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (long value : longValues[i]) {
-              longSet.add(value);
-            }
-          }
-          break;
-        case FLOAT:
-          FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
-          float[][] floatValues = blockValSet.getFloatValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (float value : floatValues[i]) {
-              floatSet.add(value);
-            }
-          }
-          break;
-        case DOUBLE:
-          DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
-          double[][] doubleValues = blockValSet.getDoubleValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (double value : doubleValues[i]) {
-              doubleSet.add(value);
-            }
-          }
-          break;
-        case STRING:
-          ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) 
valueSet;
-          String[][] stringValues = blockValSet.getStringValuesMV();
-          for (int i = 0; i < length; i++) {
-            //noinspection ManualArrayToCollectionCopy
-            for (String value : stringValues[i]) {
-              //noinspection UseBulkOperation
-              stringSet.add(value);
-            }
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, false);
-      }
-    }
-
-    // Convert to HLL if the set size exceeds the threshold
-    if (valueSet.size() > _threshold) {
-      aggregationResultHolder.setValue(convertSetToHLL(valueSet, storedType));
-    }
-  }
+  // aggregateIntoSet is handled by the base class
 
   protected HyperLogLog convertSetToHLL(Set valueSet, DataType storedType) {
     if (storedType == DataType.BYTES) {
@@ -364,305 +235,13 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     return hll;
   }
 
-  @Override
-  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    BlockValSet blockValSet = blockValSetMap.get(_expression);
+  // group-by SV handled by the base class
 
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
-    Dictionary dictionary = blockValSet.getDictionary();
-    if (dictionary != null) {
-      if (blockValSet.isSingleValue()) {
-        int[] dictIds = blockValSet.getDictionaryIdsSV();
-        for (int i = 0; i < length; i++) {
-          getDictIdBitmap(groupByResultHolder, groupKeyArray[i], 
dictionary).add(dictIds[i]);
-        }
-      } else {
-        int[][] dictIds = blockValSet.getDictionaryIdsMV();
-        for (int i = 0; i < length; i++) {
-          getDictIdBitmap(groupByResultHolder, groupKeyArray[i], 
dictionary).add(dictIds[i]);
-        }
-      }
-      return;
-    }
+  // group-by MV handled by the base class
 
-    // For non-dictionary-encoded expression, store values into the value set
-    DataType valueType = blockValSet.getValueType();
-    DataType storedType = valueType.getStoredType();
-    if (blockValSet.isSingleValue()) {
-      switch (storedType) {
-        case INT:
-          int[] intValues = blockValSet.getIntValuesSV();
-          for (int i = 0; i < length; i++) {
-            ((IntOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.INT)).add(intValues[i]);
-          }
-          break;
-        case LONG:
-          long[] longValues = blockValSet.getLongValuesSV();
-          for (int i = 0; i < length; i++) {
-            ((LongOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.LONG)).add(longValues[i]);
-          }
-          break;
-        case FLOAT:
-          float[] floatValues = blockValSet.getFloatValuesSV();
-          for (int i = 0; i < length; i++) {
-            ((FloatOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.FLOAT)).add(floatValues[i]);
-          }
-          break;
-        case DOUBLE:
-          double[] doubleValues = blockValSet.getDoubleValuesSV();
-          for (int i = 0; i < length; i++) {
-            ((DoubleOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.DOUBLE)).add(
-                doubleValues[i]);
-          }
-          break;
-        case STRING:
-          String[] stringValues = blockValSet.getStringValuesSV();
-          for (int i = 0; i < length; i++) {
-            ((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.STRING)).add(
-                stringValues[i]);
-          }
-          break;
-        case BYTES:
-          byte[][] bytesValues = blockValSet.getBytesValuesSV();
-          for (int i = 0; i < length; i++) {
-            ((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.BYTES)).add(
-                new ByteArray(bytesValues[i]));
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, true);
-      }
-    } else {
-      switch (storedType) {
-        case INT:
-          int[][] intValues = blockValSet.getIntValuesMV();
-          for (int i = 0; i < length; i++) {
-            IntOpenHashSet intSet = (IntOpenHashSet) 
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
-            for (int value : intValues[i]) {
-              intSet.add(value);
-            }
-          }
-          break;
-        case LONG:
-          long[][] longValues = blockValSet.getLongValuesMV();
-          for (int i = 0; i < length; i++) {
-            LongOpenHashSet longSet =
-                (LongOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.LONG);
-            for (long value : longValues[i]) {
-              longSet.add(value);
-            }
-          }
-          break;
-        case FLOAT:
-          float[][] floatValues = blockValSet.getFloatValuesMV();
-          for (int i = 0; i < length; i++) {
-            FloatOpenHashSet floatSet =
-                (FloatOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.FLOAT);
-            for (float value : floatValues[i]) {
-              floatSet.add(value);
-            }
-          }
-          break;
-        case DOUBLE:
-          double[][] doubleValues = blockValSet.getDoubleValuesMV();
-          for (int i = 0; i < length; i++) {
-            DoubleOpenHashSet doubleSet =
-                (DoubleOpenHashSet) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.DOUBLE);
-            for (double value : doubleValues[i]) {
-              doubleSet.add(value);
-            }
-          }
-          break;
-        case STRING:
-          String[][] stringValues = blockValSet.getStringValuesMV();
-          for (int i = 0; i < length; i++) {
-            ObjectOpenHashSet<String> stringSet =
-                (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKeyArray[i], DataType.STRING);
-            //noinspection ManualArrayToCollectionCopy
-            for (String value : stringValues[i]) {
-              //noinspection UseBulkOperation
-              stringSet.add(value);
-            }
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, false);
-      }
-    }
-  }
+  // extraction is handled by the base class
 
-  @Override
-  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    BlockValSet blockValSet = blockValSetMap.get(_expression);
-
-    // For dictionary-encoded expression, store dictionary ids into the bitmap
-    Dictionary dictionary = blockValSet.getDictionary();
-    if (dictionary != null) {
-      if (blockValSet.isSingleValue()) {
-        int[] dictIds = blockValSet.getDictionaryIdsSV();
-        for (int i = 0; i < length; i++) {
-          setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i], 
dictionary, dictIds[i]);
-        }
-      } else {
-        int[][] dictIds = blockValSet.getDictionaryIdsMV();
-        for (int i = 0; i < length; i++) {
-          for (int groupKey : groupKeysArray[i]) {
-            getDictIdBitmap(groupByResultHolder, groupKey, 
dictionary).add(dictIds[i]);
-          }
-        }
-      }
-      return;
-    }
-
-    // For non-dictionary-encoded expression, store values into the value set
-    DataType valueType = blockValSet.getValueType();
-    DataType storedType = valueType.getStoredType();
-    if (blockValSet.isSingleValue()) {
-      switch (storedType) {
-        case INT:
-          int[] intValues = blockValSet.getIntValuesSV();
-          for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
intValues[i]);
-          }
-          break;
-        case LONG:
-          long[] longValues = blockValSet.getLongValuesSV();
-          for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
longValues[i]);
-          }
-          break;
-        case FLOAT:
-          float[] floatValues = blockValSet.getFloatValuesSV();
-          for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
floatValues[i]);
-          }
-          break;
-        case DOUBLE:
-          double[] doubleValues = blockValSet.getDoubleValuesSV();
-          for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
doubleValues[i]);
-          }
-          break;
-        case STRING:
-          String[] stringValues = blockValSet.getStringValuesSV();
-          for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], 
stringValues[i]);
-          }
-          break;
-        case BYTES:
-          byte[][] bytesValues = blockValSet.getBytesValuesSV();
-          for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], new 
ByteArray(bytesValues[i]));
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, true);
-      }
-    } else {
-      switch (storedType) {
-        case INT:
-          int[][] intValues = blockValSet.getIntValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (int groupKey : groupKeysArray[i]) {
-              IntOpenHashSet intSet = (IntOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.INT);
-              for (int value : intValues[i]) {
-                intSet.add(value);
-              }
-            }
-          }
-          break;
-        case LONG:
-          long[][] longValues = blockValSet.getLongValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (int groupKey : groupKeysArray[i]) {
-              LongOpenHashSet longSet = (LongOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.LONG);
-              for (long value : longValues[i]) {
-                longSet.add(value);
-              }
-            }
-          }
-          break;
-        case FLOAT:
-          float[][] floatValues = blockValSet.getFloatValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (int groupKey : groupKeysArray[i]) {
-              FloatOpenHashSet floatSet = (FloatOpenHashSet) 
getValueSet(groupByResultHolder, groupKey, DataType.FLOAT);
-              for (float value : floatValues[i]) {
-                floatSet.add(value);
-              }
-            }
-          }
-          break;
-        case DOUBLE:
-          double[][] doubleValues = blockValSet.getDoubleValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (int groupKey : groupKeysArray[i]) {
-              DoubleOpenHashSet doubleSet =
-                  (DoubleOpenHashSet) getValueSet(groupByResultHolder, 
groupKey, DataType.DOUBLE);
-              for (double value : doubleValues[i]) {
-                doubleSet.add(value);
-              }
-            }
-          }
-          break;
-        case STRING:
-          String[][] stringValues = blockValSet.getStringValuesMV();
-          for (int i = 0; i < length; i++) {
-            for (int groupKey : groupKeysArray[i]) {
-              ObjectOpenHashSet<String> stringSet =
-                  (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, 
groupKey, DataType.STRING);
-              //noinspection ManualArrayToCollectionCopy
-              for (String value : stringValues[i]) {
-                //noinspection UseBulkOperation
-                stringSet.add(value);
-              }
-            }
-          }
-          break;
-        default:
-          throw getIllegalDataTypeException(valueType, false);
-      }
-    }
-  }
-
-  @Override
-  public Object extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
-    Object result = aggregationResultHolder.getResult();
-    if (result == null) {
-      return EMPTY_PLACEHOLDER;
-    }
-
-    if (result instanceof DictIdsWrapper) {
-      // For dictionary-encoded expression, convert dictionary ids to values
-      DictIdsWrapper dictIdsWrapper = (DictIdsWrapper) result;
-      if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(_threshold)) {
-        return convertToHLL(dictIdsWrapper);
-      } else {
-        return convertToValueSet(dictIdsWrapper);
-      }
-    } else {
-      // For non-dictionary-encoded expression, directly return the value set
-      return result;
-    }
-  }
-
-  @Override
-  public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int 
groupKey) {
-    Object result = groupByResultHolder.getResult(groupKey);
-    if (result == null) {
-      return EMPTY_PLACEHOLDER;
-    }
-
-    if (result instanceof DictIdsWrapper) {
-      // For dictionary-encoded expression, convert dictionary ids to values
-      return convertToValueSet((DictIdsWrapper) result);
-    } else {
-      // For non-dictionary-encoded expression, directly return the value set
-      return (Set) result;
-    }
-  }
+  // extraction is handled by the base class
 
   @Override
   public Object merge(Object intermediateResult1, Object intermediateResult2) {
@@ -760,193 +339,12 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
   /**
    * Returns the dictionary id bitmap from the result holder or creates a new 
one if it does not exist.
    */
-  protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder 
aggregationResultHolder,
-      Dictionary dictionary) {
-    DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
-    if (dictIdsWrapper == null) {
-      dictIdsWrapper = new DictIdsWrapper(dictionary);
-      aggregationResultHolder.setValue(dictIdsWrapper);
-    }
-    return dictIdsWrapper._dictIdBitmap;
-  }
-
-  /**
-   * Returns the value set from the result holder or creates a new one if it 
does not exist.
-   */
-  protected static Set getValueSet(AggregationResultHolder 
aggregationResultHolder, DataType valueType) {
-    Set valueSet = aggregationResultHolder.getResult();
-    if (valueSet == null) {
-      valueSet = getValueSet(valueType);
-      aggregationResultHolder.setValue(valueSet);
-    }
-    return valueSet;
-  }
-
-  /**
-   * Helper method to create a value set for the given value type.
-   */
-  private static Set getValueSet(DataType valueType) {
-    switch (valueType) {
-      case INT:
-        return new IntOpenHashSet();
-      case LONG:
-        return new LongOpenHashSet();
-      case FLOAT:
-        return new FloatOpenHashSet();
-      case DOUBLE:
-        return new DoubleOpenHashSet();
-      case STRING:
-      case BYTES:
-        return new ObjectOpenHashSet();
-      default:
-        throw new IllegalStateException("Illegal data type for DISTINCT_COUNT 
aggregation function: " + valueType);
-    }
-  }
-
-  /**
-   * Returns the dictionary id bitmap for the given group key or creates a new 
one if it does not exist.
-   */
-  protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder 
groupByResultHolder, int groupKey,
-      Dictionary dictionary) {
-    DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
-    if (dictIdsWrapper == null) {
-      dictIdsWrapper = new DictIdsWrapper(dictionary);
-      groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
-    }
-    return dictIdsWrapper._dictIdBitmap;
-  }
-
-  /**
-   * Returns the value set for the given group key or creates a new one if it 
does not exist.
-   */
-  protected static Set getValueSet(GroupByResultHolder groupByResultHolder, 
int groupKey, DataType valueType) {
-    Set valueSet = groupByResultHolder.getResult(groupKey);
-    if (valueSet == null) {
-      valueSet = getValueSet(valueType);
-      groupByResultHolder.setValueForKey(groupKey, valueSet);
-    }
-    return valueSet;
-  }
-
-  /**
-   * Helper method to set dictionary id for the given group keys into the 
result holder.
-   */
-  private static void setDictIdForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys,
-      Dictionary dictionary, int dictId) {
-    for (int groupKey : groupKeys) {
-      getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
-    }
-  }
-
-  /**
-   * Helper method to set INT value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, int value) {
-    for (int groupKey : groupKeys) {
-      ((IntOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.INT)).add(value);
-    }
-  }
-
-  /**
-   * Helper method to set LONG value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, long value) {
-    for (int groupKey : groupKeys) {
-      ((LongOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.LONG)).add(value);
-    }
-  }
-
-  /**
-   * Helper method to set FLOAT value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, float value) {
-    for (int groupKey : groupKeys) {
-      ((FloatOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.FLOAT)).add(value);
-    }
-  }
-
-  /**
-   * Helper method to set DOUBLE value for the given group keys into the 
result holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, double value) {
-    for (int groupKey : groupKeys) {
-      ((DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey, 
DataType.DOUBLE)).add(value);
-    }
-  }
-
-  /**
-   * Helper method to set STRING value for the given group keys into the 
result holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, String value) {
-    for (int groupKey : groupKeys) {
-      ((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKey, 
DataType.STRING)).add(value);
-    }
-  }
-
-  /**
-   * Helper method to set BYTES value for the given group keys into the result 
holder.
-   */
-  private static void setValueForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys, ByteArray value) {
-    for (int groupKey : groupKeys) {
-      ((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder, 
groupKey, DataType.BYTES)).add(value);
-    }
-  }
-
-  /**
-   * Helper method to read dictionary and convert dictionary ids to a value 
set for dictionary-encoded expression.
-   */
-  private static Set convertToValueSet(DictIdsWrapper dictIdsWrapper) {
-    Dictionary dictionary = dictIdsWrapper._dictionary;
-    RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
-    int numValues = dictIdBitmap.getCardinality();
-    PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
-    DataType storedType = dictionary.getValueType();
-    switch (storedType) {
-      case INT:
-        IntOpenHashSet intSet = new IntOpenHashSet(numValues);
-        while (iterator.hasNext()) {
-          intSet.add(dictionary.getIntValue(iterator.next()));
-        }
-        return intSet;
-      case LONG:
-        LongOpenHashSet longSet = new LongOpenHashSet(numValues);
-        while (iterator.hasNext()) {
-          longSet.add(dictionary.getLongValue(iterator.next()));
-        }
-        return longSet;
-      case FLOAT:
-        FloatOpenHashSet floatSet = new FloatOpenHashSet(numValues);
-        while (iterator.hasNext()) {
-          floatSet.add(dictionary.getFloatValue(iterator.next()));
-        }
-        return floatSet;
-      case DOUBLE:
-        DoubleOpenHashSet doubleSet = new DoubleOpenHashSet(numValues);
-        while (iterator.hasNext()) {
-          doubleSet.add(dictionary.getDoubleValue(iterator.next()));
-        }
-        return doubleSet;
-      case STRING:
-        ObjectOpenHashSet<String> stringSet = new 
ObjectOpenHashSet<>(numValues);
-        while (iterator.hasNext()) {
-          stringSet.add(dictionary.getStringValue(iterator.next()));
-        }
-        return stringSet;
-      case BYTES:
-        ObjectOpenHashSet<ByteArray> bytesSet = new 
ObjectOpenHashSet<>(numValues);
-        while (iterator.hasNext()) {
-          bytesSet.add(new 
ByteArray(dictionary.getBytesValue(iterator.next())));
-        }
-        return bytesSet;
-      default:
-        throw new IllegalStateException("Illegal data type for DISTINCT_COUNT 
aggregation function: " + storedType);
-    }
-  }
+  // helper methods for dict/value set conversions are provided by the base 
class
 
   /**
    * Helper method to read dictionary and convert dictionary ids to a 
HyperLogLog for dictionary-encoded expression.
    */
-  private HyperLogLog convertToHLL(DictIdsWrapper dictIdsWrapper) {
+  private HyperLogLog 
convertToHLL(BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper 
dictIdsWrapper) {
     HyperLogLog hyperLogLog = new HyperLogLog(_log2m);
     Dictionary dictionary = dictIdsWrapper._dictionary;
     RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
@@ -957,20 +355,21 @@ public class DistinctCountSmartHLLAggregationFunction 
extends BaseSingleInputAgg
     return hyperLogLog;
   }
 
-  private static IllegalStateException getIllegalDataTypeException(DataType 
dataType, boolean singleValue) {
-    return new IllegalStateException(
-        "Illegal data type for DISTINCT_COUNT_SMART_HLL aggregation function: 
" + dataType + (singleValue ? ""
-            : "_MV"));
+  @Override
+  protected Object convertSetToSketch(Set valueSet, DataType storedType) {
+    return convertSetToHLL(valueSet, storedType);
   }
 
-  private static final class DictIdsWrapper {
-    final Dictionary _dictionary;
-    final RoaringBitmap _dictIdBitmap;
+  @Override
+  protected Object 
convertToSketch(BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper 
dictIdsWrapper) {
+    return convertToHLL(dictIdsWrapper);
+  }
 
-    private DictIdsWrapper(Dictionary dictionary) {
-      _dictionary = dictionary;
-      _dictIdBitmap = new RoaringBitmap();
-    }
+  @Override
+  protected IllegalStateException getIllegalDataTypeException(DataType 
dataType, boolean singleValue) {
+    return new IllegalStateException(
+        "Illegal data type for DISTINCT_COUNT_SMART_HLL aggregation function: 
" + dataType + (singleValue ? ""
+            : "_MV"));
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartULLAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartULLAggregationFunction.java
new file mode 100644
index 00000000000..de639f4b464
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartULLAggregationFunction.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.dynatrace.hash4j.distinctcount.UltraLogLog;
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountSmartULLAggregationFunction} calculates the number of distinct values for a given expression
+ * (both single-valued and multi-valued expressions are supported).
+ *
+ * For aggregation-only queries, the distinct values are initially stored in a Set. Once the number of distinct values
+ * exceeds a threshold, the Set is converted into an UltraLogLog and an approximate result is returned.
+ *
+ * The function takes an optional second argument with parameters:
+ * - threshold: the number of distinct values that triggers the conversion, 100_000 by default. A non-positive
+ *              value means never convert.
+ * - p: the precision parameter p of the UltraLogLog, defaulting to CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P.
+ * Example of the second argument: 'threshold=10;p=16'
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountSmartULLAggregationFunction extends 
BaseDistinctCountSmartSketchAggregationFunction {
+  // The empty-result placeholder (EMPTY_PLACEHOLDER) is inherited from the base class rather than declared here.
+
+  // Distinct-value count above which an exact value set is converted into an UltraLogLog (see merge).
+  private final int _threshold;
+  // UltraLogLog precision parameter p used when building sketches from value sets.
+  private final int _p;
+
+  /**
+   * Creates the function from its argument list: the expression whose distinct values are counted, optionally
+   * followed by a literal parameter string such as {@code 'threshold=10;p=16'}.
+   *
+   * @param arguments the expression, plus an optional parameter literal
+   * @throws IllegalArgumentException if more than two arguments are supplied
+   */
+  public DistinctCountSmartULLAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments <= 2, "DistinctCountSmartULL expects 1 or 2 arguments, got: %s",
+        numArguments);
+    if (numArguments == 1) {
+      // No parameter string: use the default threshold and the default ULL precision from CommonConstants
+      _threshold = Parameters.DEFAULT_THRESHOLD;
+      _p = org.apache.pinot.spi.utils.CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P;
+    } else {
+      Parameters parameters = new Parameters(arguments.get(1).getLiteral().getStringValue());
+      _threshold = parameters._threshold;
+      _p = parameters._p;
+    }
+  }
+
+  /**
+   * Returns the number of distinct values above which the exact value set is converted into an UltraLogLog.
+   */
+  public int getThreshold() {
+    return this._threshold;
+  }
+
+  /**
+   * Returns the UltraLogLog precision parameter {@code p} configured for this function.
+   */
+  public int getP() {
+    return this._p;
+  }
+
+  /**
+   * Identifies this function as {@link AggregationFunctionType#DISTINCTCOUNTSMARTULL}.
+   */
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTSMARTULL;
+  }
+
+  // Result holder creators are provided by the base class
+
+  /**
+   * Aggregates the first {@code length} rows of a block for an aggregation-only query.
+   *
+   * <p>Dictionary-encoded expressions only collect dictionary ids into a bitmap. Raw values are added to the value
+   * set or, once the holder contains an UltraLogLog instead of a set, hashed directly into that sketch.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary == null) {
+      // Raw values: the holder switches from a Set to an UltraLogLog once the set has been converted
+      // (presumably by the base class's aggregateIntoSet)
+      boolean alreadyConverted = aggregationResultHolder.getResult() instanceof UltraLogLog;
+      if (alreadyConverted) {
+        aggregateIntoULL(length, aggregationResultHolder, blockValSet);
+      } else {
+        aggregateIntoSet(length, aggregationResultHolder, blockValSet);
+      }
+      return;
+    }
+
+    // Dictionary-encoded values: only the dictionary ids are collected here
+    RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, dictionary);
+    if (!blockValSet.isSingleValue()) {
+      int[][] dictIdsMV = blockValSet.getDictionaryIdsMV();
+      for (int row = 0; row < length; row++) {
+        dictIdBitmap.add(dictIdsMV[row]);
+      }
+    } else {
+      dictIdBitmap.addN(blockValSet.getDictionaryIdsSV(), 0, length);
+    }
+  }
+
+  private void aggregateIntoULL(int length, AggregationResultHolder 
aggregationResultHolder, BlockValSet blockValSet) {
+    DataType valueType = blockValSet.getValueType();
+    DataType storedType = valueType.getStoredType();
+    UltraLogLog ull = aggregationResultHolder.getResult();
+    if (blockValSet.isSingleValue()) {
+      switch (storedType) {
+        case INT: {
+          int[] intValues = blockValSet.getIntValuesSV();
+          for (int i = 0; i < length; i++) {
+            UltraLogLogUtils.hashObject(intValues[i]).ifPresent(ull::add);
+          }
+          break;
+        }
+        case LONG: {
+          long[] longValues = blockValSet.getLongValuesSV();
+          for (int i = 0; i < length; i++) {
+            UltraLogLogUtils.hashObject(longValues[i]).ifPresent(ull::add);
+          }
+          break;
+        }
+        case FLOAT: {
+          float[] floatValues = blockValSet.getFloatValuesSV();
+          for (int i = 0; i < length; i++) {
+            UltraLogLogUtils.hashObject(floatValues[i]).ifPresent(ull::add);
+          }
+          break;
+        }
+        case DOUBLE: {
+          double[] doubleValues = blockValSet.getDoubleValuesSV();
+          for (int i = 0; i < length; i++) {
+            UltraLogLogUtils.hashObject(doubleValues[i]).ifPresent(ull::add);
+          }
+          break;
+        }
+        case STRING: {
+          String[] stringValues = blockValSet.getStringValuesSV();
+          for (int i = 0; i < length; i++) {
+            UltraLogLogUtils.hashObject(stringValues[i]).ifPresent(ull::add);
+          }
+          break;
+        }
+        case BYTES: {
+          byte[][] bytesValues = blockValSet.getBytesValuesSV();
+          for (int i = 0; i < length; i++) {
+            UltraLogLogUtils.hashObject(bytesValues[i]).ifPresent(ull::add);
+          }
+          break;
+        }
+        default:
+          throw getIllegalDataTypeException(valueType, true);
+      }
+    } else {
+      switch (storedType) {
+        case INT: {
+          int[][] intValues = blockValSet.getIntValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (int value : intValues[i]) {
+              UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+            }
+          }
+          break;
+        }
+        case LONG: {
+          long[][] longValues = blockValSet.getLongValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (long value : longValues[i]) {
+              UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+            }
+          }
+          break;
+        }
+        case FLOAT: {
+          float[][] floatValues = blockValSet.getFloatValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (float value : floatValues[i]) {
+              UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+            }
+          }
+          break;
+        }
+        case DOUBLE: {
+          double[][] doubleValues = blockValSet.getDoubleValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (double value : doubleValues[i]) {
+              UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+            }
+          }
+          break;
+        }
+        case STRING: {
+          String[][] stringValues = blockValSet.getStringValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (String value : stringValues[i]) {
+              UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+            }
+          }
+          break;
+        }
+        case BYTES: {
+          byte[][][] bytesValues = blockValSet.getBytesValuesMV();
+          for (int i = 0; i < length; i++) {
+            for (byte[] value : bytesValues[i]) {
+              UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+            }
+          }
+          break;
+        }
+        default:
+          throw getIllegalDataTypeException(valueType, false);
+      }
+    }
+  }
+
+  // aggregateIntoSet is handled by the base class
+
+  /**
+   * Converts an exact value set into an UltraLogLog, choosing the BYTES-specific path when the stored type is BYTES.
+   */
+  protected UltraLogLog convertSetToULL(Set valueSet, DataType storedType) {
+    return storedType == DataType.BYTES ? convertByteArraySetToULL((ObjectSet<ByteArray>) valueSet)
+        : convertNonByteArraySetToULL(valueSet);
+  }
+
+  /**
+   * Builds an UltraLogLog with precision {@code _p} from a set of {@link ByteArray} values by hashing the wrapped
+   * byte arrays.
+   */
+  protected UltraLogLog convertByteArraySetToULL(ObjectSet<ByteArray> valueSet) {
+    UltraLogLog sketch = UltraLogLog.create(_p);
+    valueSet.forEach(value -> UltraLogLogUtils.hashObject(value.getBytes()).ifPresent(sketch::add));
+    return sketch;
+  }
+
+  /**
+   * Builds an UltraLogLog sketch (precision {@code _p}) by hashing every element of the given value set as-is.
+   */
+  protected UltraLogLog convertNonByteArraySetToULL(Set valueSet) {
+    UltraLogLog sketch = UltraLogLog.create(_p);
+    for (Object element : valueSet) {
+      // hashObject yields an optional hash; values it cannot hash are simply not added
+      UltraLogLogUtils.hashObject(element).ifPresent(sketch::add);
+    }
+    return sketch;
+  }
+
+  // group-by SV handled by the base class
+
+  // group-by MV handled by the base class
+
+  // extraction is handled by the base class
+
+  /**
+   * Merges two intermediate results, each either a value {@link Set} or an {@link UltraLogLog}.
+   *
+   * <p>If either side is already a sketch, the other side is folded into it. Otherwise the two sets are unioned
+   * (in place, into the first set) and the union is promoted to an UltraLogLog once its size exceeds the
+   * configured threshold.
+   */
+  @Override
+  public Object merge(Object intermediateResult1, Object intermediateResult2) {
+    if (intermediateResult1 instanceof UltraLogLog) {
+      return mergeIntoULL((UltraLogLog) intermediateResult1, intermediateResult2);
+    }
+    if (intermediateResult2 instanceof UltraLogLog) {
+      return mergeIntoULL((UltraLogLog) intermediateResult2, intermediateResult1);
+    }
+
+    Set left = (Set) intermediateResult1;
+    Set right = (Set) intermediateResult2;
+    if (left.isEmpty() || right.isEmpty()) {
+      return left.isEmpty() ? right : left;
+    }
+
+    left.addAll(right);
+    if (left.size() <= _threshold) {
+      return left;
+    }
+
+    // The union is non-empty here, so peeking at the first element is safe; BYTES values are kept as ByteArray.
+    boolean holdsByteArrays = left instanceof ObjectSet && left.iterator().next() instanceof ByteArray;
+    if (holdsByteArrays) {
+      return convertByteArraySetToULL((ObjectSet<ByteArray>) left);
+    }
+    return convertNonByteArraySetToULL(left);
+  }
+
+  /**
+   * Folds {@code intermediateResult} (either an {@link UltraLogLog} or a value {@link Set}) into {@code ull} and
+   * returns the combined sketch.
+   *
+   * <p>When both sides are sketches with different precisions, the result keeps the smaller precision. Per the
+   * hash4j contract, a sketch can only absorb sketches of equal or higher precision. Either input sketch may be
+   * modified in place.
+   */
+  private UltraLogLog mergeIntoULL(UltraLogLog ull, Object intermediateResult) {
+    if (intermediateResult instanceof UltraLogLog) {
+      UltraLogLog other = (UltraLogLog) intermediateResult;
+      // UltraLogLog.add(UltraLogLog) rejects an argument with a smaller p, so merge into the lower-precision sketch.
+      // Building a new sketch at the larger p and adding the smaller-p sketch into it would throw.
+      if (other.getP() < ull.getP()) {
+        other.add(ull);
+        return other;
+      }
+      ull.add(other);
+      return ull;
+    }
+
+    Set valueSet = (Set) intermediateResult;
+    if (!valueSet.isEmpty()) {
+      // BYTES values are stored as ByteArray wrappers; hash the underlying bytes.
+      if (valueSet instanceof ObjectSet && valueSet.iterator().next() instanceof ByteArray) {
+        for (Object value : valueSet) {
+          UltraLogLogUtils.hashObject(((ByteArray) value).getBytes()).ifPresent(ull::add);
+        }
+      } else {
+        for (Object value : valueSet) {
+          UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+        }
+      }
+    }
+    return ull;
+  }
+
+  /**
+   * Intermediate results are either value sets or UltraLogLog sketches, so they are exchanged as opaque
+   * OBJECT values.
+   */
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  /**
+   * Serializes an intermediate result. An {@link UltraLogLog} is written with the ULL serde and tagged with its
+   * object type. Anything else is treated as a value {@link Set} and serialized by the shared distinct helper.
+   */
+  @Override
+  public SerializedIntermediateResult serializeIntermediateResult(Object o) {
+    if (!(o instanceof UltraLogLog)) {
+      return BaseDistinctAggregateAggregationFunction.serializeSet((Set) o);
+    }
+    int objectType = ObjectSerDeUtils.ObjectType.UltraLogLog.getValue();
+    return new SerializedIntermediateResult(objectType,
+        ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.serialize((UltraLogLog) o));
+  }
+
+  /**
+   * Deserializes an intermediate result with the generic {@link ObjectSerDeUtils#deserialize} entry point.
+   * NOTE(review): presumably this dispatches on the object type written by serializeIntermediateResult
+   * (UltraLogLog or a set type) — confirm.
+   */
+  @Override
+  public Object deserializeIntermediateResult(CustomObject customObject) {
+    return ObjectSerDeUtils.deserialize(customObject);
+  }
+
+  /**
+   * The final result is the (exact or estimated) distinct count, reported as an INT.
+   */
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.INT;
+  }
+
+  /**
+   * Extracts the distinct count from an intermediate result. For a value set this is the exact set size; for an
+   * UltraLogLog it is the rounded cardinality estimate.
+   */
+  @Override
+  public Integer extractFinalResult(Object intermediateResult) {
+    if (!(intermediateResult instanceof UltraLogLog)) {
+      return ((Set) intermediateResult).size();
+    }
+    double estimate = ((UltraLogLog) intermediateResult).getDistinctCountEstimate();
+    return (int) Math.round(estimate);
+  }
+
+  /**
+   * Combines two final distinct counts by summing them.
+   * NOTE(review): a sum is only correct when the two inputs cover disjoint sets of values (e.g. data partitioned
+   * on the aggregated column) — confirm callers only use it in that case.
+   */
+  @Override
+  public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) {
+    return finalResult1 + finalResult2;
+  }
+
+  // The following helpers are inherited from the base class (BaseDistinctCountSmartSketchAggregationFunction) and
+  // are intentionally not redeclared here:
+  // - getDictIdBitmap: returns the dictionary id bitmap from the result holder, or for a group key, creating it if
+  //   it does not exist.
+  // - Value set helpers: return the value set from the result holder, or for a group key, creating it if it does
+  //   not exist.
+  // - setDictIdForGroupKeys: sets the dictionary id for the given group keys in the result holder.
+  // - Typed setValueForGroupKeys helpers: set INT, LONG, FLOAT, DOUBLE, STRING and BYTES values for the given
+  //   group keys in the result holder.
+  // - Value set conversion: reads the dictionary and converts dictionary ids to a value set for a
+  //   dictionary-encoded expression.
+
+  /**
+   * Builds an UltraLogLog sketch (precision {@code _p}) for a dictionary-encoded expression. It iterates over the
+   * dictionary ids in the wrapper's bitmap and hashes the dictionary value for each id.
+   */
+  private UltraLogLog convertToULL(
+      BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper dictIdsWrapper) {
+    Dictionary dictionary = dictIdsWrapper._dictionary;
+    UltraLogLog sketch = UltraLogLog.create(_p);
+    for (PeekableIntIterator dictIds = dictIdsWrapper._dictIdBitmap.getIntIterator(); dictIds.hasNext(); ) {
+      Object value = dictionary.get(dictIds.next());
+      UltraLogLogUtils.hashObject(value).ifPresent(sketch::add);
+    }
+    return sketch;
+  }
+
+  /**
+   * Builds the exception raised for an unsupported input type. Multi-value types get an {@code _MV} suffix.
+   */
+  @Override
+  protected IllegalStateException getIllegalDataTypeException(DataType dataType, boolean singleValue) {
+    String suffix = singleValue ? "" : "_MV";
+    return new IllegalStateException(
+        "Illegal data type for DISTINCT_COUNT_SMART_ULL aggregation function: " + dataType + suffix);
+  }
+
+  // DictIdsWrapper is provided by the base class
+
+  // threshold accessor for base class is provided by getThreshold()
+
+  /**
+   * Base-class hook: promotes a value set to this function's sketch type, an UltraLogLog.
+   */
+  @Override
+  protected Object convertSetToSketch(Set valueSet, DataType storedType) {
+    return convertSetToULL(valueSet, storedType);
+  }
+
+  /**
+   * Base-class hook: builds an UltraLogLog directly from dictionary ids of a dictionary-encoded expression.
+   */
+  @Override
+  protected Object convertToSketch(
+      BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper dictIdsWrapper) {
+    return convertToULL(dictIdsWrapper);
+  }
+
+  /**
+   * Parsed form of the optional DISTINCT_COUNT_SMART_ULL parameter string. The string is a {@code ;}-separated list
+   * of {@code key=value} pairs; keys are case-insensitive and all whitespace is ignored.
+   * <ul>
+   *   <li>{@code threshold}: set size above which values are promoted to an UltraLogLog (default 100,000). A
+   *   non-positive value is replaced by {@link Integer#MAX_VALUE}, which effectively disables promotion.</li>
+   *   <li>{@code p}: UltraLogLog precision (default {@code CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P}).</li>
+   * </ul>
+   */
+  private static class Parameters {
+    static final char PARAMETER_DELIMITER = ';';
+    static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+
+    static final String THRESHOLD_KEY = "THRESHOLD";
+    static final int DEFAULT_THRESHOLD = 100_000;
+
+    static final String P_KEY = "P";
+
+    int _threshold = DEFAULT_THRESHOLD;
+    int _p = org.apache.pinot.spi.utils.CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P;
+
+    Parameters(String parametersString) {
+      String compact = StringUtils.deleteWhitespace(parametersString);
+      for (String keyValuePair : StringUtils.split(compact, PARAMETER_DELIMITER)) {
+        String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR);
+        Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair);
+        apply(keyAndValue[0], keyAndValue[1]);
+      }
+    }
+
+    /** Applies one {@code key=value} pair; throws IllegalArgumentException for an unknown key. */
+    private void apply(String key, String value) {
+      switch (key.toUpperCase()) {
+        case THRESHOLD_KEY: {
+          int threshold = Integer.parseInt(value);
+          _threshold = threshold > 0 ? threshold : Integer.MAX_VALUE;
+          break;
+        }
+        case P_KEY:
+          _p = Integer.parseInt(value);
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid parameter key: " + key);
+      }
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
index e4045a9a02f..1a036ae5342 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
@@ -47,8 +47,8 @@ public class DistinctCountULLAggregationFunction extends 
BaseSingleInputAggregat
   public DistinctCountULLAggregationFunction(List<ExpressionContext> 
arguments) {
     super(arguments.get(0));
     int numExpressions = arguments.size();
-    // This function expects 1 or 2 or 3 arguments.
-    Preconditions.checkArgument(numExpressions <= 2, "DistinctCountHLLPlus 
expects 1 or 2 arguments, got: %s",
+    // This function expects 1 or 2 arguments.
+    Preconditions.checkArgument(numExpressions <= 2, "DistinctCountULL expects 
1 or 2 arguments, got: %s",
         numExpressions);
     if (arguments.size() == 2) {
       _p = arguments.get(1).getLiteral().getIntValue();
@@ -128,19 +128,19 @@ public class DistinctCountULLAggregationFunction extends 
BaseSingleInputAggregat
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-            UltraLogLogUtils.hashObject(floatValues[i]).ifPresent(ull::add);
+          UltraLogLogUtils.hashObject(floatValues[i]).ifPresent(ull::add);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-            UltraLogLogUtils.hashObject(doubleValues[i]).ifPresent(ull::add);
+          UltraLogLogUtils.hashObject(doubleValues[i]).ifPresent(ull::add);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-            UltraLogLogUtils.hashObject(stringValues[i]).ifPresent(ull::add);
+          UltraLogLogUtils.hashObject(stringValues[i]).ifPresent(ull::add);
         }
         break;
       default:
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index e3c6b6240fc..be1666a9b6a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -521,6 +521,50 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     assertEquals(function.getLog2m(), 8);
   }
 
+  /**
+   * Verifies DISTINCTCOUNTSMARTULL on the non-scan (dictionary-based) path. With {@code threshold=10} every column
+   * is promoted to an UltraLogLog whose estimate is within 5% of the true distinct count, and the {@code p}
+   * parameter controls the precision of the produced sketch.
+   */
+  @Test
+  public void testSmartULL() {
+    String query = "SELECT DISTINCTCOUNTSMARTULL(intColumn, 'threshold=10'), "
+        + "DISTINCTCOUNTSMARTULL(longColumn, 'threshold=10'), DISTINCTCOUNTSMARTULL(floatColumn, 'threshold=10'), "
+        + "DISTINCTCOUNTSMARTULL(doubleColumn, 'threshold=10'), DISTINCTCOUNTSMARTULL(stringColumn, 'threshold=10') "
+        + "FROM testTable";
+
+    Object[] interSegmentsExpectedResults = new Object[5];
+    for (Object operator : Arrays.asList(getOperator(query), getOperatorWithFilter(query))) {
+      assertTrue(operator instanceof NonScanBasedAggregationOperator);
+      AggregationResultsBlock resultsBlock = ((NonScanBasedAggregationOperator) operator).nextBlock();
+      QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) operator).getExecutionStatistics(), NUM_RECORDS,
+          0, 0, NUM_RECORDS);
+      List<Object> aggregationResult = resultsBlock.getResults();
+      assertNotNull(aggregationResult);
+      assertEquals(aggregationResult.size(), 5);
+      for (int i = 0; i < 5; i++) {
+        // After threshold promotion, result should be ULL object
+        assertTrue(aggregationResult.get(i) instanceof com.dynatrace.hash4j.distinctcount.UltraLogLog);
+        com.dynatrace.hash4j.distinctcount.UltraLogLog ull =
+            (com.dynatrace.hash4j.distinctcount.UltraLogLog) aggregationResult.get(i);
+        int actual = (int) Math.round(ull.getDistinctCountEstimate());
+        int expected = _values.size();
+        // ULL with default p provides high accuracy; allow 5% error similar to HLL(log2m=12)
+        assertEquals(actual, expected, expected * 0.05);
+        interSegmentsExpectedResults[i] = actual;
+      }
+    }
+
+    for (BrokerResponseNative brokerResponse : Arrays.asList(getBrokerResponse(query),
+        getBrokerResponseWithFilter(query))) {
+      QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS, 0, 0, 4 * NUM_RECORDS,
+          interSegmentsExpectedResults);
+    }
+
+    // Change p via parameters, and verify the promoted sketch really uses the requested precision
+    query = "SELECT DISTINCTCOUNTSMARTULL(intColumn, 'threshold=10;p=16') FROM testTable";
+    NonScanBasedAggregationOperator nonScanOperator = getOperator(query);
+    List<Object> aggregationResult = nonScanOperator.nextBlock().getResults();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+    assertTrue(aggregationResult.get(0) instanceof com.dynatrace.hash4j.distinctcount.UltraLogLog);
+    com.dynatrace.hash4j.distinctcount.UltraLogLog ullWithP16 =
+        (com.dynatrace.hash4j.distinctcount.UltraLogLog) aggregationResult.get(0);
+    assertEquals(ullWithP16.getP(), 16);
+    int expected = _values.size();
+    assertEquals((int) Math.round(ullWithP16.getDistinctCountEstimate()), expected, expected * 0.05);
+  }
+
   @AfterClass
   public void tearDown()
       throws IOException {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index f74807f9f29..111c57c7b16 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -68,7 +68,10 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.function.scalar.StringFunctions.*;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestSet {
@@ -259,11 +262,12 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String[] numericResultFunctions = new String[]{
         "distinctCount", "distinctCountBitmap", "distinctCountHLL", 
"segmentPartitionedDistinctCount",
-        "distinctCountSmartHLL", "distinctCountThetaSketch", "distinctSum", 
"distinctAvg"
+        "distinctCountSmartHLL", "distinctCountULL", "distinctCountSmartULL", 
"distinctCountThetaSketch",
+        "distinctSum", "distinctAvg"
     };
 
     double[] expectedNumericResults = new double[]{
-        364, 364, 355, 364, 364, 364, 5915969, 16252.662087912087
+        364, 364, 355, 364, 364, 364, 364, 364, 5915969, 16252.662087912087
     };
     Assert.assertEquals(numericResultFunctions.length, 
expectedNumericResults.length);
 
@@ -377,8 +381,7 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   @Test
   public void testUnsupportedUdfOnIntermediateStage()
       throws Exception {
-    String sqlQuery = ""
-        + "SET timeoutMs=10000;\n" // In older versions we timeout in this 
case, but we should fail fast now
+    String sqlQuery = "SET timeoutMs=10000;\n" // In older versions we timeout 
in this case, but we should fail fast now
         + "WITH fakeTable AS (\n" // this table is used to make sure the call 
is made on an intermediate stage
         + "  SELECT \n"
         + "    t1.DaysSinceEpoch + t2.DaysSinceEpoch as DaysSinceEpoch"
@@ -1555,7 +1558,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testCaseInsensitiveNames() throws Exception {
+  public void testCaseInsensitiveNames()
+      throws Exception {
     String query = "select ACTualELAPsedTIMe from mYtABLE where 
actUALelAPSedTIMe > 0 limit 1";
     JsonNode jsonNode = postQuery(query);
     long result = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -1564,7 +1568,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testCaseInsensitiveNamesAgainstController() throws Exception {
+  public void testCaseInsensitiveNamesAgainstController()
+      throws Exception {
     String query = "select ACTualELAPsedTIMe from mYtABLE where 
actUALelAPSedTIMe > 0 limit 1";
     JsonNode jsonNode = postQueryToController(query);
     long result = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -1573,7 +1578,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testQueryCompileBrokerTimeout() throws Exception {
+  public void testQueryCompileBrokerTimeout()
+      throws Exception {
     // The sleep function is called with a literal value so it should be 
evaluated during the query compile phase
     String query = "SET timeoutMs=100; SELECT sleep(1000) FROM mytable";
 
@@ -1598,7 +1604,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testNumServersQueried() throws Exception {
+  public void testNumServersQueried()
+      throws Exception {
     String query = "select * from mytable limit 10";
     JsonNode jsonNode = postQuery(query);
     JsonNode numServersQueried = jsonNode.get("numServersQueried");
@@ -1608,7 +1615,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testLookupJoin() throws Exception {
+  public void testLookupJoin()
+      throws Exception {
 
     Schema lookupTableSchema = createSchema(DIM_TABLE_SCHEMA_PATH);
     addSchema(lookupTableSchema);
@@ -1645,7 +1653,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     dropOfflineTable(tableConfig.getTableName());
   }
 
-  public void testSearchLiteralFilter() throws Exception {
+  public void testSearchLiteralFilter()
+      throws Exception {
     String sqlQuery =
         "WITH CTE_B AS (SELECT 1692057600000 AS __ts FROM mytable GROUP BY 
__ts) SELECT 1692057600000 AS __ts FROM "
             + "CTE_B WHERE __ts >= 1692057600000 AND __ts < 1693267200000 
GROUP BY __ts";
@@ -1670,7 +1679,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testPolymorphicScalarArrayFunctions() throws Exception {
+  public void testPolymorphicScalarArrayFunctions()
+      throws Exception {
     String query = "select ARRAY_LENGTH(ARRAY[1,2,3]);";
     JsonNode jsonNode = postQuery(query);
     assertNoError(jsonNode);
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index ede51582739..4203f891069 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -95,6 +95,8 @@ public enum AggregationFunctionType {
       OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), i 
-> i == 1), SqlTypeName.OTHER),
   DISTINCTCOUNTRAWULL("distinctCountRawULL", ReturnTypes.VARCHAR,
       OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), i 
-> i == 1), SqlTypeName.OTHER),
+  DISTINCTCOUNTSMARTULL("distinctCountSmartULL", ReturnTypes.BIGINT,
+      OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER), 
i -> i == 1), SqlTypeName.OTHER),
   DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", ReturnTypes.BIGINT, 
OperandTypes.ONE_OR_MORE, SqlTypeName.OTHER),
   DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch", 
ReturnTypes.VARCHAR, OperandTypes.ONE_OR_MORE,
       SqlTypeName.OTHER),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to