This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a9e319964f Theta Sketch Aggregation Enhancements (#12042)
a9e319964f is described below
commit a9e319964f642ff86f2c4a6122058bb033a2475a
Author: David Cromberge <[email protected]>
AuthorDate: Tue Dec 5 23:22:30 2023 +0000
Theta Sketch Aggregation Enhancements (#12042)
* Theta Sketch Aggregation Enhancements
Introduces additional parameters to the DistinctCountThetaSketch
aggregation function that give the end user more control over how
sketches are merged. The defaults are chosen so that the behaviour
remains unchanged from the current implementation.
Furthermore, a custom accumulator object is added so that pairwise
union operations are avoided as much as possible. Instead, sketches
are accumulated and merged once a configurable threshold is reached.
* Use correct naming convention for private variable
* Fix flaky test edge case
* Decrease default constant value
This better aligns with the default nominal entries parameter
that is used in the query aggregation function.
* Attempt 2: Simplify implementation
Removes the intermediate array list used to buffer/accumulate sketch
elements. Instead, inputs are fed directly to the underlying
union, which keeps the memory usage of the merge under control.
* Rename sampling probability parameter
* Minor code improvements
* Revert "Attempt 2: Simplify implementation"
This reverts commit 2ed38f395ae1e3071637f58da2386e867dbc80e3.
* Add toString methods for supported sketches to enable debugging
* Additional inline commentary on early stop optimization.
* Refactor serializer for Theta to remove temp variables
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 43 ++++-
.../core/function/scalar/SketchFunctions.java | 10 +
...inctCountRawThetaSketchAggregationFunction.java | 19 +-
...istinctCountThetaSketchAggregationFunction.java | 202 ++++++++++++++-------
.../pinot/core/common/ObjectSerDeUtilsTest.java | 53 ++++++
.../core/function/scalar/SketchFunctionsTest.java | 22 +++
.../DistinctCountThetaSketchQueriesTest.java | 32 ++--
.../local/customobject/ThetaSketchAccumulator.java | 141 ++++++++++++++
.../segment/local/utils/CustomSerDeUtils.java | 9 +-
...istinctCountThetaSketchValueAggregatorTest.java | 13 ++
.../customobject/ThetaSketchAccumulatorTest.java | 104 +++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 4 +-
12 files changed, 554 insertions(+), 98 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 903ef712db..14aa2d2f10 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -88,6 +88,7 @@ import
org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -154,7 +155,8 @@ public class ObjectSerDeUtils {
LongArrayList(43),
FloatArrayList(44),
StringArrayList(45),
- UltraLogLog(46);
+ UltraLogLog(46),
+ ThetaSketchAccumulator(47);
private final int _value;
@@ -273,6 +275,8 @@ public class ObjectSerDeUtils {
return ObjectType.CompressedProbabilisticCounting;
} else if (value instanceof UltraLogLog) {
return ObjectType.UltraLogLog;
+ } else if (value instanceof ThetaSketchAccumulator) {
+ return ObjectType.ThetaSketchAccumulator;
} else {
throw new IllegalArgumentException("Unsupported type of value: " +
value.getClass().getSimpleName());
}
@@ -1125,9 +1129,12 @@ public class ObjectSerDeUtils {
@Override
public byte[] serialize(Sketch value) {
- // NOTE: Compact the sketch in unsorted, on-heap fashion for performance
concern.
- // See https://datasketches.apache.org/docs/Theta/ThetaSize.html
for more details.
- return value.compact(false, null).toByteArray();
+ // The serializer should respect existing ordering to enable "early stop"
+ // optimisations on unions.
+ if (!value.isCompact()) {
+ return value.compact(value.isOrdered(), null).toByteArray();
+ }
+ return value.toByteArray();
}
@Override
@@ -1580,6 +1587,33 @@ public class ObjectSerDeUtils {
}
};
+ public static final ObjectSerDe<ThetaSketchAccumulator>
DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
+ new ObjectSerDe<ThetaSketchAccumulator>() {
+
+ @Override
+ public byte[] serialize(ThetaSketchAccumulator thetaSketchBuffer) {
+ Sketch sketch = thetaSketchBuffer.getResult();
+ return sketch.toByteArray();
+ }
+
+ @Override
+ public ThetaSketchAccumulator deserialize(byte[] bytes) {
+ return deserialize(ByteBuffer.wrap(bytes));
+ }
+
+ // Note: The accumulator is designed to serialize as a sketch and
should
+ // not be deserialized in practice.
+ @Override
+ public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
+ ThetaSketchAccumulator thetaSketchAccumulator = new
ThetaSketchAccumulator();
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ Sketch sketch = Sketch.wrap(Memory.wrap(bytes));
+ thetaSketchAccumulator.apply(sketch);
+ return thetaSketchAccumulator;
+ }
+ };
+
// NOTE: DO NOT change the order, it has to be the same order as the
ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
@@ -1630,6 +1664,7 @@ public class ObjectSerDeUtils {
FLOAT_ARRAY_LIST_SER_DE,
STRING_ARRAY_LIST_SER_DE,
ULTRA_LOG_LOG_OBJECT_SER_DE,
+ DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 74e35e8bb7..90e313edb2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -261,6 +261,11 @@ public class SketchFunctions {
return diff.getResult(false, null, false);
}
+ @ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
+ public static String thetaSketchToString(Object sketchObject) {
+ return asThetaSketch(sketchObject).toString();
+ }
+
private static Sketch thetaSketchUnionVar(Object... sketchObjects) {
Union union = SET_OPERATION_BUILDER.buildUnion();
for (Object sketchObj : sketchObjects) {
@@ -417,6 +422,11 @@ public class SketchFunctions {
return cpcSketchUnionVar(o1, o2, o3, o4, o5);
}
+ @ScalarFunction(names = {"cpcSketchToString", "cpc_sketch_to_string"})
+ public static String cpcSketchToString(Object sketchObject) {
+ return asCpcSketch(sketchObject).toString();
+ }
+
/**
* Create a CPC Sketch containing the input, with a configured nominal
entries
*
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
index cd75fa3807..00d6ec2906 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.datasketches.theta.Sketch;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -47,11 +49,18 @@ public class DistinctCountRawThetaSketchAggregationFunction
extends DistinctCoun
}
@Override
- public String extractFinalResult(List<Sketch> sketches) {
- Sketch sketch = evaluatePostAggregationExpression(sketches);
+ public String extractFinalResult(List<ThetaSketchAccumulator> accumulators) {
+ int numAccumulators = accumulators.size();
+ List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
- // NOTE: Compact the sketch in unsorted, on-heap fashion for performance
concern.
- // See https://datasketches.apache.org/docs/Theta/ThetaSize.html for
more details.
- return Base64.getEncoder().encodeToString(sketch.compact(false,
null).toByteArray());
+ for (ThetaSketchAccumulator accumulator : accumulators) {
+ accumulator.setOrdered(_intermediateOrdering);
+ accumulator.setThreshold(_accumulatorThreshold);
+ accumulator.setSetOperationBuilder(_setOperationBuilder);
+ mergedSketches.add(accumulator.getResult());
+ }
+
+ Sketch sketch = evaluatePostAggregationExpression(mergedSketches);
+ return Base64.getEncoder().encodeToString(sketch.toByteArray());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index f3a9adfda6..0fea53db34 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.common.ResizeFactor;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.AnotB;
import org.apache.datasketches.theta.Intersection;
@@ -48,6 +49,7 @@ import
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -68,29 +70,36 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
* 'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
* </li>
* </ul>
- * Currently there is only 1 parameter for the function:
+ * Currently, there are 5 parameters to the function:
* <ul>
* <li>
* nominalEntries: The nominal entries used to create the sketch. (Default
4096)
+ * resizeFactor: Controls the size multiple that affects how fast the
internal cache grows (Default 2^3=8)
+ * samplingProbability: Sets the upfront uniform sampling probability, p.
(Default 1.0)
+ * intermediateOrdering: Whether compacted sketches should be ordered.
(Default false)
+ * accumulatorThreshold: How many sketches should be kept in memory before
merging. (Default 2)
* </li>
* </ul>
* <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class DistinctCountThetaSketchAggregationFunction
- extends BaseSingleInputAggregationFunction<List<Sketch>, Comparable> {
+ extends BaseSingleInputAggregationFunction<List<ThetaSketchAccumulator>,
Comparable> {
private static final String SET_UNION = "setunion";
private static final String SET_INTERSECT = "setintersect";
private static final String SET_DIFF = "setdiff";
private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
- private static final Sketch EMPTY_SKETCH = new
UpdateSketchBuilder().build().compact();
+ private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
+ private static final boolean DEFAULT_INTERMEDIATE_ORDERING = false;
private final List<ExpressionContext> _inputExpressions;
private final boolean _includeDefaultSketch;
private final List<FilterEvaluator> _filterEvaluators;
private final ExpressionContext _postAggregationExpression;
private final UpdateSketchBuilder _updateSketchBuilder = new
UpdateSketchBuilder();
- private final SetOperationBuilder _setOperationBuilder = new
SetOperationBuilder();
+ protected final SetOperationBuilder _setOperationBuilder = new
SetOperationBuilder();
+ protected boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
+ protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext>
arguments) {
super(arguments.get(0));
@@ -102,9 +111,22 @@ public class DistinctCountThetaSketchAggregationFunction
Preconditions.checkArgument(paramsExpression.getType() ==
ExpressionContext.Type.LITERAL,
"Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function
must be literal (parameters)");
Parameters parameters = new
Parameters(paramsExpression.getLiteral().getStringValue());
+ // Allows the user to trade-off memory usage for merge CPU; higher
values use more memory
+ _accumulatorThreshold = parameters.getAccumulatorThreshold();
+ // Ordering controls whether intermediate compact sketches are ordered
in set operations
+ _intermediateOrdering = parameters.getIntermediateOrdering();
+ // Nominal entries controls sketch accuracy and size
int nominalEntries = parameters.getNominalEntries();
_updateSketchBuilder.setNominalEntries(nominalEntries);
_setOperationBuilder.setNominalEntries(nominalEntries);
+ // Sampling probability sets the initial value of Theta, defaults to 1.0
+ float p = parameters.getSamplingProbability();
+ _setOperationBuilder.setP(p);
+ _updateSketchBuilder.setP(p);
+ // Resize factor controls the size multiple that affects how fast the
internal cache grows
+ ResizeFactor rf = parameters.getResizeFactor();
+ _setOperationBuilder.setResizeFactor(rf);
+ _updateSketchBuilder.setResizeFactor(rf);
}
if (numArguments < 4) {
@@ -401,20 +423,20 @@ public class DistinctCountThetaSketchAggregationFunction
}
} else {
// Serialized sketch
- List<Union> unions = getUnions(aggregationResultHolder);
+ List<ThetaSketchAccumulator> thetaSketchAccumulators =
getUnions(aggregationResultHolder);
Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0],
length);
if (_includeDefaultSketch) {
- Union defaultUnion = unions.get(0);
+ ThetaSketchAccumulator defaultThetaAccumulator =
thetaSketchAccumulators.get(0);
for (Sketch sketch : sketches) {
- defaultUnion.union(sketch);
+ defaultThetaAccumulator.apply(sketch);
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
- Union union = unions.get(i + 1);
+ ThetaSketchAccumulator thetaSketchAccumulator =
thetaSketchAccumulators.get(i + 1);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays,
j)) {
- union.union(sketches[j]);
+ thetaSketchAccumulator.apply(sketches[j]);
}
}
}
@@ -631,14 +653,14 @@ public class DistinctCountThetaSketchAggregationFunction
// Serialized sketch
Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0],
length);
for (int i = 0; i < length; i++) {
- List<Union> unions = getUnions(groupByResultHolder, groupKeyArray[i]);
+ List<ThetaSketchAccumulator> thetaSketchAccumulators =
getUnions(groupByResultHolder, groupKeyArray[i]);
Sketch sketch = sketches[i];
if (_includeDefaultSketch) {
- unions.get(0).union(sketch);
+ thetaSketchAccumulators.get(0).apply(sketch);
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes,
valueArrays, i)) {
- unions.get(j + 1).union(sketch);
+ thetaSketchAccumulators.get(j + 1).apply(sketch);
}
}
}
@@ -907,7 +929,7 @@ public class DistinctCountThetaSketchAggregationFunction
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
- getUnions(groupByResultHolder, groupKey).get(0).union(sketches[i]);
+ getUnions(groupByResultHolder, groupKey).get(0).apply(sketches[i]);
}
}
}
@@ -916,7 +938,7 @@ public class DistinctCountThetaSketchAggregationFunction
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays,
j)) {
for (int groupKey : groupKeysArray[i]) {
- getUnions(groupByResultHolder, groupKey).get(i +
1).union(sketches[i]);
+ getUnions(groupByResultHolder, groupKey).get(i +
1).apply(sketches[i]);
}
}
}
@@ -925,57 +947,70 @@ public class DistinctCountThetaSketchAggregationFunction
}
@Override
- public List<Sketch> extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ public List<ThetaSketchAccumulator>
extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
List result = aggregationResultHolder.getResult();
if (result == null) {
int numSketches = _filterEvaluators.size() + 1;
- List<Sketch> sketches = new ArrayList<>(numSketches);
+ List<ThetaSketchAccumulator> sketches = new ArrayList<>(numSketches);
for (int i = 0; i < numSketches; i++) {
- sketches.add(EMPTY_SKETCH);
+ sketches.add(new ThetaSketchAccumulator(_setOperationBuilder,
_intermediateOrdering, _accumulatorThreshold));
}
return sketches;
}
if (result.get(0) instanceof Sketch) {
- return result;
+ int numSketches = result.size();
+ ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new
ArrayList<>(numSketches);
+ for (Object o : result) {
+ ThetaSketchAccumulator thetaSketchAccumulator =
+ new ThetaSketchAccumulator(_setOperationBuilder,
_intermediateOrdering, _accumulatorThreshold);
+ thetaSketchAccumulator.apply((Sketch) o);
+ thetaSketchAccumulators.add(thetaSketchAccumulator);
+ }
+ return thetaSketchAccumulators;
} else {
- return convertToSketches(result);
+ return result;
}
}
@Override
- public List<Sketch> extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ public List<ThetaSketchAccumulator> extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
List result = groupByResultHolder.getResult(groupKey);
+
if (result.get(0) instanceof Sketch) {
- return result;
- } else {
- return convertToSketches(result);
+ int numSketches = result.size();
+ ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new
ArrayList<>(numSketches);
+ for (Object o : result) {
+ ThetaSketchAccumulator thetaSketchAccumulator =
+ new ThetaSketchAccumulator(_setOperationBuilder,
_intermediateOrdering, _accumulatorThreshold);
+ thetaSketchAccumulator.apply((Sketch) o);
+ thetaSketchAccumulators.add(thetaSketchAccumulator);
+ }
+ return thetaSketchAccumulators;
}
+
+ return result;
}
@Override
- public List<Sketch> merge(List<Sketch> sketches1, List<Sketch> sketches2) {
- int numSketches = sketches1.size();
- List<Sketch> mergedSketches = new ArrayList<>(numSketches);
- for (int i = 0; i < numSketches; i++) {
- Sketch sketch1 = sketches1.get(i);
- Sketch sketch2 = sketches2.get(i);
- if (sketch1.isEmpty()) {
- mergedSketches.add(sketch2);
+ public List<ThetaSketchAccumulator> merge(List<ThetaSketchAccumulator> acc1,
List<ThetaSketchAccumulator> acc2) {
+ int numAccumulators = acc1.size();
+ List<ThetaSketchAccumulator> mergedAccumulators = new
ArrayList<>(numAccumulators);
+ for (int i = 0; i < numAccumulators; i++) {
+ ThetaSketchAccumulator thetaSketchAccumulator1 = acc1.get(i);
+ ThetaSketchAccumulator thetaSketchAccumulator2 = acc2.get(i);
+ if (thetaSketchAccumulator1.isEmpty()) {
+ mergedAccumulators.add(thetaSketchAccumulator2);
continue;
}
- if (sketch2.isEmpty()) {
- mergedSketches.add(sketch1);
+ if (thetaSketchAccumulator2.isEmpty()) {
+ mergedAccumulators.add(thetaSketchAccumulator1);
continue;
}
- Union union = _setOperationBuilder.buildUnion();
- union.union(sketch1);
- union.union(sketch2);
- // NOTE: Compact the sketch in unsorted, on-heap fashion for performance
concern.
- // See https://datasketches.apache.org/docs/Theta/ThetaSize.html
for more details.
- mergedSketches.add(union.getResult(false, null));
+ thetaSketchAccumulator1.merge(thetaSketchAccumulator2);
+ mergedAccumulators.add(thetaSketchAccumulator1);
}
- return mergedSketches;
+ return mergedAccumulators;
}
@Override
@@ -989,8 +1024,18 @@ public class DistinctCountThetaSketchAggregationFunction
}
@Override
- public Comparable extractFinalResult(List<Sketch> sketches) {
- return
Math.round(evaluatePostAggregationExpression(_postAggregationExpression,
sketches).getEstimate());
+ public Comparable extractFinalResult(List<ThetaSketchAccumulator>
accumulators) {
+ int numAccumulators = accumulators.size();
+ List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
+
+ for (ThetaSketchAccumulator accumulator : accumulators) {
+ accumulator.setOrdered(_intermediateOrdering);
+ accumulator.setThreshold(_accumulatorThreshold);
+ accumulator.setSetOperationBuilder(_setOperationBuilder);
+ mergedSketches.add(accumulator.getResult());
+ }
+
+ return
Math.round(evaluatePostAggregationExpression(_postAggregationExpression,
mergedSketches).getEstimate());
}
/**
@@ -1172,8 +1217,8 @@ public class DistinctCountThetaSketchAggregationFunction
/**
* Returns the Union list from the result holder or creates a new one if it
does not exist.
*/
- private List<Union> getUnions(AggregationResultHolder
aggregationResultHolder) {
- List<Union> unions = aggregationResultHolder.getResult();
+ private List<ThetaSketchAccumulator> getUnions(AggregationResultHolder
aggregationResultHolder) {
+ List<ThetaSketchAccumulator> unions = aggregationResultHolder.getResult();
if (unions == null) {
unions = buildUnions();
aggregationResultHolder.setValue(unions);
@@ -1196,8 +1241,8 @@ public class DistinctCountThetaSketchAggregationFunction
/**
* Returns the Union list for the given group key or creates a new one if it
does not exist.
*/
- private List<Union> getUnions(GroupByResultHolder groupByResultHolder, int
groupKey) {
- List<Union> unions = groupByResultHolder.getResult(groupKey);
+ private List<ThetaSketchAccumulator> getUnions(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ List<ThetaSketchAccumulator> unions =
groupByResultHolder.getResult(groupKey);
if (unions == null) {
unions = buildUnions();
groupByResultHolder.setValueForKey(groupKey, unions);
@@ -1220,11 +1265,13 @@ public class DistinctCountThetaSketchAggregationFunction
/**
* Builds the Union list.
*/
- private List<Union> buildUnions() {
+ private List<ThetaSketchAccumulator> buildUnions() {
int numUnions = _filterEvaluators.size() + 1;
- List<Union> unions = new ArrayList<>(numUnions);
+ List<ThetaSketchAccumulator> unions = new ArrayList<>(numUnions);
for (int i = 0; i < numUnions; i++) {
- unions.add(_setOperationBuilder.buildUnion());
+ ThetaSketchAccumulator thetaSketchAccumulator =
+ new ThetaSketchAccumulator(_setOperationBuilder,
_intermediateOrdering, _accumulatorThreshold);
+ unions.add(thetaSketchAccumulator);
}
return unions;
}
@@ -1240,20 +1287,6 @@ public class DistinctCountThetaSketchAggregationFunction
return sketches;
}
- /**
- * Converts the given Unions to Sketches.
- */
- private List<Sketch> convertToSketches(List<Union> unions) {
- int numUnions = unions.size();
- List<Sketch> sketches = new ArrayList<>(numUnions);
- for (Union union : unions) {
- // NOTE: Compact the sketch in unsorted, on-heap fashion for performance
concern.
- // See https://datasketches.apache.org/docs/Theta/ThetaSize.html
for more details.
- sketches.add(union.getResult(false, null));
- }
- return sketches;
- }
-
/**
* Evaluates the post-aggregation expression.
*/
@@ -1269,8 +1302,6 @@ public class DistinctCountThetaSketchAggregationFunction
return sketches.get(extractSketchId(expression.getIdentifier()));
}
- // NOTE: Compact the sketch in unsorted, on-heap fashion for performance
concern.
- // See https://datasketches.apache.org/docs/Theta/ThetaSize.html for
more details.
FunctionContext function = expression.getFunction();
String functionName = function.getFunctionName();
List<ExpressionContext> arguments = function.getArguments();
@@ -1280,32 +1311,41 @@ public class DistinctCountThetaSketchAggregationFunction
for (ExpressionContext argument : arguments) {
union.union(evaluatePostAggregationExpression(argument, sketches));
}
- return union.getResult(false, null);
+ return union.getResult(_intermediateOrdering, null);
case SET_INTERSECT:
Intersection intersection = _setOperationBuilder.buildIntersection();
for (ExpressionContext argument : arguments) {
intersection.intersect(evaluatePostAggregationExpression(argument,
sketches));
}
- return intersection.getResult(false, null);
+ return intersection.getResult(_intermediateOrdering, null);
case SET_DIFF:
AnotB diff = _setOperationBuilder.buildANotB();
diff.setA(evaluatePostAggregationExpression(arguments.get(0),
sketches));
diff.notB(evaluatePostAggregationExpression(arguments.get(1),
sketches));
- return diff.getResult(false, null, false);
+ return diff.getResult(_intermediateOrdering, null, false);
default:
throw new IllegalStateException();
}
}
/**
- * Helper class to wrap the theta-sketch parameters.
+ * Helper class to wrap the theta-sketch parameters. The initial values for
the parameters are set to the
+ * same defaults in the Apache Datasketches library.
*/
private static class Parameters {
private static final char PARAMETER_DELIMITER = ';';
private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+ private static final String RESIZE_FACTOR_KEY = "resizeFactor";
+ private static final String SAMPLING_PROBABILITY_KEY =
"samplingProbability";
+ private static final String INTERMEDIATE_ORDERING_KEY =
"intermediateOrdering";
+ private static final String ACCUMULATOR_THRESHOLD_KEY =
"accumulatorThreshold";
+ private int _resizeFactor = ResizeFactor.X8.getValue();
private int _nominalEntries = ThetaUtil.DEFAULT_NOMINAL_ENTRIES;
+ private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+ private boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
+ private float _samplingProbability = 1.0F;
Parameters(String parametersString) {
StringUtils.deleteWhitespace(parametersString);
@@ -1317,6 +1357,14 @@ public class DistinctCountThetaSketchAggregationFunction
String value = keyAndValue[1];
if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
_nominalEntries = Integer.parseInt(value);
+ } else if (key.equalsIgnoreCase(SAMPLING_PROBABILITY_KEY)) {
+ _samplingProbability = Float.parseFloat(value);
+ } else if (key.equalsIgnoreCase(RESIZE_FACTOR_KEY)) {
+ _resizeFactor = Integer.parseInt(value);
+ } else if (key.equalsIgnoreCase(INTERMEDIATE_ORDERING_KEY)) {
+ _intermediateOrdering = Boolean.parseBoolean(value);
+ } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
+ _accumulatorThreshold = Integer.parseInt(value);
} else {
throw new IllegalArgumentException("Invalid parameter key: " + key);
}
@@ -1326,6 +1374,22 @@ public class DistinctCountThetaSketchAggregationFunction
int getNominalEntries() {
return _nominalEntries;
}
+
+ float getSamplingProbability() {
+ return _samplingProbability;
+ }
+
+ boolean getIntermediateOrdering() {
+ return _intermediateOrdering;
+ }
+
+ int getAccumulatorThreshold() {
+ return _accumulatorThreshold;
+ }
+
+ ResizeFactor getResizeFactor() {
+ return ResizeFactor.getRF(_resizeFactor);
+ }
}
/**
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8fb198474c..b9f6f985b5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -39,6 +39,10 @@ import java.util.Map;
import java.util.Random;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
import
org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
import org.apache.pinot.segment.local.customobject.AvgPair;
@@ -49,6 +53,7 @@ import
org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
import org.testng.annotations.Test;
@@ -471,4 +476,52 @@ public class ObjectSerDeUtilsTest {
assertEquals(actual.getState(), ull.getState(), ERROR_MESSAGE);
}
}
+
+ @Test
+ public void testThetaSketch() {
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ UpdateSketch input = Sketches.updateSketchBuilder().build();
+ int size = RANDOM.nextInt(100) + 10;
+ boolean shouldOrder = RANDOM.nextBoolean();
+
+ for (int j = 0; j < size; j++) {
+ input.update(j);
+ }
+
+ Sketch sketch = input.compact(shouldOrder, null);
+
+ byte[] bytes = ObjectSerDeUtils.serialize(sketch);
+ Sketch actual = ObjectSerDeUtils.deserialize(bytes,
ObjectSerDeUtils.ObjectType.DataSketch);
+
+ assertEquals(actual.getEstimate(), sketch.getEstimate(), ERROR_MESSAGE);
+ assertEquals(actual.toByteArray(), sketch.toByteArray(), ERROR_MESSAGE);
+ assertEquals(actual.isOrdered(), shouldOrder, ERROR_MESSAGE);
+ }
+ }
+
+ @Test
+ public void testThetaSketchAccumulator() {
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ UpdateSketch input = Sketches.updateSketchBuilder().build();
+ int size = RANDOM.nextInt(100) + 10;
+ boolean shouldOrder = RANDOM.nextBoolean();
+
+ for (int j = 0; j < size; j++) {
+ input.update(j);
+ }
+
+ SetOperationBuilder setOperationBuilder = new SetOperationBuilder();
+ ThetaSketchAccumulator accumulator = new
ThetaSketchAccumulator(setOperationBuilder, shouldOrder, 2);
+ Sketch sketch = input.compact(shouldOrder, null);
+ accumulator.apply(sketch);
+
+ byte[] bytes = ObjectSerDeUtils.serialize(accumulator);
+ ThetaSketchAccumulator actual =
+ ObjectSerDeUtils.deserialize(bytes,
ObjectSerDeUtils.ObjectType.ThetaSketchAccumulator);
+
+ assertEquals(actual.getResult().getEstimate(), sketch.getEstimate(),
ERROR_MESSAGE);
+ assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(),
ERROR_MESSAGE);
+ assertEquals(actual.getResult().isOrdered(), shouldOrder, ERROR_MESSAGE);
+ }
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
index 11e31aaf0f..11662741db 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
@@ -20,6 +20,10 @@ package org.apache.pinot.core.function.scalar;
import com.dynatrace.hash4j.distinctcount.UltraLogLog;
import java.math.BigDecimal;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
import org.testng.Assert;
@@ -50,6 +54,15 @@ public class SketchFunctionsTest {
Assert.assertThrows(IllegalArgumentException.class, () ->
SketchFunctions.toThetaSketch(new Object(), 1024));
}
+ // Checks that thetaSketchToString() matches Sketch.toString() for every test input and
+ // throws for an argument that is not a theta sketch.
+ // NOTE(review): the method name looks like a typo for a "testThetaSketchToString" test.
+ @Test
+ public void thetaThetaSketchSummary() {
+ for (Object i : _inputs) {
+ Sketch sketch =
Sketches.wrapSketch(Memory.wrap(SketchFunctions.toThetaSketch(i)));
+ Assert.assertEquals(SketchFunctions.thetaSketchToString(sketch),
sketch.toString());
+ }
+ Assert.assertThrows(RuntimeException.class, () ->
SketchFunctions.thetaSketchToString(new Object()));
+ }
+
private long hllEstimate(byte[] bytes) {
return
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytes).cardinality();
}
@@ -97,6 +110,15 @@ public class SketchFunctionsTest {
Assert.assertThrows(IllegalArgumentException.class, () ->
SketchFunctions.toCpcSketch(new Object(), 11));
}
+ // Checks that cpcSketchToString() matches CpcSketch.toString() for every test input and
+ // throws for an argument that is not a CPC sketch.
+ // NOTE(review): the "theta" prefix looks like a typo for "test" in this CPC test's name.
+ @Test
+ public void thetaCpcSketchToString() {
+ for (Object i : _inputs) {
+ CpcSketch sketch =
CpcSketch.heapify(Memory.wrap(SketchFunctions.toCpcSketch(i)));
+ Assert.assertEquals(SketchFunctions.cpcSketchToString(sketch),
sketch.toString());
+ }
+ Assert.assertThrows(RuntimeException.class, () ->
SketchFunctions.cpcSketchToString(new Object()));
+ }
+
private long ullEstimate(byte[] bytes) {
// round it to a long to make it easier to assert on
return
Math.round(ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(bytes).getDistinctCountEstimate());
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
index 4d04539b4a..98e9b90900 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.GroupByOperator;
import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -172,13 +173,13 @@ public class DistinctCountThetaSketchQueriesTest extends
BaseQueriesTest {
assertNotNull(aggregationResult);
assertEquals(aggregationResult.size(), 11);
for (int i = 0; i < 11; i++) {
- List<Sketch> sketches = (List<Sketch>) aggregationResult.get(i);
- assertEquals(sketches.size(), 1);
- Sketch sketch = sketches.get(0);
+ List<ThetaSketchAccumulator> accumulators =
(List<ThetaSketchAccumulator>) aggregationResult.get(i);
+ assertEquals(accumulators.size(), 1);
+ ThetaSketchAccumulator accumulator = accumulators.get(0);
if (i < 5) {
- assertEquals(Math.round(sketch.getEstimate()), NUM_RECORDS);
+ assertEquals(Math.round(accumulator.getResult().getEstimate()),
NUM_RECORDS);
} else {
- assertEquals(Math.round(sketch.getEstimate()), 3 * NUM_RECORDS);
+ assertEquals(Math.round(accumulator.getResult().getEstimate()), 3 *
NUM_RECORDS);
}
}
@@ -220,9 +221,10 @@ public class DistinctCountThetaSketchQueriesTest extends
BaseQueriesTest {
numGroups++;
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
for (int i = 0; i < 6; i++) {
- List<Sketch> sketches = (List<Sketch>)
aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
- assertEquals(sketches.size(), 1);
- Sketch sketch = sketches.get(0);
+ List<ThetaSketchAccumulator> accumulators =
+ (List<ThetaSketchAccumulator>)
aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
+ assertEquals(accumulators.size(), 1);
+ Sketch sketch = accumulators.get(0).getResult();
if (i < 5) {
assertEquals(Math.round(sketch.getEstimate()), 1);
} else {
@@ -279,13 +281,13 @@ public class DistinctCountThetaSketchQueriesTest extends
BaseQueriesTest {
List<Object> aggregationResult = resultsBlock.getResults();
assertNotNull(aggregationResult);
assertEquals(aggregationResult.size(), 1);
- List<Sketch> sketches = (List<Sketch>) aggregationResult.get(0);
- assertEquals(sketches.size(), 5);
- assertTrue(sketches.get(0).isEmpty());
- assertEquals(Math.round(sketches.get(1).getEstimate()), 300);
- assertEquals(Math.round(sketches.get(2).getEstimate()), 450);
- assertEquals(Math.round(sketches.get(3).getEstimate()), 175);
- assertEquals(Math.round(sketches.get(4).getEstimate()), 100);
+ List<ThetaSketchAccumulator> accumulators = (List<ThetaSketchAccumulator>)
aggregationResult.get(0);
+ assertEquals(accumulators.size(), 5);
+ assertTrue(accumulators.get(0).getResult().isEmpty());
+ assertEquals(Math.round(accumulators.get(1).getResult().getEstimate()),
300);
+ assertEquals(Math.round(accumulators.get(2).getResult().getEstimate()),
450);
+ assertEquals(Math.round(accumulators.get(3).getResult().getEstimate()),
175);
+ assertEquals(Math.round(accumulators.get(4).getResult().getEstimate()),
100);
// Inter segments
Object[] expectedResults = new Object[]{225L};
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
new file mode 100644
index 0000000000..b1cb94812c
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import javax.annotation.Nonnull;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+
+
+/**
+ * Intermediate state used by {@code
DistinctCountThetaSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade-off more memory usage for more
pre-aggregation.
+ * This permits use of the Union "early-stop" optimisation where ordered
sketches require no further
+ * processing beyond the minimum Theta value.
+ * The union operation initialises an empty "gadget" bookkeeping sketch that
is updated with hashed entries
+ * that fall below the minimum Theta value for all input sketches ("Broder
Rule"). When the initial
+ * Theta value is set to the minimum immediately, further gains can be
realised.
+ */
+public class ThetaSketchAccumulator {
+ // Input sketches buffered but not yet fed into _union; created lazily in internalAdd().
+ private ArrayList<Sketch> _accumulator;
+ // Ordering flag passed to Union.getResult() whenever a merged result is produced.
+ private boolean _ordered = false;
+ // Used to build _union lazily on the first unionAll() call.
+ private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+ // Union that buffered sketches are flushed into; null until the first unionAll() call.
+ private Union _union;
+ // Buffer size at which internalAdd() flushes buffered sketches via unionAll().
+ // NOTE(review): the no-arg constructor leaves this at 0, so every added sketch
+ // triggers a unionAll() call unless setThreshold() is called first.
+ private int _threshold;
+ // Number of non-empty sketches accepted so far (empty sketches are skipped).
+ private int _numInputs = 0;
+
+ public ThetaSketchAccumulator() {
+ }
+
+ // Note: The accumulator is serialized as a sketch. This means that the
majority of the processing
+ // happens on serialization. Therefore, when deserialized, the values may be
null and will
+ // require re-initialisation. Since the primary use case is at query time
for the Broker
+ // and Server, these properties are already in memory and are re-set.
+ // Parameters: builder used to create the union, ordering flag for merged results, and
+ // the buffer size that triggers a flush into the union.
+ public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder,
boolean ordered, int threshold) {
+ _setOperationBuilder = setOperationBuilder;
+ _ordered = ordered;
+ _threshold = threshold;
+ }
+
+ public void setOrdered(boolean ordered) {
+ _ordered = ordered;
+ }
+
+ // NOTE(review): has no effect once unionAll() has already built _union from the
+ // previous builder.
+ public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
+ _setOperationBuilder = setOperationBuilder;
+ }
+
+ public void setThreshold(int threshold) {
+ _threshold = threshold;
+ }
+
+ // True when no non-empty sketch has been added via apply() or merge().
+ public boolean isEmpty() {
+ return _numInputs == 0;
+ }
+
+ /**
+ * Returns the union of all non-empty sketches added so far.
+ * As a side effect, any buffered sketches are flushed into the internal union. When exactly
+ * one non-empty sketch has been added, that sketch is returned unchanged.
+ */
+ @Nonnull
+ public Sketch getResult() {
+ return unionAll();
+ }
+
+ // Adds a single sketch to this accumulator; empty sketches are ignored.
+ public void apply(Sketch sketch) {
+ internalAdd(sketch);
+ }
+
+ // Adds another accumulator's merged result to this one. This calls
+ // thetaUnion.getResult(), which also flushes the other accumulator's buffer.
+ public void merge(ThetaSketchAccumulator thetaUnion) {
+ if (thetaUnion.isEmpty()) {
+ return;
+ }
+ Sketch sketch = thetaUnion.getResult();
+ internalAdd(sketch);
+ }
+
+ // Buffers a non-empty sketch and, once the buffer holds at least _threshold sketches,
+ // flushes it via unionAll() (the returned sketch is discarded here).
+ private void internalAdd(Sketch sketch) {
+ if (sketch.isEmpty()) {
+ return;
+ }
+ if (_accumulator == null) {
+ _accumulator = new ArrayList<>(_threshold);
+ }
+ _accumulator.add(sketch);
+ _numInputs += 1;
+
+ if (_accumulator.size() >= _threshold) {
+ unionAll();
+ }
+ }
+
+ // Flushes buffered sketches into _union (smallest Theta first) and returns the merged
+ // result. The empty and single-input cases below return early without flushing.
+ private Sketch unionAll() {
+ if (_union == null) {
+ _union = _setOperationBuilder.buildUnion();
+ }
+ // Return the default update "gadget" sketch as a compact sketch
+ if (isEmpty()) {
+ return _union.getResult(_ordered, null);
+ }
+ // Corner-case: the parameters are not strictly respected when there is a
single sketch.
+ // This single sketch might have been the result of a previously
accumulated union and
+ // would already have the parameters set. The sketch is returned as-is
without adjusting
+ // ordering and nominal entries which requires an additional union
operation.
+ if (_numInputs == 1) {
+ return _accumulator.get(0);
+ }
+
+ // Performance optimization: ensure that the minimum Theta is used for
"early stop".
+ // The "early stop" optimization is implemented in the Apache Datasketches
Union operation for
+ // ordered and compact Theta sketches. Internally, a compact and ordered
Theta sketch can be
+ // compared to a sorted array of K items. When performing a union, only
those items from
+ // the input sketch less than Theta need to be processed. The loop
terminates as soon as a hash
+ // is seen that is > Theta.
+ // The following "sort" improves on this further by selecting the minimal
Theta value up-front,
+ // which results in fewer redundant entries being retained and
subsequently discarded during the
+ // union operation.
+ _accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
+ for (Sketch accumulatedSketch : _accumulator) {
+ _union.union(accumulatedSketch);
+ }
+ // Buffered sketches now live in _union; later calls only union newly buffered inputs.
+ _accumulator.clear();
+
+ return _union.getResult(_ordered, null);
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index dcd689afae..2879c474a5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -243,9 +243,12 @@ public class CustomSerDeUtils {
@Override
public byte[] serialize(Sketch value) {
- // NOTE: Compact the sketch in unsorted, on-heap fashion for performance
concern.
- // See https://datasketches.apache.org/docs/Theta/ThetaSize.html
for more details.
- return value.compact(false, null).toByteArray();
+ // The serializer should respect existing ordering to enable "early stop"
+ // optimisations on unions.
+ // Non-compact sketches are compacted first, keeping their current isOrdered() flag;
+ // sketches that are already compact are serialized unchanged.
+ if (!value.isCompact()) {
+ return value.compact(value.isOrdered(), null).toByteArray();
+ }
+ return value.toByteArray();
}
@Override
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
index 822335cfb0..fdc820c120 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
@@ -27,7 +27,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
public class DistinctCountThetaSketchValueAggregatorTest {
@@ -162,4 +164,15 @@ public class DistinctCountThetaSketchValueAggregatorTest {
byte[][] zeroSketches = {};
assertEquals(agg.getInitialAggregatedValue(zeroSketches).getEstimate(),
0.0);
}
+
+ // Checks that cloneAggregatedValue() keeps the ordered/unordered flag of a compact
+ // sketch instead of forcing one ordering.
+ // NOTE(review): presumably exercises the ordering-preserving theta serializer change in
+ // CustomSerDeUtils -- confirm cloneAggregatedValue goes through that SerDe.
+ @Test
+ public void shouldRetainSketchOrdering() {
+ UpdateSketch input = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 10).forEach(input::update);
+ Sketch unordered = input.compact(false, null);
+ Sketch ordered = input.compact(true, null);
+ DistinctCountThetaSketchValueAggregator agg = new
DistinctCountThetaSketchValueAggregator();
+ assertTrue(agg.cloneAggregatedValue(ordered).isOrdered());
+ assertFalse(agg.cloneAggregatedValue(unordered).isOrdered());
+ }
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
new file mode 100644
index 0000000000..d34ddd6447
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link ThetaSketchAccumulator}: empty state, single input, merging two
+ * accumulators, buffering below the threshold, and merging an empty accumulator.
+ */
+public class ThetaSketchAccumulatorTest {
+ private SetOperationBuilder _setOperationBuilder;
+
+ // A fresh builder per test method, so tests do not share builder state.
+ @BeforeMethod
+ public void setUp() {
+ _setOperationBuilder = new SetOperationBuilder();
+ }
+
+ // No input added: isEmpty() is true and getResult() returns an estimate of 0.
+ @Test
+ public void testEmptyAccumulator() {
+ ThetaSketchAccumulator accumulator = new
ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+ Assert.assertTrue(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+ }
+
+ // A single input (below threshold 2) is returned as the result, so the estimates match.
+ @Test
+ public void testAccumulatorWithSingleSketch() {
+ UpdateSketch input = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 1000).forEach(input::update);
+ Sketch sketch = input.compact();
+
+ ThetaSketchAccumulator accumulator = new
ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+ accumulator.apply(sketch);
+
+ Assert.assertFalse(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(),
sketch.getEstimate());
+ }
+
+ // Merging two accumulators built from disjoint value ranges should yield the sum of the
+ // two estimates.
+ // NOTE(review): exact equality assumes the default update sketch keeps all 1000 entries
+ // (no sampling), so the estimates are exact -- confirm against the builder defaults.
+ @Test
+ public void testAccumulatorMerge() {
+ UpdateSketch input1 = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 1000).forEach(input1::update);
+ Sketch sketch1 = input1.compact();
+ UpdateSketch input2 = Sketches.updateSketchBuilder().build();
+ IntStream.range(1000, 2000).forEach(input2::update);
+ Sketch sketch2 = input2.compact();
+
+ ThetaSketchAccumulator accumulator1 = new
ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ accumulator1.apply(sketch1);
+ ThetaSketchAccumulator accumulator2 = new
ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ accumulator2.apply(sketch2);
+ accumulator1.merge(accumulator2);
+
+ Assert.assertEquals(accumulator1.getResult().getEstimate(),
sketch1.getEstimate() + sketch2.getEstimate());
+ }
+
+ // Two inputs with threshold 3: both stay buffered until getResult() unions them.
+ // NOTE(review): the buffer never reaches the threshold here, so the flush path inside
+ // internalAdd() is not actually exercised by this test.
+ @Test
+ public void testThresholdBehavior() {
+ UpdateSketch input1 = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 1000).forEach(input1::update);
+ Sketch sketch1 = input1.compact();
+ UpdateSketch input2 = Sketches.updateSketchBuilder().build();
+ IntStream.range(1000, 2000).forEach(input2::update);
+ Sketch sketch2 = input2.compact();
+
+ ThetaSketchAccumulator accumulator = new
ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ accumulator.apply(sketch1);
+ accumulator.apply(sketch2);
+
+ Assert.assertEquals(accumulator.getResult().getEstimate(),
sketch1.getEstimate() + sketch2.getEstimate());
+ }
+
+ // Merging an empty accumulator is a no-op: the target stays empty with estimate 0.
+ @Test
+ public void testUnionWithEmptyInput() {
+ ThetaSketchAccumulator accumulator = new
ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ ThetaSketchAccumulator emptyAccumulator = new
ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+
+ accumulator.merge(emptyAccumulator);
+
+ Assert.assertTrue(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 09fe5129e0..84bcea58e2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -98,9 +98,9 @@ public class CommonConstants {
public static final int DEFAULT_HYPERLOGLOG_PLUS_P = 14;
public static final int DEFAULT_HYPERLOGLOG_PLUS_SP = 0;
- // 2 to the power of 16, for tradeoffs see datasketches library
documentation:
+ // 2 to the power of 14, for tradeoffs see datasketches library
documentation:
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
- public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
+ public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 16384;
public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]