This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a9e319964f Theta Sketch Aggregation Enhancements (#12042)
a9e319964f is described below

commit a9e319964f642ff86f2c4a6122058bb033a2475a
Author: David Cromberge <[email protected]>
AuthorDate: Tue Dec 5 23:22:30 2023 +0000

    Theta Sketch Aggregation Enhancements (#12042)
    
    * Theta Sketch Aggregation Enhancements
    
    Introduces additional parameters to the DistinctCountThetaSketch
    aggregation function that give the end-user more control over how
    sketches are merged.  The defaults are selected to ensure that the
    behaviour remains unchanged from the current implementation.
    
    Furthermore, a custom accumulator object is added to ensure that pairwise
    union operations are avoided as much as possible.  Instead, sketches are
    accumulated and merged once a configurable threshold is met.
    
    * Use correct naming convention for private variable
    
    * Fetch flaky test edge-case
    
    * Decrease default constant value
    
    This better aligns with the default nominal entries parameter
    that is used in the query aggregation function.
    
    * Attempt 2: Simplify implementation
    
    Removes intermediate array list to buffer/accumulate sketch
    elements.  Instead, inputs are fed directly to the underlying
    union.  This ensures that the memory usage of the merge is
    kept under control.
    
    * Rename sampling probability parameter
    
    * Minor code improvements
    
    * Revert "Attempt 2: Simplify implementation"
    
    This reverts commit 2ed38f395ae1e3071637f58da2386e867dbc80e3.
    
    * Add toString methods for supported sketches to enable debugging
    
    * Additional inline commentary on early stop optimization.
    
    * Refactor serializer for Theta to remove temp variables
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  43 ++++-
 .../core/function/scalar/SketchFunctions.java      |  10 +
 ...inctCountRawThetaSketchAggregationFunction.java |  19 +-
 ...istinctCountThetaSketchAggregationFunction.java | 202 ++++++++++++++-------
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  53 ++++++
 .../core/function/scalar/SketchFunctionsTest.java  |  22 +++
 .../DistinctCountThetaSketchQueriesTest.java       |  32 ++--
 .../local/customobject/ThetaSketchAccumulator.java | 141 ++++++++++++++
 .../segment/local/utils/CustomSerDeUtils.java      |   9 +-
 ...istinctCountThetaSketchValueAggregatorTest.java |  13 ++
 .../customobject/ThetaSketchAccumulatorTest.java   | 104 +++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +-
 12 files changed, 554 insertions(+), 98 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 903ef712db..14aa2d2f10 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -88,6 +88,7 @@ import 
org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.VarianceTuple;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -154,7 +155,8 @@ public class ObjectSerDeUtils {
     LongArrayList(43),
     FloatArrayList(44),
     StringArrayList(45),
-    UltraLogLog(46);
+    UltraLogLog(46),
+    ThetaSketchAccumulator(47);
 
     private final int _value;
 
@@ -273,6 +275,8 @@ public class ObjectSerDeUtils {
         return ObjectType.CompressedProbabilisticCounting;
       } else if (value instanceof UltraLogLog) {
         return ObjectType.UltraLogLog;
+      } else if (value instanceof ThetaSketchAccumulator) {
+        return ObjectType.ThetaSketchAccumulator;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -1125,9 +1129,12 @@ public class ObjectSerDeUtils {
 
     @Override
     public byte[] serialize(Sketch value) {
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance 
concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html 
for more details.
-      return value.compact(false, null).toByteArray();
+      // The serializer should respect existing ordering to enable "early stop"
+      // optimisations on unions.
+      if (!value.isCompact()) {
+        return value.compact(value.isOrdered(), null).toByteArray();
+      }
+      return value.toByteArray();
     }
 
     @Override
@@ -1580,6 +1587,33 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<ThetaSketchAccumulator> 
DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
+      new ObjectSerDe<ThetaSketchAccumulator>() {
+
+        @Override
+        public byte[] serialize(ThetaSketchAccumulator thetaSketchBuffer) {
+          Sketch sketch = thetaSketchBuffer.getResult();
+          return sketch.toByteArray();
+        }
+
+        @Override
+        public ThetaSketchAccumulator deserialize(byte[] bytes) {
+          return deserialize(ByteBuffer.wrap(bytes));
+        }
+
+        // Note: The accumulator is designed to serialize as a sketch and 
should
+        // not be deserialized in practice.
+        @Override
+        public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
+          ThetaSketchAccumulator thetaSketchAccumulator = new 
ThetaSketchAccumulator();
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          Sketch sketch = Sketch.wrap(Memory.wrap(bytes));
+          thetaSketchAccumulator.apply(sketch);
+          return thetaSketchAccumulator;
+        }
+      };
+
   // NOTE: DO NOT change the order, it has to be the same order as the 
ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
@@ -1630,6 +1664,7 @@ public class ObjectSerDeUtils {
       FLOAT_ARRAY_LIST_SER_DE,
       STRING_ARRAY_LIST_SER_DE,
       ULTRA_LOG_LOG_OBJECT_SER_DE,
+      DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 74e35e8bb7..90e313edb2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -261,6 +261,11 @@ public class SketchFunctions {
     return diff.getResult(false, null, false);
   }
 
+  @ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
+  public static String thetaSketchToString(Object sketchObject) {
+    return asThetaSketch(sketchObject).toString();
+  }
+
   private static Sketch thetaSketchUnionVar(Object... sketchObjects) {
     Union union = SET_OPERATION_BUILDER.buildUnion();
     for (Object sketchObj : sketchObjects) {
@@ -417,6 +422,11 @@ public class SketchFunctions {
     return cpcSketchUnionVar(o1, o2, o3, o4, o5);
   }
 
+  @ScalarFunction(names = {"cpcSketchToString", "cpc_sketch_to_string"})
+  public static String cpcSketchToString(Object sketchObject) {
+    return asCpcSketch(sketchObject).toString();
+  }
+
   /**
    * Create a CPC Sketch containing the input, with a configured nominal 
entries
    *
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
index cd75fa3807..00d6ec2906 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -47,11 +49,18 @@ public class DistinctCountRawThetaSketchAggregationFunction 
extends DistinctCoun
   }
 
   @Override
-  public String extractFinalResult(List<Sketch> sketches) {
-    Sketch sketch = evaluatePostAggregationExpression(sketches);
+  public String extractFinalResult(List<ThetaSketchAccumulator> accumulators) {
+    int numAccumulators = accumulators.size();
+    List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
 
-    // NOTE: Compact the sketch in unsorted, on-heap fashion for performance 
concern.
-    //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for 
more details.
-    return Base64.getEncoder().encodeToString(sketch.compact(false, 
null).toByteArray());
+    for (ThetaSketchAccumulator accumulator : accumulators) {
+      accumulator.setOrdered(_intermediateOrdering);
+      accumulator.setThreshold(_accumulatorThreshold);
+      accumulator.setSetOperationBuilder(_setOperationBuilder);
+      mergedSketches.add(accumulator.getResult());
+    }
+
+    Sketch sketch = evaluatePostAggregationExpression(mergedSketches);
+    return Base64.getEncoder().encodeToString(sketch.toByteArray());
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index f3a9adfda6..0fea53db34 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.common.ResizeFactor;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
@@ -48,6 +49,7 @@ import 
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -68,29 +70,36 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
  *     'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
  *   </li>
  * </ul>
- * Currently there is only 1 parameter for the function:
+ * Currently, there are 5 parameters to the function:
  * <ul>
  *   <li>
  *     nominalEntries: The nominal entries used to create the sketch. (Default 
4096)
+ *     resizeFactor: Controls the size multiple that affects how fast the 
internal cache grows (Default 2^3=8)
+ *     samplingProbability: Sets the upfront uniform sampling probability, p. 
(Default 1.0)
+ *     intermediateOrdering: Whether compacted sketches should be ordered. 
(Default false)
+ *     accumulatorThreshold: How many sketches should be kept in memory before 
merging. (Default 2)
  *   </li>
  * </ul>
  * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DistinctCountThetaSketchAggregationFunction
-    extends BaseSingleInputAggregationFunction<List<Sketch>, Comparable> {
+    extends BaseSingleInputAggregationFunction<List<ThetaSketchAccumulator>, 
Comparable> {
   private static final String SET_UNION = "setunion";
   private static final String SET_INTERSECT = "setintersect";
   private static final String SET_DIFF = "setdiff";
   private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
-  private static final Sketch EMPTY_SKETCH = new 
UpdateSketchBuilder().build().compact();
+  private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
+  private static final boolean DEFAULT_INTERMEDIATE_ORDERING = false;
 
   private final List<ExpressionContext> _inputExpressions;
   private final boolean _includeDefaultSketch;
   private final List<FilterEvaluator> _filterEvaluators;
   private final ExpressionContext _postAggregationExpression;
   private final UpdateSketchBuilder _updateSketchBuilder = new 
UpdateSketchBuilder();
-  private final SetOperationBuilder _setOperationBuilder = new 
SetOperationBuilder();
+  protected final SetOperationBuilder _setOperationBuilder = new 
SetOperationBuilder();
+  protected boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
+  protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
 
   public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> 
arguments) {
     super(arguments.get(0));
@@ -102,9 +111,22 @@ public class DistinctCountThetaSketchAggregationFunction
       Preconditions.checkArgument(paramsExpression.getType() == 
ExpressionContext.Type.LITERAL,
           "Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function 
must be literal (parameters)");
       Parameters parameters = new 
Parameters(paramsExpression.getLiteral().getStringValue());
+      // Allows the user to trade-off memory usage for merge CPU; higher 
values use more memory
+      _accumulatorThreshold = parameters.getAccumulatorThreshold();
+      // Ordering controls whether intermediate compact sketches are ordered 
in set operations
+      _intermediateOrdering = parameters.getIntermediateOrdering();
+      // Nominal entries controls sketch accuracy and size
       int nominalEntries = parameters.getNominalEntries();
       _updateSketchBuilder.setNominalEntries(nominalEntries);
       _setOperationBuilder.setNominalEntries(nominalEntries);
+      // Sampling probability sets the initial value of Theta, defaults to 1.0
+      float p = parameters.getSamplingProbability();
+      _setOperationBuilder.setP(p);
+      _updateSketchBuilder.setP(p);
+      // Resize factor controls the size multiple that affects how fast the 
internal cache grows
+      ResizeFactor rf = parameters.getResizeFactor();
+      _setOperationBuilder.setResizeFactor(rf);
+      _updateSketchBuilder.setResizeFactor(rf);
     }
 
     if (numArguments < 4) {
@@ -401,20 +423,20 @@ public class DistinctCountThetaSketchAggregationFunction
       }
     } else {
       // Serialized sketch
-      List<Union> unions = getUnions(aggregationResultHolder);
+      List<ThetaSketchAccumulator> thetaSketchAccumulators = 
getUnions(aggregationResultHolder);
       Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], 
length);
       if (_includeDefaultSketch) {
-        Union defaultUnion = unions.get(0);
+        ThetaSketchAccumulator defaultThetaAccumulator = 
thetaSketchAccumulators.get(0);
         for (Sketch sketch : sketches) {
-          defaultUnion.union(sketch);
+          defaultThetaAccumulator.apply(sketch);
         }
       }
       for (int i = 0; i < numFilters; i++) {
         FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
-        Union union = unions.get(i + 1);
+        ThetaSketchAccumulator thetaSketchAccumulator = 
thetaSketchAccumulators.get(i + 1);
         for (int j = 0; j < length; j++) {
           if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, 
j)) {
-            union.union(sketches[j]);
+            thetaSketchAccumulator.apply(sketches[j]);
           }
         }
       }
@@ -631,14 +653,14 @@ public class DistinctCountThetaSketchAggregationFunction
       // Serialized sketch
       Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], 
length);
       for (int i = 0; i < length; i++) {
-        List<Union> unions = getUnions(groupByResultHolder, groupKeyArray[i]);
+        List<ThetaSketchAccumulator> thetaSketchAccumulators = 
getUnions(groupByResultHolder, groupKeyArray[i]);
         Sketch sketch = sketches[i];
         if (_includeDefaultSketch) {
-          unions.get(0).union(sketch);
+          thetaSketchAccumulators.get(0).apply(sketch);
         }
         for (int j = 0; j < numFilters; j++) {
           if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, 
valueArrays, i)) {
-            unions.get(j + 1).union(sketch);
+            thetaSketchAccumulators.get(j + 1).apply(sketch);
           }
         }
       }
@@ -907,7 +929,7 @@ public class DistinctCountThetaSketchAggregationFunction
       if (_includeDefaultSketch) {
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            getUnions(groupByResultHolder, groupKey).get(0).union(sketches[i]);
+            getUnions(groupByResultHolder, groupKey).get(0).apply(sketches[i]);
           }
         }
       }
@@ -916,7 +938,7 @@ public class DistinctCountThetaSketchAggregationFunction
         for (int j = 0; j < length; j++) {
           if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, 
j)) {
             for (int groupKey : groupKeysArray[i]) {
-              getUnions(groupByResultHolder, groupKey).get(i + 
1).union(sketches[i]);
+              getUnions(groupByResultHolder, groupKey).get(i + 
1).apply(sketches[i]);
             }
           }
         }
@@ -925,57 +947,70 @@ public class DistinctCountThetaSketchAggregationFunction
   }
 
   @Override
-  public List<Sketch> extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+  public List<ThetaSketchAccumulator> 
extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
     List result = aggregationResultHolder.getResult();
     if (result == null) {
       int numSketches = _filterEvaluators.size() + 1;
-      List<Sketch> sketches = new ArrayList<>(numSketches);
+      List<ThetaSketchAccumulator> sketches = new ArrayList<>(numSketches);
       for (int i = 0; i < numSketches; i++) {
-        sketches.add(EMPTY_SKETCH);
+        sketches.add(new ThetaSketchAccumulator(_setOperationBuilder, 
_intermediateOrdering, _accumulatorThreshold));
       }
       return sketches;
     }
 
     if (result.get(0) instanceof Sketch) {
-      return result;
+      int numSketches = result.size();
+      ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new 
ArrayList<>(numSketches);
+      for (Object o : result) {
+        ThetaSketchAccumulator thetaSketchAccumulator =
+            new ThetaSketchAccumulator(_setOperationBuilder, 
_intermediateOrdering, _accumulatorThreshold);
+        thetaSketchAccumulator.apply((Sketch) o);
+        thetaSketchAccumulators.add(thetaSketchAccumulator);
+      }
+      return thetaSketchAccumulators;
     } else {
-      return convertToSketches(result);
+      return result;
     }
   }
 
   @Override
-  public List<Sketch> extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+  public List<ThetaSketchAccumulator> extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     List result = groupByResultHolder.getResult(groupKey);
+
     if (result.get(0) instanceof Sketch) {
-      return result;
-    } else {
-      return convertToSketches(result);
+      int numSketches = result.size();
+      ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new 
ArrayList<>(numSketches);
+      for (Object o : result) {
+        ThetaSketchAccumulator thetaSketchAccumulator =
+            new ThetaSketchAccumulator(_setOperationBuilder, 
_intermediateOrdering, _accumulatorThreshold);
+        thetaSketchAccumulator.apply((Sketch) o);
+        thetaSketchAccumulators.add(thetaSketchAccumulator);
+      }
+      return thetaSketchAccumulators;
     }
+
+    return result;
   }
 
   @Override
-  public List<Sketch> merge(List<Sketch> sketches1, List<Sketch> sketches2) {
-    int numSketches = sketches1.size();
-    List<Sketch> mergedSketches = new ArrayList<>(numSketches);
-    for (int i = 0; i < numSketches; i++) {
-      Sketch sketch1 = sketches1.get(i);
-      Sketch sketch2 = sketches2.get(i);
-      if (sketch1.isEmpty()) {
-        mergedSketches.add(sketch2);
+  public List<ThetaSketchAccumulator> merge(List<ThetaSketchAccumulator> acc1, 
List<ThetaSketchAccumulator> acc2) {
+    int numAccumulators = acc1.size();
+    List<ThetaSketchAccumulator> mergedAccumulators = new 
ArrayList<>(numAccumulators);
+    for (int i = 0; i < numAccumulators; i++) {
+      ThetaSketchAccumulator thetaSketchAccumulator1 = acc1.get(i);
+      ThetaSketchAccumulator thetaSketchAccumulator2 = acc2.get(i);
+      if (thetaSketchAccumulator1.isEmpty()) {
+        mergedAccumulators.add(thetaSketchAccumulator2);
         continue;
       }
-      if (sketch2.isEmpty()) {
-        mergedSketches.add(sketch1);
+      if (thetaSketchAccumulator2.isEmpty()) {
+        mergedAccumulators.add(thetaSketchAccumulator1);
         continue;
       }
-      Union union = _setOperationBuilder.buildUnion();
-      union.union(sketch1);
-      union.union(sketch2);
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance 
concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html 
for more details.
-      mergedSketches.add(union.getResult(false, null));
+      thetaSketchAccumulator1.merge(thetaSketchAccumulator2);
+      mergedAccumulators.add(thetaSketchAccumulator1);
     }
-    return mergedSketches;
+    return mergedAccumulators;
   }
 
   @Override
@@ -989,8 +1024,18 @@ public class DistinctCountThetaSketchAggregationFunction
   }
 
   @Override
-  public Comparable extractFinalResult(List<Sketch> sketches) {
-    return 
Math.round(evaluatePostAggregationExpression(_postAggregationExpression, 
sketches).getEstimate());
+  public Comparable extractFinalResult(List<ThetaSketchAccumulator> 
accumulators) {
+    int numAccumulators = accumulators.size();
+    List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
+
+    for (ThetaSketchAccumulator accumulator : accumulators) {
+      accumulator.setOrdered(_intermediateOrdering);
+      accumulator.setThreshold(_accumulatorThreshold);
+      accumulator.setSetOperationBuilder(_setOperationBuilder);
+      mergedSketches.add(accumulator.getResult());
+    }
+
+    return 
Math.round(evaluatePostAggregationExpression(_postAggregationExpression, 
mergedSketches).getEstimate());
   }
 
   /**
@@ -1172,8 +1217,8 @@ public class DistinctCountThetaSketchAggregationFunction
   /**
    * Returns the Union list from the result holder or creates a new one if it 
does not exist.
    */
-  private List<Union> getUnions(AggregationResultHolder 
aggregationResultHolder) {
-    List<Union> unions = aggregationResultHolder.getResult();
+  private List<ThetaSketchAccumulator> getUnions(AggregationResultHolder 
aggregationResultHolder) {
+    List<ThetaSketchAccumulator> unions = aggregationResultHolder.getResult();
     if (unions == null) {
       unions = buildUnions();
       aggregationResultHolder.setValue(unions);
@@ -1196,8 +1241,8 @@ public class DistinctCountThetaSketchAggregationFunction
   /**
    * Returns the Union list for the given group key or creates a new one if it 
does not exist.
    */
-  private List<Union> getUnions(GroupByResultHolder groupByResultHolder, int 
groupKey) {
-    List<Union> unions = groupByResultHolder.getResult(groupKey);
+  private List<ThetaSketchAccumulator> getUnions(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    List<ThetaSketchAccumulator> unions = 
groupByResultHolder.getResult(groupKey);
     if (unions == null) {
       unions = buildUnions();
       groupByResultHolder.setValueForKey(groupKey, unions);
@@ -1220,11 +1265,13 @@ public class DistinctCountThetaSketchAggregationFunction
   /**
    * Builds the Union list.
    */
-  private List<Union> buildUnions() {
+  private List<ThetaSketchAccumulator> buildUnions() {
     int numUnions = _filterEvaluators.size() + 1;
-    List<Union> unions = new ArrayList<>(numUnions);
+    List<ThetaSketchAccumulator> unions = new ArrayList<>(numUnions);
     for (int i = 0; i < numUnions; i++) {
-      unions.add(_setOperationBuilder.buildUnion());
+      ThetaSketchAccumulator thetaSketchAccumulator =
+          new ThetaSketchAccumulator(_setOperationBuilder, 
_intermediateOrdering, _accumulatorThreshold);
+      unions.add(thetaSketchAccumulator);
     }
     return unions;
   }
@@ -1240,20 +1287,6 @@ public class DistinctCountThetaSketchAggregationFunction
     return sketches;
   }
 
-  /**
-   * Converts the given Unions to Sketches.
-   */
-  private List<Sketch> convertToSketches(List<Union> unions) {
-    int numUnions = unions.size();
-    List<Sketch> sketches = new ArrayList<>(numUnions);
-    for (Union union : unions) {
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance 
concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html 
for more details.
-      sketches.add(union.getResult(false, null));
-    }
-    return sketches;
-  }
-
   /**
    * Evaluates the post-aggregation expression.
    */
@@ -1269,8 +1302,6 @@ public class DistinctCountThetaSketchAggregationFunction
       return sketches.get(extractSketchId(expression.getIdentifier()));
     }
 
-    // NOTE: Compact the sketch in unsorted, on-heap fashion for performance 
concern.
-    //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for 
more details.
     FunctionContext function = expression.getFunction();
     String functionName = function.getFunctionName();
     List<ExpressionContext> arguments = function.getArguments();
@@ -1280,32 +1311,41 @@ public class DistinctCountThetaSketchAggregationFunction
         for (ExpressionContext argument : arguments) {
           union.union(evaluatePostAggregationExpression(argument, sketches));
         }
-        return union.getResult(false, null);
+        return union.getResult(_intermediateOrdering, null);
       case SET_INTERSECT:
         Intersection intersection = _setOperationBuilder.buildIntersection();
         for (ExpressionContext argument : arguments) {
           intersection.intersect(evaluatePostAggregationExpression(argument, 
sketches));
         }
-        return intersection.getResult(false, null);
+        return intersection.getResult(_intermediateOrdering, null);
       case SET_DIFF:
         AnotB diff = _setOperationBuilder.buildANotB();
         diff.setA(evaluatePostAggregationExpression(arguments.get(0), 
sketches));
         diff.notB(evaluatePostAggregationExpression(arguments.get(1), 
sketches));
-        return diff.getResult(false, null, false);
+        return diff.getResult(_intermediateOrdering, null, false);
       default:
         throw new IllegalStateException();
     }
   }
 
   /**
-   * Helper class to wrap the theta-sketch parameters.
+   * Helper class to wrap the theta-sketch parameters.  The initial values for 
the parameters are set to the
+   * same defaults in the Apache Datasketches library.
    */
   private static class Parameters {
     private static final char PARAMETER_DELIMITER = ';';
     private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
     private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+    private static final String RESIZE_FACTOR_KEY = "resizeFactor";
+    private static final String SAMPLING_PROBABILITY_KEY = 
"samplingProbability";
+    private static final String INTERMEDIATE_ORDERING_KEY = 
"intermediateOrdering";
+    private static final String ACCUMULATOR_THRESHOLD_KEY = 
"accumulatorThreshold";
 
+    private int _resizeFactor = ResizeFactor.X8.getValue();
     private int _nominalEntries = ThetaUtil.DEFAULT_NOMINAL_ENTRIES;
+    private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+    private boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
+    private float _samplingProbability = 1.0F;
 
     Parameters(String parametersString) {
       StringUtils.deleteWhitespace(parametersString);
@@ -1317,6 +1357,14 @@ public class DistinctCountThetaSketchAggregationFunction
         String value = keyAndValue[1];
         if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
           _nominalEntries = Integer.parseInt(value);
+        } else if (key.equalsIgnoreCase(SAMPLING_PROBABILITY_KEY)) {
+          _samplingProbability = Float.parseFloat(value);
+        } else if (key.equalsIgnoreCase(RESIZE_FACTOR_KEY)) {
+          _resizeFactor = Integer.parseInt(value);
+        } else if (key.equalsIgnoreCase(INTERMEDIATE_ORDERING_KEY)) {
+          _intermediateOrdering = Boolean.parseBoolean(value);
+        } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
+          _accumulatorThreshold = Integer.parseInt(value);
         } else {
           throw new IllegalArgumentException("Invalid parameter key: " + key);
         }
@@ -1326,6 +1374,22 @@ public class DistinctCountThetaSketchAggregationFunction
     int getNominalEntries() {
       return _nominalEntries;
     }
+
+    float getSamplingProbability() {
+      return _samplingProbability;
+    }
+
+    boolean getIntermediateOrdering() {
+      return _intermediateOrdering;
+    }
+
+    int getAccumulatorThreshold() {
+      return _accumulatorThreshold;
+    }
+
+    ResizeFactor getResizeFactor() {
+      return ResizeFactor.getRF(_resizeFactor);
+    }
   }
 
   /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8fb198474c..b9f6f985b5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -39,6 +39,10 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
 import 
org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
 import org.apache.pinot.segment.local.customobject.AvgPair;
@@ -49,6 +53,7 @@ import 
org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.testng.annotations.Test;
@@ -471,4 +476,52 @@ public class ObjectSerDeUtilsTest {
       assertEquals(actual.getState(), ull.getState(), ERROR_MESSAGE);
     }
   }
+
+  /**
+   * Round-trips randomly sized compact Theta sketches, ordered or unordered at random, through
+   * {@link ObjectSerDeUtils} and checks that estimate, serialized bytes and ordering are preserved.
+   */
+  @Test
+  public void testThetaSketch() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = 10 + RANDOM.nextInt(100);
+      boolean ordered = RANDOM.nextBoolean();
+      UpdateSketch updateSketch = Sketches.updateSketchBuilder().build();
+      for (int value = 0; value < numValues; value++) {
+        updateSketch.update(value);
+      }
+      Sketch expected = updateSketch.compact(ordered, null);
+
+      Sketch roundTripped = ObjectSerDeUtils.deserialize(ObjectSerDeUtils.serialize(expected),
+          ObjectSerDeUtils.ObjectType.DataSketch);
+
+      assertEquals(roundTripped.getEstimate(), expected.getEstimate(), ERROR_MESSAGE);
+      assertEquals(roundTripped.toByteArray(), expected.toByteArray(), ERROR_MESSAGE);
+      assertEquals(roundTripped.isOrdered(), ordered, ERROR_MESSAGE);
+    }
+  }
+
+  /**
+   * Round-trips an accumulator holding a single random compact sketch through {@link ObjectSerDeUtils}
+   * and checks that the deserialized result matches the original sketch, including its ordering.
+   */
+  @Test
+  public void testThetaSketchAccumulator() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numValues = 10 + RANDOM.nextInt(100);
+      boolean ordered = RANDOM.nextBoolean();
+      UpdateSketch updateSketch = Sketches.updateSketchBuilder().build();
+      for (int value = 0; value < numValues; value++) {
+        updateSketch.update(value);
+      }
+
+      ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(new SetOperationBuilder(), ordered, 2);
+      Sketch expected = updateSketch.compact(ordered, null);
+      accumulator.apply(expected);
+
+      ThetaSketchAccumulator roundTripped = ObjectSerDeUtils.deserialize(ObjectSerDeUtils.serialize(accumulator),
+          ObjectSerDeUtils.ObjectType.ThetaSketchAccumulator);
+
+      assertEquals(roundTripped.getResult().getEstimate(), expected.getEstimate(), ERROR_MESSAGE);
+      assertEquals(roundTripped.getResult().toByteArray(), expected.toByteArray(), ERROR_MESSAGE);
+      assertEquals(roundTripped.getResult().isOrdered(), ordered, ERROR_MESSAGE);
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
index 11e31aaf0f..11662741db 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
@@ -20,6 +20,10 @@ package org.apache.pinot.core.function.scalar;
 
 import com.dynatrace.hash4j.distinctcount.UltraLogLog;
 import java.math.BigDecimal;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.testng.Assert;
@@ -50,6 +54,15 @@ public class SketchFunctionsTest {
     Assert.assertThrows(IllegalArgumentException.class, () -> 
SketchFunctions.toThetaSketch(new Object(), 1024));
   }
 
+  /**
+   * Verifies that {@code SketchFunctions.thetaSketchToString} renders a Theta sketch exactly like
+   * {@code Sketch.toString()}, and that a non-sketch argument raises a {@link RuntimeException}.
+   * NOTE(review): the method name does not reflect what is tested; consider renaming to
+   * {@code testThetaSketchToString}.
+   */
+  @Test
+  public void thetaThetaSketchSummary() {
+    for (Object input : _inputs) {
+      Sketch thetaSketch = Sketches.wrapSketch(Memory.wrap(SketchFunctions.toThetaSketch(input)));
+      String actual = SketchFunctions.thetaSketchToString(thetaSketch);
+      Assert.assertEquals(actual, thetaSketch.toString());
+    }
+    Object notASketch = new Object();
+    Assert.assertThrows(RuntimeException.class, () -> SketchFunctions.thetaSketchToString(notASketch));
+  }
+
   private long hllEstimate(byte[] bytes) {
     return 
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytes).cardinality();
   }
@@ -97,6 +110,15 @@ public class SketchFunctionsTest {
     Assert.assertThrows(IllegalArgumentException.class, () -> 
SketchFunctions.toCpcSketch(new Object(), 11));
   }
 
+  /**
+   * Verifies that {@code SketchFunctions.cpcSketchToString} renders a CPC sketch exactly like
+   * {@code CpcSketch.toString()}, and that a non-sketch argument raises a {@link RuntimeException}.
+   */
+  @Test
+  public void thetaCpcSketchToString() {
+    for (Object input : _inputs) {
+      CpcSketch cpcSketch = CpcSketch.heapify(Memory.wrap(SketchFunctions.toCpcSketch(input)));
+      String actual = SketchFunctions.cpcSketchToString(cpcSketch);
+      Assert.assertEquals(actual, cpcSketch.toString());
+    }
+    Object notASketch = new Object();
+    Assert.assertThrows(RuntimeException.class, () -> SketchFunctions.cpcSketchToString(notASketch));
+  }
+
   private long ullEstimate(byte[] bytes) {
     // round it to a long to make it easier to assert on
     return 
Math.round(ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(bytes).getDistinctCountEstimate());
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
index 4d04539b4a..98e9b90900 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.operator.query.GroupByOperator;
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -172,13 +173,13 @@ public class DistinctCountThetaSketchQueriesTest extends 
BaseQueriesTest {
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 11);
     for (int i = 0; i < 11; i++) {
-      List<Sketch> sketches = (List<Sketch>) aggregationResult.get(i);
-      assertEquals(sketches.size(), 1);
-      Sketch sketch = sketches.get(0);
+      List<ThetaSketchAccumulator> accumulators = 
(List<ThetaSketchAccumulator>) aggregationResult.get(i);
+      assertEquals(accumulators.size(), 1);
+      ThetaSketchAccumulator accumulator = accumulators.get(0);
       if (i < 5) {
-        assertEquals(Math.round(sketch.getEstimate()), NUM_RECORDS);
+        assertEquals(Math.round(accumulator.getResult().getEstimate()), 
NUM_RECORDS);
       } else {
-        assertEquals(Math.round(sketch.getEstimate()), 3 * NUM_RECORDS);
+        assertEquals(Math.round(accumulator.getResult().getEstimate()), 3 * 
NUM_RECORDS);
       }
     }
 
@@ -220,9 +221,10 @@ public class DistinctCountThetaSketchQueriesTest extends 
BaseQueriesTest {
         numGroups++;
         GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
         for (int i = 0; i < 6; i++) {
-          List<Sketch> sketches = (List<Sketch>) 
aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
-          assertEquals(sketches.size(), 1);
-          Sketch sketch = sketches.get(0);
+          List<ThetaSketchAccumulator> accumulators =
+              (List<ThetaSketchAccumulator>) 
aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
+          assertEquals(accumulators.size(), 1);
+          Sketch sketch = accumulators.get(0).getResult();
           if (i < 5) {
             assertEquals(Math.round(sketch.getEstimate()), 1);
           } else {
@@ -279,13 +281,13 @@ public class DistinctCountThetaSketchQueriesTest extends 
BaseQueriesTest {
     List<Object> aggregationResult = resultsBlock.getResults();
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 1);
-    List<Sketch> sketches = (List<Sketch>) aggregationResult.get(0);
-    assertEquals(sketches.size(), 5);
-    assertTrue(sketches.get(0).isEmpty());
-    assertEquals(Math.round(sketches.get(1).getEstimate()), 300);
-    assertEquals(Math.round(sketches.get(2).getEstimate()), 450);
-    assertEquals(Math.round(sketches.get(3).getEstimate()), 175);
-    assertEquals(Math.round(sketches.get(4).getEstimate()), 100);
+    List<ThetaSketchAccumulator> accumulators = (List<ThetaSketchAccumulator>) 
aggregationResult.get(0);
+    assertEquals(accumulators.size(), 5);
+    assertTrue(accumulators.get(0).getResult().isEmpty());
+    assertEquals(Math.round(accumulators.get(1).getResult().getEstimate()), 
300);
+    assertEquals(Math.round(accumulators.get(2).getResult().getEstimate()), 
450);
+    assertEquals(Math.round(accumulators.get(3).getResult().getEstimate()), 
175);
+    assertEquals(Math.round(accumulators.get(4).getResult().getEstimate()), 
100);
 
     // Inter segments
     Object[] expectedResults = new Object[]{225L};
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
new file mode 100644
index 0000000000..b1cb94812c
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import javax.annotation.Nonnull;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+
+
+/**
+ * Intermediate state used by {@code DistinctCountThetaSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade off more memory usage for more pre-aggregation.
+ * This permits use of the Union "early-stop" optimisation where ordered sketches require no further
+ * processing beyond the minimum Theta value.
+ * The union operation initialises an empty "gadget" bookkeeping sketch that is updated with hashed entries
+ * that fall below the minimum Theta value for all input sketches ("Broder Rule").  When the initial
+ * Theta value is set to the minimum immediately, further gains can be realised.
+ *
+ * <p>Non-empty input sketches are buffered and folded into a lazily created {@link Union} whenever the
+ * buffer reaches the configured threshold, or when the result is requested. No synchronization is
+ * performed, so an instance should not be shared between threads without external locking.
+ */
+public class ThetaSketchAccumulator {
+  // Non-empty sketches not yet folded into _union; created on the first non-empty input.
+  private ArrayList<Sketch> _accumulator;
+  // Whether sketches produced by the union are compacted in ordered form.
+  private boolean _ordered = false;
+  private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+  // Built on demand, so accumulators that only ever hold a single sketch never allocate a union.
+  private Union _union;
+  // Buffer size at which buffered sketches are folded into the union.
+  private int _threshold;
+  // Number of non-empty sketches received through apply/merge.
+  private int _numInputs = 0;
+
+  public ThetaSketchAccumulator() {
+  }
+
+  // Note: The accumulator is serialized as a sketch, so most of the processing happens during
+  // serialization. Consequently, a deserialized accumulator may be missing its configuration
+  // (set operation builder, ordering, threshold), which then has to be re-initialised through the
+  // setters. Since the primary use case is at query time for the Broker and Server, these properties
+  // are already in memory and can be re-set.
+  /**
+   * Creates an accumulator.
+   *
+   * @param setOperationBuilder builder used to create the underlying {@link Union}
+   * @param ordered whether results computed through the union are compacted in ordered form
+   * @param threshold number of buffered sketches that triggers folding the buffer into the union
+   */
+  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, boolean ordered, int threshold) {
+    _setOperationBuilder = setOperationBuilder;
+    _ordered = ordered;
+    _threshold = threshold;
+  }
+
+  public void setOrdered(boolean ordered) {
+    _ordered = ordered;
+  }
+
+  public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
+    _setOperationBuilder = setOperationBuilder;
+  }
+
+  public void setThreshold(int threshold) {
+    _threshold = threshold;
+  }
+
+  /** Returns {@code true} if no non-empty sketch has been applied or merged into this accumulator. */
+  public boolean isEmpty() {
+    return _numInputs == 0;
+  }
+
+  /**
+   * Returns the union of all sketches accumulated so far, folding any buffered sketches into the union.
+   * When exactly one non-empty sketch has been accumulated, that sketch is returned as-is.
+   */
+  @Nonnull
+  public Sketch getResult() {
+    return unionAll();
+  }
+
+  /** Adds a sketch to this accumulator; empty sketches are ignored. */
+  public void apply(Sketch sketch) {
+    internalAdd(sketch);
+  }
+
+  /** Adds the result of another accumulator to this one; empty accumulators are ignored. */
+  public void merge(ThetaSketchAccumulator thetaUnion) {
+    if (thetaUnion.isEmpty()) {
+      return;
+    }
+    Sketch sketch = thetaUnion.getResult();
+    internalAdd(sketch);
+  }
+
+  // Buffers a non-empty sketch and folds the buffer into the union once the threshold is reached.
+  private void internalAdd(Sketch sketch) {
+    if (sketch.isEmpty()) {
+      return;
+    }
+    if (_accumulator == null) {
+      _accumulator = new ArrayList<>(_threshold);
+    }
+    _accumulator.add(sketch);
+    _numInputs++;
+
+    if (_accumulator.size() >= _threshold) {
+      unionAll();
+    }
+  }
+
+  // Folds any buffered sketches into the union and returns the current result.
+  private Sketch unionAll() {
+    // Corner-case: the parameters are not strictly respected when there is a single sketch.
+    // This single sketch might have been the result of a previously accumulated union and
+    // would already have the parameters set.  The sketch is returned as-is without adjusting
+    // ordering and nominal entries which requires an additional union operation.
+    // The buffer is only cleared after a union of at least two inputs, so a single input is
+    // always still at index 0. Checking this before building the union avoids allocating a
+    // union gadget for accumulators that never receive a second sketch.
+    if (_numInputs == 1) {
+      return _accumulator.get(0);
+    }
+    if (_union == null) {
+      _union = _setOperationBuilder.buildUnion();
+    }
+    // Return the default update "gadget" sketch as a compact sketch
+    if (isEmpty()) {
+      return _union.getResult(_ordered, null);
+    }
+
+    // Performance optimization: ensure that the minimum Theta is used for "early stop".
+    // The "early stop" optimization is implemented in the Apache Datasketches Union operation for
+    // ordered and compact Theta sketches. Internally, a compact and ordered Theta sketch can be
+    // compared to a sorted array of K items.  When performing a union, only those items from
+    // the input sketch less than Theta need to be processed.  The loop terminates as soon as a hash
+    // is seen that is > Theta.
+    // The following "sort" improves on this further by selecting the minimal Theta value up-front,
+    // which results in fewer redundant entries being retained and subsequently discarded during the
+    // union operation.
+    _accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
+    for (Sketch accumulatedSketch : _accumulator) {
+      _union.union(accumulatedSketch);
+    }
+    _accumulator.clear();
+
+    return _union.getResult(_ordered, null);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index dcd689afae..2879c474a5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -243,9 +243,12 @@ public class CustomSerDeUtils {
 
     @Override
     public byte[] serialize(Sketch value) {
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance 
concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html 
for more details.
-      return value.compact(false, null).toByteArray();
+      // Compact sketches are serialized untouched so that any existing ordering survives, which keeps
+      // "early stop" optimisations available to later unions. Other sketches are compacted first,
+      // passing along the ordering they report.
+      Sketch serializable = value.isCompact() ? value : value.compact(value.isOrdered(), null);
+      return serializable.toByteArray();
     }
 
     @Override
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
index 822335cfb0..fdc820c120 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
@@ -27,7 +27,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 
 
 public class DistinctCountThetaSketchValueAggregatorTest {
@@ -162,4 +164,15 @@ public class DistinctCountThetaSketchValueAggregatorTest {
     byte[][] zeroSketches = {};
     assertEquals(agg.getInitialAggregatedValue(zeroSketches).getEstimate(), 
0.0);
   }
+
+  /**
+   * Checks that cloning an aggregated value keeps the ordering of the input compact sketch.
+   */
+  @Test
+  public void shouldRetainSketchOrdering() {
+    UpdateSketch source = Sketches.updateSketchBuilder().build();
+    for (int value = 0; value < 10; value++) {
+      source.update(value);
+    }
+    Sketch unordered = source.compact(false, null);
+    Sketch ordered = source.compact(true, null);
+
+    DistinctCountThetaSketchValueAggregator aggregator = new DistinctCountThetaSketchValueAggregator();
+    assertTrue(aggregator.cloneAggregatedValue(ordered).isOrdered());
+    assertFalse(aggregator.cloneAggregatedValue(unordered).isOrdered());
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
new file mode 100644
index 0000000000..d34ddd6447
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link ThetaSketchAccumulator}, covering empty input, the single-sketch shortcut,
+ * merging accumulators and the threshold-triggered union of buffered sketches.
+ */
+public class ThetaSketchAccumulatorTest {
+  private SetOperationBuilder _setOperationBuilder;
+
+  @BeforeMethod
+  public void setUp() {
+    _setOperationBuilder = new SetOperationBuilder();
+  }
+
+  @Test
+  public void testEmptyAccumulator() {
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+    Assert.assertTrue(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+  }
+
+  @Test
+  public void testAccumulatorWithSingleSketch() {
+    Sketch sketch = createSketch(0, 1000);
+
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+    accumulator.apply(sketch);
+
+    Assert.assertFalse(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), sketch.getEstimate());
+  }
+
+  @Test
+  public void testAccumulatorMerge() {
+    Sketch sketch1 = createSketch(0, 1000);
+    Sketch sketch2 = createSketch(1000, 2000);
+
+    ThetaSketchAccumulator accumulator1 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    accumulator1.apply(sketch1);
+    ThetaSketchAccumulator accumulator2 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    accumulator2.apply(sketch2);
+    accumulator1.merge(accumulator2);
+
+    Assert.assertEquals(accumulator1.getResult().getEstimate(), sketch1.getEstimate() + sketch2.getEstimate());
+  }
+
+  @Test
+  public void testThresholdBehavior() {
+    // With a threshold of 2, the buffer is folded into the union when the second sketch arrives; the
+    // third sketch stays buffered until the result is requested. Both union paths are exercised.
+    Sketch sketch1 = createSketch(0, 1000);
+    Sketch sketch2 = createSketch(1000, 2000);
+    Sketch sketch3 = createSketch(2000, 3000);
+
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 2);
+    accumulator.apply(sketch1);
+    accumulator.apply(sketch2);
+    accumulator.apply(sketch3);
+
+    Sketch result = accumulator.getResult();
+    Assert.assertEquals(result.getEstimate(),
+        sketch1.getEstimate() + sketch2.getEstimate() + sketch3.getEstimate());
+    Assert.assertTrue(result.isOrdered());
+  }
+
+  @Test
+  public void testUnionWithEmptyInput() {
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    ThetaSketchAccumulator emptyAccumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+
+    accumulator.merge(emptyAccumulator);
+
+    Assert.assertTrue(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+  }
+
+  /** Builds a compact sketch updated with every integer in {@code [startInclusive, endExclusive)}. */
+  private static Sketch createSketch(int startInclusive, int endExclusive) {
+    UpdateSketch input = Sketches.updateSketchBuilder().build();
+    IntStream.range(startInclusive, endExclusive).forEach(input::update);
+    return input.compact();
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 09fe5129e0..84bcea58e2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -98,9 +98,9 @@ public class CommonConstants {
     public static final int DEFAULT_HYPERLOGLOG_PLUS_P = 14;
     public static final int DEFAULT_HYPERLOGLOG_PLUS_SP = 0;
 
-    // 2 to the power of 16, for tradeoffs see datasketches library 
documentation:
+    // 2 to the power of 14, for tradeoffs see datasketches library 
documentation:
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
-    public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
+    public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 16384;
 
     public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to