This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8404bc872e Add base class for custom object accumulator (#12685)
8404bc872e is described below
commit 8404bc872eb6540a9df75260c5206a2cd41d2ed1
Author: David Cromberge <[email protected]>
AuthorDate: Mon Mar 25 23:33:29 2024 +0000
Add base class for custom object accumulator (#12685)
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 72 +++++-
...ValueIntegerTupleSketchAggregationFunction.java | 22 +-
.../DistinctCountCPCSketchAggregationFunction.java | 248 +++++++++++++------
...CountIntegerTupleSketchAggregationFunction.java | 12 +-
...stinctCountRawCPCSketchAggregationFunction.java | 8 +-
.../IntegerTupleSketchAggregationFunction.java | 271 +++++++++++++++------
...aluesIntegerTupleSketchAggregationFunction.java | 16 +-
.../pinot/core/common/ObjectSerDeUtilsTest.java | 54 ++++
.../local/customobject/CpcSketchAccumulator.java | 79 ++++++
.../customobject/CustomObjectAccumulator.java | 121 +++++++++
.../local/customobject/ThetaSketchAccumulator.java | 53 +---
...mulator.java => TupleIntSketchAccumulator.java} | 93 +++----
.../customobject/CpcSketchAccumulatorTest.java | 92 +++++++
.../TupleIntSketchAccumulatorTest.java | 106 ++++++++
14 files changed, 957 insertions(+), 290 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 14aa2d2f10..80483b2640 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -80,6 +80,7 @@ import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.segment.local.customobject.AvgPair;
import org.apache.pinot.segment.local.customobject.CovarianceTuple;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
import org.apache.pinot.segment.local.customobject.DoubleLongPair;
import org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.IntLongPair;
@@ -89,6 +90,7 @@ import
org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -156,7 +158,9 @@ public class ObjectSerDeUtils {
FloatArrayList(44),
StringArrayList(45),
UltraLogLog(46),
- ThetaSketchAccumulator(47);
+ ThetaSketchAccumulator(47),
+ TupleIntSketchAccumulator(48),
+ CpcSketchAccumulator(49);
private final int _value;
@@ -277,6 +281,10 @@ public class ObjectSerDeUtils {
return ObjectType.UltraLogLog;
} else if (value instanceof ThetaSketchAccumulator) {
return ObjectType.ThetaSketchAccumulator;
+ } else if (value instanceof TupleIntSketchAccumulator) {
+ return ObjectType.TupleIntSketchAccumulator;
+ } else if (value instanceof CpcSketchAccumulator) {
+ return ObjectType.CpcSketchAccumulator;
} else {
throw new IllegalArgumentException("Unsupported type of value: " +
value.getClass().getSimpleName());
}
@@ -1587,7 +1595,7 @@ public class ObjectSerDeUtils {
}
};
- public static final ObjectSerDe<ThetaSketchAccumulator>
DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
+ public static final ObjectSerDe<ThetaSketchAccumulator>
DATA_SKETCH_THETA_ACCUMULATOR_SER_DE =
new ObjectSerDe<ThetaSketchAccumulator>() {
@Override
@@ -1614,6 +1622,62 @@ public class ObjectSerDeUtils {
}
};
+ // SerDe for ObjectType.TupleIntSketchAccumulator (48); its position in SER_DES must follow ObjectType order.
+ // Only the accumulator's result sketch is written (via toByteArray()), so accumulator settings are not
+ // round-tripped.
+ public static final ObjectSerDe<TupleIntSketchAccumulator>
DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE =
+ new ObjectSerDe<TupleIntSketchAccumulator>() {
+
+ @Override
+ public byte[] serialize(TupleIntSketchAccumulator
tupleIntSketchBuffer) {
+ // getResult() yields the accumulated integer-summary Tuple sketch, which is what gets serialized.
+ org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch =
tupleIntSketchBuffer.getResult();
+ return sketch.toByteArray();
+ }
+
+ @Override
+ public TupleIntSketchAccumulator deserialize(byte[] bytes) {
+ return deserialize(ByteBuffer.wrap(bytes));
+ }
+
+ // Note: The accumulator is designed to serialize as a sketch and
should
+ // not be deserialized in practice.
+ // When it is, the heapified sketch is applied to a default-constructed accumulator; the aggregation
+ // functions' extractFinalResult re-applies nominal entries, set operations and threshold before reading it.
+ @Override
+ public TupleIntSketchAccumulator deserialize(ByteBuffer byteBuffer) {
+ TupleIntSketchAccumulator tupleIntSketchAccumulator = new
TupleIntSketchAccumulator();
+ // Copy the remaining bytes out of the buffer (this advances its position).
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch =
+
org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+ new IntegerSummaryDeserializer());
+ tupleIntSketchAccumulator.apply(sketch);
+ return tupleIntSketchAccumulator;
+ }
+ };
+
+ // SerDe for ObjectType.CpcSketchAccumulator (49); its position in SER_DES must follow ObjectType order.
+ // Only the accumulator's result CPC sketch is written (via toByteArray()), so accumulator settings are not
+ // round-tripped.
+ public static final ObjectSerDe<CpcSketchAccumulator>
DATA_SKETCH_CPC_ACCUMULATOR_SER_DE =
+ new ObjectSerDe<CpcSketchAccumulator>() {
+
+ @Override
+ public byte[] serialize(CpcSketchAccumulator cpcSketchBuffer) {
+ // getResult() yields the accumulated CPC sketch, which is what gets serialized.
+ CpcSketch sketch = cpcSketchBuffer.getResult();
+ return sketch.toByteArray();
+ }
+
+ @Override
+ public CpcSketchAccumulator deserialize(byte[] bytes) {
+ return deserialize(ByteBuffer.wrap(bytes));
+ }
+
+ // Note: The accumulator is designed to serialize as a sketch and
should
+ // not be deserialized in practice.
+ // When it is, the heapified sketch is applied to a default-constructed accumulator; extractFinalResult
+ // re-applies lgK and the accumulator threshold before reading it.
+ @Override
+ public CpcSketchAccumulator deserialize(ByteBuffer byteBuffer) {
+ CpcSketchAccumulator cpcSketchAccumulator = new
CpcSketchAccumulator();
+ // Copy the remaining bytes out of the buffer (this advances its position).
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ CpcSketch sketch = CpcSketch.heapify(Memory.wrap(bytes));
+ cpcSketchAccumulator.apply(sketch);
+ return cpcSketchAccumulator;
+ }
+ };
+
// NOTE: DO NOT change the order, it has to be the same order as the
ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
@@ -1664,7 +1728,9 @@ public class ObjectSerDeUtils {
FLOAT_ARRAY_LIST_SER_DE,
STRING_ARRAY_LIST_SER_DE,
ULTRA_LOG_LOG_OBJECT_SER_DE,
- DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
+ DATA_SKETCH_THETA_ACCUMULATOR_SER_DE,
+ DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE,
+ DATA_SKETCH_CPC_ACCUMULATOR_SER_DE,
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
index 3b3718dba2..16fd6751b2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
@@ -19,12 +19,12 @@
package org.apache.pinot.core.query.aggregation.function;
import java.util.List;
-import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.TupleSketchIterator;
-import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -48,22 +48,20 @@ public class AvgValueIntegerTupleSketchAggregationFunction
}
+ /**
+ * Returns the mean integer summary value over the entries retained in the accumulated Tuple sketch,
+ * rounded with Math.round, or null when the sketch is empty or retains no entries.
+ */
@Override
- public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
- if (integerSummarySketches == null) {
+ public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
+ // Apply this function's sketch parameters before the accumulator produces its result.
+ accumulator.setNominalEntries(_nominalEntries);
+ accumulator.setSetOperations(_setOps);
+ accumulator.setThreshold(_accumulatorThreshold);
+ Sketch<IntegerSummary> result = accumulator.getResult();
+ if (result.isEmpty() || result.getRetainedEntries() == 0) {
+ // there is nothing to average, return null
return null;
}
- Union<IntegerSummary> union = new Union<>(_entries, _setOps);
- integerSummarySketches.forEach(union::union);
- double retainedTotal = 0L;
- CompactSketch<IntegerSummary> result = union.getResult();
TupleSketchIterator<IntegerSummary> summaries = result.iterator();
+ double retainedTotal = 0L;
while (summaries.next()) {
retainedTotal += summaries.getSummary().getValue();
}
- if (result.getRetainedEntries() == 0) {
- // there is nothing to average, return null
- return null;
- }
+ // The divisor is non-zero: zero retained entries already returned null above.
double estimate = retainedTotal / result.getRetainedEntries();
return Math.round(estimate);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
index 1946200842..4a33086bb8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
@@ -21,16 +21,17 @@ package org.apache.pinot.core.query.aggregation.function;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.datasketches.cpc.CpcSketch;
-import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.datasketches.memory.Memory;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec;
@@ -80,8 +81,10 @@ import org.roaringbitmap.RoaringBitmap;
*/
@SuppressWarnings({"rawtypes"})
public class DistinctCountCPCSketchAggregationFunction
- extends BaseSingleInputAggregationFunction<CpcSketch, Comparable> {
- protected final int _lgK;
+ extends BaseSingleInputAggregationFunction<CpcSketchAccumulator,
Comparable> {
+ private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
+ protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+ protected int _lgNominalEntries;
public DistinctCountCPCSketchAggregationFunction(List<ExpressionContext>
arguments) {
super(arguments.get(0));
@@ -92,9 +95,22 @@ public class DistinctCountCPCSketchAggregationFunction
Preconditions.checkArgument(numExpressions <= 2, "DistinctCountCPC expects
1 or 2 arguments, got: %s",
numExpressions);
if (arguments.size() == 2) {
- _lgK = arguments.get(1).getLiteral().getIntValue();
+ ExpressionContext secondArgument = arguments.get(1);
+ Preconditions.checkArgument(secondArgument.getType() ==
ExpressionContext.Type.LITERAL,
+ "CPC Sketch Aggregation Function expects the second argument to be a
literal (parameters)," + " but got: ",
+ secondArgument.getType());
+
+ if (secondArgument.getLiteral().getType() == FieldSpec.DataType.STRING) {
+ Parameters parameters = new
Parameters(secondArgument.getLiteral().getStringValue());
+ // Allows the user to trade-off memory usage for merge CPU; higher
values use more memory
+ _accumulatorThreshold = parameters.getAccumulatorThreshold();
+ // Nominal entries controls sketch accuracy and size
+ _lgNominalEntries = parameters.getLgNominalEntries();
+ } else {
+ _lgNominalEntries = secondArgument.getLiteral().getIntValue();
+ }
} else {
- _lgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
+ _lgNominalEntries = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
}
}
@@ -123,15 +139,11 @@ public class DistinctCountCPCSketchAggregationFunction
if (storedType == DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
try {
- CpcSketch cpcSketch = aggregationResultHolder.getResult();
- CpcUnion union = new CpcUnion(_lgK);
- if (cpcSketch != null) {
- union.update(cpcSketch);
- }
- for (int i = 0; i < length; i++) {
-
union.update(ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]));
+ CpcSketchAccumulator cpcSketchAccumulator =
getAccumulator(aggregationResultHolder);
+ CpcSketch[] sketches = deserializeSketches(bytesValues, length);
+ for (CpcSketch sketch : sketches) {
+ cpcSketchAccumulator.apply(sketch);
}
- aggregationResultHolder.setValue(union.getResult());
} catch (Exception e) {
throw new RuntimeException("Caught exception while merging CPC
sketches", e);
}
@@ -182,6 +194,8 @@ public class DistinctCountCPCSketchAggregationFunction
default:
throw new IllegalStateException("Illegal data type for
DISTINCT_COUNT_CPC aggregation function: " + storedType);
}
+ CpcSketchAccumulator cpcSketchAccumulator =
getAccumulator(aggregationResultHolder);
+ cpcSketchAccumulator.apply(cpcSketch);
}
@Override
@@ -191,24 +205,17 @@ public class DistinctCountCPCSketchAggregationFunction
// Treat BYTES value as serialized CPC Sketch
DataType storedType = blockValSet.getValueType().getStoredType();
- if (storedType == DataType.BYTES) {
+ if (storedType == FieldSpec.DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
try {
+ CpcSketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
- CpcSketch value =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
- int groupKey = groupKeyArray[i];
- CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
- if (cpcSketch != null) {
- CpcUnion union = new CpcUnion(_lgK);
- union.update(cpcSketch);
- union.update(value);
- groupByResultHolder.setValueForKey(groupKey, union.getResult());
- } else {
- groupByResultHolder.setValueForKey(groupKey, value);
- }
+ CpcSketchAccumulator cpcSketchAccumulator =
getAccumulator(groupByResultHolder, groupKeyArray[i]);
+ CpcSketch sketch = sketches[i];
+ cpcSketchAccumulator.apply(sketch);
}
} catch (Exception e) {
- throw new RuntimeException("Caught exception while merging CPC
sketches", e);
+ throw new RuntimeException("Caught exception while aggregating CPC
Sketches", e);
}
return;
}
@@ -267,25 +274,19 @@ public class DistinctCountCPCSketchAggregationFunction
// Treat BYTES value as serialized CPC Sketch
DataType storedType = blockValSet.getValueType().getStoredType();
- if (storedType == DataType.BYTES) {
+ boolean singleValue = blockValSet.isSingleValue();
+
+ if (singleValue && storedType == DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
try {
+ CpcSketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
- CpcSketch value =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
for (int groupKey : groupKeysArray[i]) {
- CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
- if (cpcSketch != null) {
- CpcUnion union = new CpcUnion(_lgK);
- union.update(cpcSketch);
- union.update(value);
- groupByResultHolder.setValueForKey(groupKey, union.getResult());
- } else {
- groupByResultHolder.setValueForKey(groupKey, value);
- }
+ getAccumulator(groupByResultHolder, groupKey).apply(sketches[i]);
}
}
} catch (Exception e) {
- throw new RuntimeException("Caught exception while merging CPC
sketches", e);
+ throw new RuntimeException("Caught exception while aggregating CPC
sketches", e);
}
return;
}
@@ -348,51 +349,50 @@ public class DistinctCountCPCSketchAggregationFunction
}
+ /**
+ * Converts whatever the aggregation result holder contains (nothing, a raw CpcSketch, a DictIdsWrapper
+ * or an accumulator) into a CpcSketchAccumulator.
+ */
@Override
- public CpcSketch extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ public CpcSketchAccumulator extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
Object result = aggregationResultHolder.getResult();
if (result == null) {
- return new CpcSketch(_lgK);
+ // No values were aggregated: return an empty accumulator configured with this function's parameters.
+ return new CpcSketchAccumulator(_lgNominalEntries,
_accumulatorThreshold);
}
- if (result instanceof DictIdsWrapper) {
+ // A raw CpcSketch in the holder (e.g. one stored by getCpcSketch) is wrapped into an accumulator.
+ if (result instanceof CpcSketch) {
+ return convertSketchAccumulator(result);
+ } else if (result instanceof DictIdsWrapper) {
// For dictionary-encoded expression, convert dictionary ids to CpcSketch
+ // and wrap the resulting sketch into an accumulator.
- return convertToCpcSketch((DictIdsWrapper) result);
+ return convertSketchAccumulator(dictionaryToCpcSketch((DictIdsWrapper)
result));
} else {
- // For non-dictionary-encoded expression, directly return the CpcSketch
- return (CpcSketch) result;
+ // Otherwise the holder already contains an accumulator (e.g. one created by getAccumulator).
+ return (CpcSketchAccumulator) result;
}
}
+ /**
+ * Converts whatever the group-by result holder contains for {@code groupKey} (nothing, a raw CpcSketch,
+ * a DictIdsWrapper or an accumulator) into a CpcSketchAccumulator.
+ */
@Override
- public CpcSketch extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ public CpcSketchAccumulator extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
Object result = groupByResultHolder.getResult(groupKey);
if (result == null) {
- return new CpcSketch(_lgK);
+ // No values for this group: return an empty accumulator configured with this function's parameters.
+ return new CpcSketchAccumulator(_lgNominalEntries,
_accumulatorThreshold);
}
- if (result instanceof DictIdsWrapper) {
+ // A raw CpcSketch for this group (e.g. one stored by getCpcSketch) is wrapped into an accumulator.
+ if (result instanceof CpcSketch) {
+ return convertSketchAccumulator(result);
+ } else if (result instanceof DictIdsWrapper) {
// For dictionary-encoded expression, convert dictionary ids to CpcSketch
+ // and wrap the resulting sketch into an accumulator.
- return convertToCpcSketch((DictIdsWrapper) result);
+ return convertSketchAccumulator(dictionaryToCpcSketch((DictIdsWrapper)
result));
} else {
- // For non-dictionary-encoded expression, directly return the CpcSketch
- return (CpcSketch) result;
+ // Otherwise the holder already contains an accumulator (e.g. one created by getAccumulator).
+ return (CpcSketchAccumulator) result;
}
}
+ /**
+ * Merges two intermediate CPC accumulators.
+ *
+ * <p>Null or empty inputs are short-circuited without merging. The method never returns null: when both
+ * inputs are null, an empty accumulator configured with this function's lgK and threshold is returned,
+ * matching the previous behaviour of returning an empty CpcSketch. When both inputs are non-empty,
+ * {@code intermediateResult2} is merged into {@code intermediateResult1}, which is mutated and returned.
+ */
@Override
- public CpcSketch merge(CpcSketch intermediateResult1, CpcSketch
intermediateResult2) {
- if (intermediateResult1 == null && intermediateResult2 != null) {
- return intermediateResult2;
- } else if (intermediateResult1 != null && intermediateResult2 == null) {
- return intermediateResult1;
- } else if (intermediateResult1 == null) {
- return new CpcSketch(_lgK);
- }
-
- CpcUnion union = new CpcUnion(_lgK);
- union.update(intermediateResult1);
- union.update(intermediateResult2);
- return union.getResult();
+ public CpcSketchAccumulator merge(CpcSketchAccumulator intermediateResult1,
+ CpcSketchAccumulator intermediateResult2) {
+ if (intermediateResult1 == null) {
+ // Do not propagate null when both sides are absent; downstream extractFinalResult dereferences the result.
+ return intermediateResult2 != null ? intermediateResult2
+ : new CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold);
+ }
+ if (intermediateResult2 == null || intermediateResult2.isEmpty()) {
+ return intermediateResult1;
+ }
+ if (intermediateResult1.isEmpty()) {
+ return intermediateResult2;
+ }
+ intermediateResult1.merge(intermediateResult2);
+ return intermediateResult1;
}
@Override
@@ -406,8 +406,22 @@ public class DistinctCountCPCSketchAggregationFunction
}
+ /**
+ * Returns the cardinality estimate of the accumulated CPC sketch, rounded with Math.round.
+ */
@Override
- public Comparable extractFinalResult(CpcSketch intermediateResult) {
- return Math.round(intermediateResult.getEstimate());
+ public Comparable extractFinalResult(CpcSketchAccumulator
intermediateResult) {
+ // Re-apply this function's parameters: an accumulator produced by the SerDe is default-constructed.
+ intermediateResult.setLgNominalEntries(_lgNominalEntries);
+ intermediateResult.setThreshold(_accumulatorThreshold);
+ return Math.round(intermediateResult.getResult().getEstimate());
+ }
+
+ /**
+ * Returns the CpcSketch from the result holder or creates a new one if it
does not exist.
+ * A newly created sketch uses {@code _lgNominalEntries} and is stored back into the holder.
+ * NOTE(review): the holder value is read through an implicit generic cast; if the holder already holds a
+ * CpcSketchAccumulator (see getAccumulator) this would fail at runtime — confirm the paths never share a holder.
+ */
+ protected CpcSketch getCpcSketch(AggregationResultHolder
aggregationResultHolder) {
+ CpcSketch cpcSketch = aggregationResultHolder.getResult();
+ if (cpcSketch == null) {
+ cpcSketch = new CpcSketch(_lgNominalEntries);
+ aggregationResultHolder.setValue(cpcSketch);
+ }
+ return cpcSketch;
}
/**
@@ -423,18 +437,6 @@ public class DistinctCountCPCSketchAggregationFunction
return dictIdsWrapper._dictIdBitmap;
}
- /**
- * Returns the CpcSketch from the result holder or creates a new one if it
does not exist.
- */
- protected CpcSketch getCpcSketch(AggregationResultHolder
aggregationResultHolder) {
- CpcSketch cpcSketch = aggregationResultHolder.getResult();
- if (cpcSketch == null) {
- cpcSketch = new CpcSketch(_lgK);
- aggregationResultHolder.setValue(cpcSketch);
- }
- return cpcSketch;
- }
-
/**
* Returns the dictionary id bitmap for the given group key or creates a new
one if it does not exist.
*/
@@ -454,7 +456,7 @@ public class DistinctCountCPCSketchAggregationFunction
protected CpcSketch getCpcSketch(GroupByResultHolder groupByResultHolder,
int groupKey) {
CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
if (cpcSketch == null) {
- cpcSketch = new CpcSketch(_lgK);
+ cpcSketch = new CpcSketch(_lgNominalEntries);
groupByResultHolder.setValueForKey(groupKey, cpcSketch);
}
return cpcSketch;
@@ -470,8 +472,8 @@ public class DistinctCountCPCSketchAggregationFunction
}
}
- private CpcSketch convertToCpcSketch(DictIdsWrapper dictIdsWrapper) {
- CpcSketch cpcSketch = new CpcSketch(_lgK);
+ private CpcSketch dictionaryToCpcSketch(DictIdsWrapper dictIdsWrapper) {
+ CpcSketch cpcSketch = new CpcSketch(_lgNominalEntries);
Dictionary dictionary = dictIdsWrapper._dictionary;
RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
@@ -528,6 +530,56 @@ public class DistinctCountCPCSketchAggregationFunction
}
}
+ /**
+ * Returns the accumulator from the result holder or creates a new one if it
does not exist.
+ * A newly created accumulator uses this function's lgK and accumulator threshold and is stored in the holder.
+ * NOTE(review): the holder value is read through an implicit generic cast; if the holder already holds a raw
+ * CpcSketch or a DictIdsWrapper (both handled by extractAggregationResult) this would presumably throw
+ * ClassCastException — confirm those code paths never share a holder.
+ */
+ private CpcSketchAccumulator getAccumulator(AggregationResultHolder
aggregationResultHolder) {
+ CpcSketchAccumulator accumulator = aggregationResultHolder.getResult();
+ if (accumulator == null) {
+ accumulator = new CpcSketchAccumulator(_lgNominalEntries,
_accumulatorThreshold);
+ aggregationResultHolder.setValue(accumulator);
+ }
+ return accumulator;
+ }
+
+ /**
+ * Returns the accumulator for the given group key or creates a new one if
it does not exist.
+ * A newly created accumulator uses this function's lgK and accumulator threshold and is stored for the key.
+ * NOTE(review): same implicit-cast caveat as the AggregationResultHolder overload — confirm the group's
+ * slot cannot already hold a raw CpcSketch or DictIdsWrapper.
+ */
+ private CpcSketchAccumulator getAccumulator(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ CpcSketchAccumulator accumulator = groupByResultHolder.getResult(groupKey);
+ if (accumulator == null) {
+ accumulator = new CpcSketchAccumulator(_lgNominalEntries,
_accumulatorThreshold);
+ groupByResultHolder.setValueForKey(groupKey, accumulator);
+ }
+ return accumulator;
+ }
+
+ /**
+ * Heapifies the first {@code length} serialized CPC sketches.
+ *
+ * @param serializedSketches serialized CPC sketches (the callers pass a block's single-value BYTES values)
+ * @param length number of leading entries to deserialize
+ * @return an array of {@code length} heap CPC sketches, in input order
+ */
+ private CpcSketch[] deserializeSketches(byte[][] serializedSketches, int length) {
+ CpcSketch[] sketches = new CpcSketch[length];
+ for (int i = 0; i < length; i++) {
+ sketches[i] = CpcSketch.heapify(Memory.wrap(serializedSketches[i]));
+ }
+ return sketches;
+ }
+
+ // This ensures backward compatibility with servers that still return
sketches directly.
+ // The AggregationDataTableReducer casts intermediate results to Objects and
although the code compiles,
+ // types might still be incompatible at runtime due to type erasure.
+ // Due to performance overheads of redundant casts, this should be removed
at some future point.
+ // A raw CpcSketch is applied to a new accumulator built with this function's lgK and threshold;
+ // any other value is cast to CpcSketchAccumulator unchanged.
+ protected CpcSketchAccumulator convertSketchAccumulator(Object result) {
+ if (result instanceof CpcSketch) {
+ CpcSketch sketch = (CpcSketch) result;
+ CpcSketchAccumulator accumulator = new
CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold);
+ accumulator.apply(sketch);
+ return accumulator;
+ }
+ return (CpcSketchAccumulator) result;
+ }
+
private static final class DictIdsWrapper {
final Dictionary _dictionary;
final RoaringBitmap _dictIdBitmap;
@@ -537,4 +589,44 @@ public class DistinctCountCPCSketchAggregationFunction
_dictIdBitmap = new RoaringBitmap();
}
}
+
+ /**
+ * Helper class that parses the string form of the optional second argument: a semicolon-separated list of
+ * {@code key=value} pairs such as {@code 'nominalEntries=4096;accumulatorThreshold=4'}. Keys are matched
+ * case-insensitively and all whitespace is ignored. Parameters that are not supplied keep Pinot's defaults:
+ * {@code 2^DEFAULT_CPC_SKETCH_LGK} nominal entries and {@code DEFAULT_ACCUMULATOR_THRESHOLD}.
+ */
+ private static class Parameters {
+ private static final char PARAMETER_DELIMITER = ';';
+ private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+ private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+ private static final String ACCUMULATOR_THRESHOLD_KEY = "accumulatorThreshold";
+
+ private int _nominalEntries = 1 << CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
+ private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+
+ /**
+ * Parses the given parameter string.
+ *
+ * @param parametersString semicolon-separated {@code key=value} pairs
+ * @throws IllegalArgumentException if a pair is malformed, a key is unknown or a value is not an integer
+ */
+ Parameters(String parametersString) {
+ // Strings are immutable, so the whitespace-free copy must be kept; discarding it made inputs such as
+ // "nominalEntries = 4096" fail both key matching and Integer.parseInt.
+ String normalized = StringUtils.deleteWhitespace(parametersString);
+ String[] keyValuePairs = StringUtils.split(normalized, PARAMETER_DELIMITER);
+ for (String keyValuePair : keyValuePairs) {
+ String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR);
+ Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair);
+ String key = keyAndValue[0];
+ String value = keyAndValue[1];
+ if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
+ _nominalEntries = Integer.parseInt(value);
+ } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
+ _accumulatorThreshold = Integer.parseInt(value);
+ } else {
+ throw new IllegalArgumentException("Invalid parameter key: " + key);
+ }
+ }
+ }
+
+ /**
+ * Returns log2 of the configured nominal entries, computed with DataSketches'
+ * {@code Util.exactLog2OfInt} (which expects a power of two).
+ */
+ int getLgNominalEntries() {
+ return org.apache.datasketches.common.Util.exactLog2OfInt(_nominalEntries);
+ }
+
+ /** Returns the configured accumulator threshold. */
+ int getAccumulatorThreshold() {
+ return _accumulatorThreshold;
+ }
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
index 087337472d..68ec18e401 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
@@ -19,11 +19,10 @@
package org.apache.pinot.core.query.aggregation.function;
import java.util.List;
-import org.apache.datasketches.tuple.CompactSketch;
-import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -46,9 +45,10 @@ public class
DistinctCountIntegerTupleSketchAggregationFunction extends IntegerT
}
+ /**
+ * Returns the accumulated Tuple sketch's distinct-count estimate, truncated to a long.
+ */
@Override
- public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
- Union<IntegerSummary> union = new Union<>(_entries, _setOps);
- integerSummarySketches.forEach(union::union);
- return Double.valueOf(union.getResult().getEstimate()).longValue();
+ public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
+ // Apply this function's sketch parameters before the accumulator produces its result.
+ accumulator.setNominalEntries(_nominalEntries);
+ accumulator.setSetOperations(_setOps);
+ accumulator.setThreshold(_accumulatorThreshold);
+ return Double.valueOf(accumulator.getResult().getEstimate()).longValue();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
index ab153c8835..ff3a587881 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
@@ -19,9 +19,9 @@
package org.apache.pinot.core.query.aggregation.function;
import java.util.List;
-import org.apache.datasketches.cpc.CpcSketch;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
import org.apache.pinot.segment.local.customobject.SerializedCPCSketch;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -47,7 +47,9 @@ public class DistinctCountRawCPCSketchAggregationFunction
extends DistinctCountC
}
+ /**
+ * Returns the accumulated CPC sketch wrapped as a SerializedCPCSketch.
+ */
@Override
- public SerializedCPCSketch extractFinalResult(CpcSketch sketch) {
- return new SerializedCPCSketch(sketch);
+ public SerializedCPCSketch extractFinalResult(CpcSketchAccumulator
intermediateResult) {
+ // Re-apply this function's lgK and threshold before reading the accumulated sketch.
+ intermediateResult.setLgNominalEntries(_lgNominalEntries);
+ intermediateResult.setThreshold(_accumulatorThreshold);
+ return new SerializedCPCSketch(intermediateResult.getResult());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
index 1fdace955c..992ef5d7a1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
@@ -19,41 +19,93 @@
package org.apache.pinot.core.query.aggregation.function;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.tuple.Sketch;
-import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.Sketches;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.CommonConstants;
-/***
- * This is the base class for all Integer Tuple Sketch aggregations
+/**
+ * The {@code IntegerTupleSketchAggregationFunction} is the base class for all
integer-based Tuple Sketch aggregations.
+ * Apache Datasketches Tuple Sketches are an extension of the Apache
Datasketches Theta Sketch. Tuple sketches store an
+ * additional summary value with each retained entry which makes the sketch
ideal for summarizing attributes
+ * such as impressions or clicks.
+ *
+ * Tuple sketches are interoperable with the Theta Sketch and enable set
operations over a stream of data, and can
+ * also be used for cardinality estimation.
+ *
+ * Note: The current implementation of this aggregation function is limited to
binary columns that contain sketches
+ * built outside of Pinot.
*
- * Note that it only supports BYTES columns containing serialized sketches
currently, but could be expanded to more
+ * Usage examples:
+ * <ul>
+ * <li>
+ * Simple union (1 or 2 arguments): main expression to aggregate on, followed by an optional second literal
+ * argument. An integer literal is used as the number of nominal entries to keep (default:
+ * 2^{@code CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK}). A string literal is parsed as ';'-separated
+ * key=value parameters ({@code nominalEntries}, {@code accumulatorThreshold}).
+ * The "raw" equivalents return serialized sketches as base64-encoded strings.
+ * <p>DISTINCT_COUNT_TUPLE_SKETCH(col)</p>
+ * <p>DISTINCT_COUNT_TUPLE_SKETCH(col, 4096)</p>
+ * <p>DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col)</p>
+ * <p>DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col, 'nominalEntries=4096;accumulatorThreshold=4')</p>
+ * </li>
+ * <li>
+ * Extracting a cardinality estimate from an integer Tuple sketch:
+ * <p>GET_INT_TUPLE_SKETCH_ESTIMATE(sketch_bytes)</p>
+ * <p>GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col))</p>
+ * </li>
+ * <li>
+ * Union of two sketches, where the summaries of hash keys present in both are merged using addition:
+ * <p>
+ * INT_SUM_TUPLE_SKETCH_UNION(
+ * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1),
+ * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2)
+ * )
+ * </p>
+ * </li>
+ * <li>
+ * Union of two sketches, where the summaries of hash keys present in both are merged using the maximum:
+ * <p>
+ * INT_MAX_TUPLE_SKETCH_UNION(
+ * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1),
+ * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2)
+ * )
+ * </p>
+ * </li>
+ * <li>
+ * Union of two sketches, where the summaries of hash keys present in both are merged using the minimum:
+ * <p>
+ * INT_MIN_TUPLE_SKETCH_UNION(
+ * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1),
+ * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2)
+ * )
+ * </p>
+ * </li>
+ * </ul>
*/
+@SuppressWarnings({"rawtypes"})
public class IntegerTupleSketchAggregationFunction
- extends
BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>,
Comparable> {
+ extends BaseSingleInputAggregationFunction<TupleIntSketchAccumulator,
Comparable> {
+ private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
final ExpressionContext _expressionContext;
final IntegerSummarySetOperations _setOps;
- final int _entries;
+ protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+ protected int _nominalEntries;
public IntegerTupleSketchAggregationFunction(List<ExpressionContext>
arguments, IntegerSummary.Mode mode) {
super(arguments.get(0));
@@ -65,11 +117,20 @@ public class IntegerTupleSketchAggregationFunction
if (arguments.size() == 2) {
ExpressionContext secondArgument = arguments.get(1);
Preconditions.checkArgument(secondArgument.getType() ==
ExpressionContext.Type.LITERAL,
- "Tuple Sketch Aggregation Function expects the second argument to be
a literal (number of entries to keep),"
- + " but got: ", secondArgument.getType());
- _entries = secondArgument.getLiteral().getIntValue();
+ "Tuple Sketch Aggregation Function expects the second argument to be
a literal (parameters)," + " but got: ",
+ secondArgument.getType());
+
+ if (secondArgument.getLiteral().getType() == FieldSpec.DataType.STRING) {
+ Parameters parameters = new
Parameters(secondArgument.getLiteral().getStringValue());
+ // Allows the user to trade-off memory usage for merge CPU; higher
values use more memory
+ _accumulatorThreshold = parameters.getAccumulatorThreshold();
+ // Nominal entries controls sketch accuracy and size
+ _nominalEntries = parameters.getNominalEntries();
+ } else {
+ _nominalEntries = secondArgument.getLiteral().getIntValue();
+ }
} else {
- _entries = (int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+ _nominalEntries = (int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
}
}
@@ -99,20 +160,13 @@ public class IntegerTupleSketchAggregationFunction
if (storedType == FieldSpec.DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
try {
- List<CompactSketch<IntegerSummary>> integerSketch =
aggregationResultHolder.getResult();
- if (integerSketch != null) {
- List<CompactSketch<IntegerSummary>> sketches =
-
Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
- .map(Sketch::compact).collect(Collectors.toList());
-
aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(),
sketches));
- } else {
- List<CompactSketch<IntegerSummary>> sketches =
-
Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
- .map(Sketch::compact).collect(Collectors.toList());
- aggregationResultHolder.setValue(sketches);
+ TupleIntSketchAccumulator tupleIntSketchAccumulator =
getAccumulator(aggregationResultHolder);
+ Sketch<IntegerSummary>[] sketches = deserializeSketches(bytesValues,
length);
+ for (Sketch<IntegerSummary> sketch : sketches) {
+ tupleIntSketchAccumulator.apply(sketch);
}
} catch (Exception e) {
- throw new RuntimeException("Caught exception while merging Tuple
Sketches", e);
+ throw new RuntimeException("Caught exception while aggregating Tuple
Sketches", e);
}
} else {
throw new IllegalStateException("Illegal data type for " + getType() + "
aggregation function: " + storedType);
@@ -131,21 +185,14 @@ public class IntegerTupleSketchAggregationFunction
if (storedType == FieldSpec.DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
try {
+ Sketch<IntegerSummary>[] sketches = deserializeSketches(bytesValues,
length);
for (int i = 0; i < length; i++) {
- byte[] value = bytesValues[i];
- int groupKey = groupKeyArray[i];
- CompactSketch<IntegerSummary> newSketch =
-
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
- if (groupByResultHolder.getResult(groupKey) == null) {
- ArrayList<CompactSketch<IntegerSummary>> newList = new
ArrayList<>();
- newList.add(newSketch);
- groupByResultHolder.setValueForKey(groupKey, newList);
- } else {
-
groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
- }
+ TupleIntSketchAccumulator tupleIntSketchAccumulator =
getAccumulator(groupByResultHolder, groupKeyArray[i]);
+ Sketch<IntegerSummary> sketch = sketches[i];
+ tupleIntSketchAccumulator.apply(sketch);
}
} catch (Exception e) {
- throw new RuntimeException("Caught exception while merging Tuple
Sketches", e);
+ throw new RuntimeException("Caught exception while aggregating Tuple
Sketches", e);
}
} else {
throw new IllegalStateException(
@@ -156,47 +203,55 @@ public class IntegerTupleSketchAggregationFunction
@Override
public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- byte[] value = valueArray[i];
- CompactSketch<IntegerSummary> newSketch =
-
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
- for (int groupKey : groupKeysArray[i]) {
- if (groupByResultHolder.getResult(groupKey) == null) {
- groupByResultHolder.setValueForKey(groupKey,
Collections.singletonList(newSketch));
- } else {
-
groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized Integer Tuple Sketch
+ FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+ boolean singleValue = blockValSet.isSingleValue();
+
+ if (singleValue && storedType == FieldSpec.DataType.BYTES) {
+ byte[][] bytesValues =
blockValSetMap.get(_expression).getBytesValuesSV();
+ try {
+ Sketch<IntegerSummary>[] sketches = deserializeSketches(bytesValues,
length);
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getAccumulator(groupByResultHolder, groupKey).apply(sketches[i]);
+ }
}
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while aggregating Tuple
Sketches", e);
}
+ } else {
+ throw new IllegalStateException(
+ "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation
function: " + storedType);
}
}
@Override
- public List<CompactSketch<IntegerSummary>>
extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
- return aggregationResultHolder.getResult();
+ public TupleIntSketchAccumulator
extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+ TupleIntSketchAccumulator result = aggregationResultHolder.getResult();
+ if (result == null) {
+ return new TupleIntSketchAccumulator(_setOps, _nominalEntries,
_accumulatorThreshold);
+ }
+ return result;
}
@Override
- public List<CompactSketch<IntegerSummary>>
extractGroupByResult(GroupByResultHolder groupByResultHolder,
- int groupKey) {
+ public TupleIntSketchAccumulator extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
return groupByResultHolder.getResult(groupKey);
}
@Override
- public List<CompactSketch<IntegerSummary>>
merge(List<CompactSketch<IntegerSummary>> intermediateResult1,
- List<CompactSketch<IntegerSummary>> intermediateResult2) {
- if (intermediateResult1 == null && intermediateResult2 != null) {
+ public TupleIntSketchAccumulator merge(TupleIntSketchAccumulator
intermediateResult1,
+ TupleIntSketchAccumulator intermediateResult2) {
+ if (intermediateResult1 == null || intermediateResult1.isEmpty()) {
return intermediateResult2;
- } else if (intermediateResult1 != null && intermediateResult2 == null) {
+ }
+ if (intermediateResult2 == null || intermediateResult2.isEmpty()) {
return intermediateResult1;
- } else if (intermediateResult1 == null && intermediateResult2 == null) {
- return new ArrayList<>(0);
}
- ArrayList<CompactSketch<IntegerSummary>> merged =
- new ArrayList<>(intermediateResult1.size() +
intermediateResult2.size());
- merged.addAll(intermediateResult1);
- merged.addAll(intermediateResult2);
- return merged;
+ intermediateResult1.merge(intermediateResult2);
+ return intermediateResult1;
}
@Override
@@ -210,12 +265,86 @@ public class IntegerTupleSketchAggregationFunction
}
@Override
- public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
- if (integerSummarySketches == null) {
- return null;
+ public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
+ accumulator.setNominalEntries(_nominalEntries);
+ accumulator.setSetOperations(_setOps);
+ accumulator.setThreshold(_accumulatorThreshold);
+ return
Base64.getEncoder().encodeToString(accumulator.getResult().toByteArray());
+ }
+
+  /**
+   * Looks up the accumulator stored in the aggregation result holder. On first use, a new accumulator is
+   * created with this function's set operations, nominal entries and accumulator threshold, and then registered.
+   */
+  private TupleIntSketchAccumulator getAccumulator(AggregationResultHolder aggregationResultHolder) {
+    TupleIntSketchAccumulator existing = aggregationResultHolder.getResult();
+    if (existing != null) {
+      return existing;
+    }
+    TupleIntSketchAccumulator created =
+        new TupleIntSketchAccumulator(_setOps, _nominalEntries, _accumulatorThreshold);
+    aggregationResultHolder.setValue(created);
+    return created;
+  }
+
+  /**
+   * Looks up the accumulator for {@code groupKey}. When the group has none yet, a new accumulator is created
+   * with this function's set operations, nominal entries and accumulator threshold, and then registered.
+   */
+  private TupleIntSketchAccumulator getAccumulator(GroupByResultHolder groupByResultHolder, int groupKey) {
+    TupleIntSketchAccumulator existing = groupByResultHolder.getResult(groupKey);
+    if (existing != null) {
+      return existing;
+    }
+    TupleIntSketchAccumulator created =
+        new TupleIntSketchAccumulator(_setOps, _nominalEntries, _accumulatorThreshold);
+    groupByResultHolder.setValueForKey(groupKey, created);
+    return created;
+  }
+
+  /**
+   * Deserializes the first {@code length} serialized Integer Tuple Sketches into on-heap sketches.
+   *
+   * @param serializedSketches serialized sketches; must contain at least {@code length} entries
+   * @param length number of leading entries to deserialize
+   * @return an array of {@code length} heapified sketches, in input order
+   */
+  @SuppressWarnings({"unchecked"})
+  private Sketch<IntegerSummary>[] deserializeSketches(byte[][] serializedSketches, int length) {
+    Sketch<IntegerSummary>[] sketches = new Sketch[length];
+    // The summary deserializer is loop-invariant, so a single instance is reused for every entry
+    // instead of allocating one per sketch.
+    IntegerSummaryDeserializer summaryDeserializer = new IntegerSummaryDeserializer();
+    for (int i = 0; i < length; i++) {
+      sketches[i] = Sketches.heapifySketch(Memory.wrap(serializedSketches[i]), summaryDeserializer);
+    }
+    return sketches;
+  }
+
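+  /**
+   * Helper class that parses the tuple-sketch parameters supplied as a string literal of ';'-separated
+   * key=value pairs (keys {@code nominalEntries} and {@code accumulatorThreshold}, matched case-insensitively).
+   * Any parameter that is not supplied keeps the default that the enclosing aggregation function uses when no
+   * second argument is given.
+   */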
+ private static class Parameters {
+ private static final char PARAMETER_DELIMITER = ';';
+ private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+ private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+ private static final String ACCUMULATOR_THRESHOLD_KEY =
"accumulatorThreshold";
+
+ private int _nominalEntries = (int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+ private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+
+    /**
+     * Parses a ';'-separated list of {@code key=value} pairs. Keys are matched case-insensitively, and whitespace
+     * anywhere in the string is ignored, so {@code "nominalEntries = 4096; accumulatorThreshold = 4"} is accepted.
+     *
+     * @param parametersString the raw parameters literal
+     * @throws IllegalArgumentException if a pair is malformed, a key is unknown, or a value is not an integer
+     */
+    Parameters(String parametersString) {
+      // Strings are immutable: deleteWhitespace returns a new string. Its result must be used, otherwise the
+      // whitespace removal has no effect and keys such as " accumulatorThreshold" are rejected.
+      String normalized = StringUtils.deleteWhitespace(parametersString);
+      String[] keyValuePairs = StringUtils.split(normalized, PARAMETER_DELIMITER);
+      for (String keyValuePair : keyValuePairs) {
+        String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR);
+        Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair);
+        String key = keyAndValue[0];
+        String value = keyAndValue[1];
+        if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
+          _nominalEntries = Integer.parseInt(value);
+        } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
+          _accumulatorThreshold = Integer.parseInt(value);
+        } else {
+          throw new IllegalArgumentException("Invalid parameter key: " + key);
+        }
+      }
+    }
+
+ /** Returns the parsed {@code nominalEntries} value, or the default when the key was not supplied. */
+ int getNominalEntries() {
+ return _nominalEntries;
+ }
+
+ /** Returns the parsed {@code accumulatorThreshold} value, or the default when the key was not supplied. */
+ int getAccumulatorThreshold() {
+ return _accumulatorThreshold;
}
- Union<IntegerSummary> union = new Union<>(_entries, _setOps);
- integerSummarySketches.forEach(union::union);
- return Base64.getEncoder().encodeToString(union.getResult().toByteArray());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
index 33f746a1da..d37854b1b0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
@@ -19,12 +19,12 @@
package org.apache.pinot.core.query.aggregation.function;
import java.util.List;
-import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.TupleSketchIterator;
-import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -46,14 +46,12 @@ public class SumValuesIntegerTupleSketchAggregationFunction
extends IntegerTuple
}
@Override
- public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
- if (integerSummarySketches == null) {
- return null;
- }
- Union<IntegerSummary> union = new Union<>(_entries, _setOps);
- integerSummarySketches.forEach(union::union);
+ public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
double retainedTotal = 0L;
- CompactSketch<IntegerSummary> result = union.getResult();
+ accumulator.setNominalEntries(_nominalEntries);
+ accumulator.setSetOperations(_setOps);
+ accumulator.setThreshold(_accumulatorThreshold);
+ Sketch<IntegerSummary> result = accumulator.getResult();
TupleSketchIterator<IntegerSummary> summaries = result.iterator();
while (summaries.next()) {
retainedTotal += summaries.getSummary().getValue();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 01c39b0105..b397bd151c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -43,9 +43,13 @@ import org.apache.datasketches.theta.SetOperationBuilder;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
import
org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
import org.apache.pinot.segment.local.customobject.DoubleLongPair;
import org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.IntLongPair;
@@ -54,6 +58,7 @@ import
org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
import org.testng.annotations.Test;
@@ -522,4 +527,53 @@ public class ObjectSerDeUtilsTest {
assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(),
ERROR_MESSAGE);
}
}
+
+  /**
+   * Round-trips a single-input {@link TupleIntSketchAccumulator} through {@link ObjectSerDeUtils} and checks that
+   * the restored result is identical to the compacted input sketch.
+   */
+  @Test
+  public void testTupleIntSketchAccumulator() {
+    final int lgK = 4;
+    final int nominalEntries = 1 << lgK;
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numKeys = 10 + RANDOM.nextInt(100);
+      IntegerSketch updateSketch = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+      for (int key = 0; key < numKeys; key++) {
+        updateSketch.update(key, RANDOM.nextInt(100));
+      }
+      org.apache.datasketches.tuple.Sketch<IntegerSummary> expected = updateSketch.compact();
+
+      IntegerSummarySetOperations setOperations =
+          new IntegerSummarySetOperations(IntegerSummary.Mode.Sum, IntegerSummary.Mode.Sum);
+      TupleIntSketchAccumulator original = new TupleIntSketchAccumulator(setOperations, nominalEntries, 2);
+      original.apply(expected);
+
+      TupleIntSketchAccumulator roundTripped = ObjectSerDeUtils.deserialize(ObjectSerDeUtils.serialize(original),
+          ObjectSerDeUtils.ObjectType.TupleIntSketchAccumulator);
+
+      assertEquals(roundTripped.getResult().getEstimate(), expected.getEstimate(), ERROR_MESSAGE);
+      assertEquals(roundTripped.getResult().toByteArray(), expected.toByteArray(), ERROR_MESSAGE);
+    }
+  }
+
+  /**
+   * Round-trips a single-input {@link CpcSketchAccumulator} through {@link ObjectSerDeUtils} and checks that the
+   * restored result is identical to the input sketch.
+   */
+  @Test
+  public void testCpcSketchAccumulator() {
+    final int lgK = 4;
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      int numKeys = 10 + RANDOM.nextInt(100);
+      CpcSketch expected = new CpcSketch(lgK);
+      for (int key = 0; key < numKeys; key++) {
+        expected.update(key);
+      }
+
+      CpcSketchAccumulator original = new CpcSketchAccumulator(lgK, 2);
+      original.apply(expected);
+
+      CpcSketchAccumulator roundTripped = ObjectSerDeUtils.deserialize(ObjectSerDeUtils.serialize(original),
+          ObjectSerDeUtils.ObjectType.CpcSketchAccumulator);
+
+      assertEquals(roundTripped.getResult().getEstimate(), expected.getEstimate(), ERROR_MESSAGE);
+      assertEquals(roundTripped.getResult().toByteArray(), expected.toByteArray(), ERROR_MESSAGE);
+    }
+  }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java
new file mode 100644
index 0000000000..7d24da87cd
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import javax.annotation.Nonnull;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+
+
+/**
+ * Intermediate state used by {@code DistinctCountCPCSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade-off more memory usage for more pre-aggregation.
+ * Pending sketches are folded into a single {@link CpcUnion}, which is created lazily on the first
+ * {@link #getResult()} call.
+ */
+public class CpcSketchAccumulator extends CustomObjectAccumulator<CpcSketch> {
+ // lgK passed to the CpcUnion when it is created; 4 is used unless a constructor or setter overrides it.
+ private int _lgNominalEntries = 4;
+ // Lazily created union holding every sketch folded in so far.
+ private CpcUnion _union;
+
+ // No-arg constructor; presumably used on deserialization (see the note below) - TODO confirm callers set lgK.
+ public CpcSketchAccumulator() {
+ }
+
+ // Note: The accumulator is serialized as a sketch. This means that the majority of the processing
+ // happens on serialization. Therefore, when deserialized, the values may be null and will
+ // require re-initialisation. Since the primary use case is at query time for the Broker
+ // and Server, these properties are already in memory and are re-set.
+ public CpcSketchAccumulator(int lgNominalEntries, int threshold) {
+ super(threshold);
+ _lgNominalEntries = lgNominalEntries;
+ }
+
+ /**
+ * Sets the lgK used when the union is created. This has no effect once the union already exists, that is,
+ * after any prior {@link #getResult()} call (including the implicit one made when the threshold is reached).
+ */
+ public void setLgNominalEntries(int lgNominalEntries) {
+ _lgNominalEntries = lgNominalEntries;
+ }
+
+ /**
+ * Returns the union of all inputs. When exactly one input has been seen, that sketch is returned as-is.
+ */
+ @Nonnull
+ @Override
+ public CpcSketch getResult() {
+ return unionAll();
+ }
+
+ // Folds all pending sketches into the union and returns its result. Pending sketches are not cleared
+ // here; clearing is left to the caller.
+ private CpcSketch unionAll() {
+ if (_union == null) {
+ _union = new CpcUnion(_lgNominalEntries);
+ }
+ // No inputs: return the (empty) result of the freshly created union.
+ if (isEmpty()) {
+ return _union.getResult();
+ }
+ // Corner-case: the parameters are not strictly respected when there is a single sketch.
+ // This single sketch might have been the result of a previously accumulated union and
+ // would already have the parameters set. The sketch is returned as-is without adjusting
+ // lgK, which would require an additional union operation.
+ if (getNumInputs() == 1) {
+ return _accumulator.get(0);
+ }
+
+ for (CpcSketch accumulatedSketch : _accumulator) {
+ _union.update(accumulatedSketch);
+ }
+
+ return _union.getResult();
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java
new file mode 100644
index 0000000000..3e90695bec
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import javax.annotation.Nonnull;
+
+
+/**
+ * Intermediate state used by some aggregation functions which gives the end user more control over how custom objects
+ * are merged for performance reasons. Some custom objects such as DataSketches have better merge performance when
+ * more than two items are merged at once through the elimination of intermediate bookkeeping overheads.
+ *
+ * <p>The end user can set a value for the "threshold" parameter that defers the merge operation until at least as
+ * many items are ready to be merged, or the caller forces the merge directly via {@link #getResult()} - e.g. on
+ * serialization. This data structure trades-off more memory usage for a greater degree of pre-aggregation in the
+ * accumulator state.
+ *
+ * @param <T> the type of the custom object being accumulated
+ */
+public abstract class CustomObjectAccumulator<T> {
+  // Pending items that have been added but not yet folded into the subclass's merged state.
+  protected ArrayList<T> _accumulator;
+  private int _threshold;
+  // Total number of items ever added (via apply or merge); not reset when pending items are flushed.
+  private int _numInputs = 0;
+
+  /**
+   * Creates an accumulator with a threshold of 2.
+   */
+  public CustomObjectAccumulator() {
+    this(2);
+  }
+
+  /**
+   * Creates an accumulator with the given threshold.
+   * @param threshold the threshold [> 0].
+   * @throws IllegalArgumentException if {@code threshold} is not positive.
+   */
+  public CustomObjectAccumulator(int threshold) {
+    setThreshold(threshold);
+  }
+
+  /**
+   * Sets the threshold that determines how much memory to use for the internal accumulator state before
+   * the intermediate state is merged.
+   * @param threshold the threshold [> 0].
+   * @throws IllegalArgumentException if {@code threshold} is not positive.
+   */
+  public void setThreshold(int threshold) {
+    Preconditions.checkArgument(threshold > 0, "Invalid threshold: %s, must be positive", threshold);
+    _threshold = threshold;
+  }
+
+  /**
+   * Returns the configured threshold for this accumulator.
+   */
+  public int getThreshold() {
+    return _threshold;
+  }
+
+  /**
+   * Returns the number of inputs that have been added to the accumulator state.
+   */
+  public int getNumInputs() {
+    return _numInputs;
+  }
+
+  /**
+   * Returns true if no inputs have been added to the accumulator state.
+   */
+  public boolean isEmpty() {
+    return _numInputs == 0;
+  }
+
+  /**
+   * Forces the item T in internal state to be merged with all pending items in the accumulator state
+   * and returns the result. Implementations must not clear the pending items; clearing is done by this base
+   * class when the threshold is reached.
+   * @return T result of the merge.
+   */
+  @Nonnull
+  public abstract T getResult();
+
+  /**
+   * Merges another accumulator with this one, by extracting the result from "other".
+   * Does nothing when {@code other} is empty.
+   * @param other the custom object accumulator to merge.
+   */
+  public void merge(CustomObjectAccumulator<T> other) {
+    if (other.isEmpty()) {
+      return;
+    }
+    T result = other.getResult();
+    applyInternal(result);
+  }
+
+  /**
+   * Adds a new item to the accumulator state. Once the number of pending items reaches the threshold (and at
+   * least two inputs have been seen), the internal state is updated and the pending items are cleared.
+   * @param item the item to add to the accumulator state, cannot be null.
+   * @throws NullPointerException if {@code item} is null.
+   */
+  public void apply(T item) {
+    Preconditions.checkNotNull(item);
+    applyInternal(item);
+  }
+
+  private void applyInternal(T item) {
+    if (_accumulator == null) {
+      _accumulator = new ArrayList<>(_threshold);
+    }
+    _accumulator.add(item);
+    _numInputs += 1;
+
+    // Subclasses (e.g. CpcSketchAccumulator, ThetaSketchAccumulator) short-circuit getResult() when exactly one
+    // input has been seen: they return that pending item without folding it into their merged state. Flushing and
+    // clearing at that point would silently drop the first input when the threshold is 1, so the flush is deferred
+    // until at least two inputs exist. For thresholds >= 2 this guard never changes behavior.
+    if (_numInputs > 1 && _accumulator.size() >= _threshold) {
+      getResult();
+      _accumulator.clear();
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
index c9554ce9bf..5e2219f12d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.segment.local.customobject;
-import java.util.ArrayList;
import java.util.Comparator;
import javax.annotation.Nonnull;
import org.apache.datasketches.theta.SetOperationBuilder;
@@ -29,19 +28,15 @@ import org.apache.datasketches.theta.Union;
/**
* Intermediate state used by {@code
DistinctCountThetaSketchAggregationFunction} which gives
* the end user more control over how sketches are merged for performance.
- * The end user can set parameters that trade-off more memory usage for more
pre-aggregation.
- * This permits use of the Union "early-stop" optimisation where ordered
sketches require no further
+ * In particular, the Theta Sketch Union "early-stop" optimisation can be used
- ordered sketches require no further
* processing beyond the minimum Theta value.
* The union operation initialises an empty "gadget" bookkeeping sketch that
is updated with hashed entries
- * that fall below the minimum Theta value for all input sketches ("Broder
Rule"). When the initial
- * Theta value is set to the minimum immediately, further gains can be
realised.
+ * that fall below the minimum Theta value for all input sketches ("Broder
Rule"). When the initial Theta value is
+ * set to the minimum immediately, further gains can be realised.
*/
-public class ThetaSketchAccumulator {
- private ArrayList<Sketch> _accumulator;
+public class ThetaSketchAccumulator extends CustomObjectAccumulator<Sketch> {
private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
private Union _union;
- private int _threshold;
- private int _numInputs = 0;
public ThetaSketchAccumulator() {
}
@@ -51,54 +46,20 @@ public class ThetaSketchAccumulator {
// require re-initialisation. Since the primary use case is at query time
for the Broker
// and Server, these properties are already in memory and are re-set.
public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, int
threshold) {
+ super(threshold);
_setOperationBuilder = setOperationBuilder;
- _threshold = threshold;
}
public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
_setOperationBuilder = setOperationBuilder;
}
- public void setThreshold(int threshold) {
- _threshold = threshold;
- }
-
- public boolean isEmpty() {
- return _numInputs == 0;
- }
-
@Nonnull
+ @Override
public Sketch getResult() {
return unionAll();
}
- public void apply(Sketch sketch) {
- internalAdd(sketch);
- }
-
- public void merge(ThetaSketchAccumulator thetaUnion) {
- if (thetaUnion.isEmpty()) {
- return;
- }
- Sketch sketch = thetaUnion.getResult();
- internalAdd(sketch);
- }
-
- private void internalAdd(Sketch sketch) {
- if (sketch.isEmpty()) {
- return;
- }
- if (_accumulator == null) {
- _accumulator = new ArrayList<>(_threshold);
- }
- _accumulator.add(sketch);
- _numInputs += 1;
-
- if (_accumulator.size() >= _threshold) {
- unionAll();
- }
- }
-
private Sketch unionAll() {
if (_union == null) {
_union = _setOperationBuilder.buildUnion();
@@ -111,7 +72,7 @@ public class ThetaSketchAccumulator {
// This single sketch might have been the result of a previously
accumulated union and
// would already have the parameters set. The sketch is returned as-is
without adjusting
// nominal entries which requires an additional union operation.
- if (_numInputs == 1) {
+ if (getNumInputs() == 1) {
return _accumulator.get(0);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java
similarity index 58%
copy from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java
index c9554ce9bf..5a24324913 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java
@@ -18,100 +18,69 @@
*/
package org.apache.pinot.segment.local.customobject;
-import java.util.ArrayList;
import java.util.Comparator;
import javax.annotation.Nonnull;
-import org.apache.datasketches.theta.SetOperationBuilder;
-import org.apache.datasketches.theta.Sketch;
-import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
/**
- * Intermediate state used by {@code
DistinctCountThetaSketchAggregationFunction} which gives
+ * Intermediate state used by {@code IntegerTupleSketchAggregationFunction}
which gives
* the end user more control over how sketches are merged for performance.
- * The end user can set parameters that trade-off more memory usage for more
pre-aggregation.
- * This permits use of the Union "early-stop" optimisation where ordered
sketches require no further
- * processing beyond the minimum Theta value.
+ * In particular, the Theta Sketch Union "early-stop" optimisation can be used
- ordered sketches require no further
+ * processing beyond the minimum Theta value. This applies to Tuple sketches
because they are an extension of the
+ * Theta sketch.
* The union operation initialises an empty "gadget" bookkeeping sketch that
is updated with hashed entries
- * that fall below the minimum Theta value for all input sketches ("Broder
Rule"). When the initial
- * Theta value is set to the minimum immediately, further gains can be
realised.
+ * that fall below the minimum Theta value for all input sketches ("Broder
Rule"). When the initial Theta value is
+ * set to the minimum immediately, further gains can be realised.
*/
-public class ThetaSketchAccumulator {
- private ArrayList<Sketch> _accumulator;
- private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
- private Union _union;
- private int _threshold;
- private int _numInputs = 0;
+public class TupleIntSketchAccumulator extends
CustomObjectAccumulator<Sketch<IntegerSummary>> {
+ private IntegerSummarySetOperations _setOperations;
+ private int _nominalEntries;
+ private Union<IntegerSummary> _union;
- public ThetaSketchAccumulator() {
+ public TupleIntSketchAccumulator() {
}
// Note: The accumulator is serialized as a sketch. This means that the
majority of the processing
// happens on serialization. Therefore, when deserialized, the values may be
null and will
// require re-initialisation. Since the primary use case is at query time
for the Broker
// and Server, these properties are already in memory and are re-set.
- public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, int
threshold) {
- _setOperationBuilder = setOperationBuilder;
- _threshold = threshold;
+ public TupleIntSketchAccumulator(IntegerSummarySetOperations setOperations,
int nominalEntries, int threshold) {
+ super(threshold);
+ _nominalEntries = nominalEntries;
+ _setOperations = setOperations;
}
- public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
- _setOperationBuilder = setOperationBuilder;
+ public void setSetOperations(IntegerSummarySetOperations setOperations) {
+ _setOperations = setOperations;
}
- public void setThreshold(int threshold) {
- _threshold = threshold;
- }
-
- public boolean isEmpty() {
- return _numInputs == 0;
+ public void setNominalEntries(int nominalEntries) {
+ _nominalEntries = nominalEntries;
}
@Nonnull
- public Sketch getResult() {
+ @Override
+ public Sketch<IntegerSummary> getResult() {
return unionAll();
}
- public void apply(Sketch sketch) {
- internalAdd(sketch);
- }
-
- public void merge(ThetaSketchAccumulator thetaUnion) {
- if (thetaUnion.isEmpty()) {
- return;
- }
- Sketch sketch = thetaUnion.getResult();
- internalAdd(sketch);
- }
-
- private void internalAdd(Sketch sketch) {
- if (sketch.isEmpty()) {
- return;
- }
- if (_accumulator == null) {
- _accumulator = new ArrayList<>(_threshold);
- }
- _accumulator.add(sketch);
- _numInputs += 1;
-
- if (_accumulator.size() >= _threshold) {
- unionAll();
- }
- }
-
- private Sketch unionAll() {
+ private Sketch<IntegerSummary> unionAll() {
if (_union == null) {
- _union = _setOperationBuilder.buildUnion();
+ _union = new Union<>(_nominalEntries, _setOperations);
}
// Return the default update "gadget" sketch as a compact sketch
if (isEmpty()) {
- return _union.getResult(false, null);
+ return _union.getResult();
}
// Corner-case: the parameters are not strictly respected when there is a
single sketch.
// This single sketch might have been the result of a previously
accumulated union and
// would already have the parameters set. The sketch is returned as-is
without adjusting
// nominal entries which requires an additional union operation.
- if (_numInputs == 1) {
+ if (getNumInputs() == 1) {
return _accumulator.get(0);
}
@@ -125,11 +94,11 @@ public class ThetaSketchAccumulator {
// which results in fewer redundant entries being retained and
subsequently discarded during the
// union operation.
_accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
- for (Sketch accumulatedSketch : _accumulator) {
+ for (Sketch<IntegerSummary> accumulatedSketch : _accumulator) {
_union.union(accumulatedSketch);
}
_accumulator.clear();
- return _union.getResult(false, null);
+ return _union.getResult();
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java
new file mode 100644
index 0000000000..a86144ed03
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class CpcSketchAccumulatorTest {
+ private final int _lgNominalEntries = 20;
+ private final double _epsilon = 0.5;
+
+ @Test
+ public void testEmptyAccumulator() {
+ CpcSketchAccumulator accumulator = new
CpcSketchAccumulator(_lgNominalEntries, 2);
+ Assert.assertTrue(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+ }
+
+ @Test
+ public void testAccumulatorWithSingleSketch() {
+ CpcSketch sketch = new CpcSketch(_lgNominalEntries);
+ IntStream.range(0, 1000).forEach(sketch::update);
+
+ CpcSketchAccumulator accumulator = new
CpcSketchAccumulator(_lgNominalEntries, 2);
+ accumulator.apply(sketch);
+
+ Assert.assertFalse(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(),
sketch.getEstimate());
+ }
+
+ @Test
+ public void testAccumulatorMerge() {
+ CpcSketch sketch1 = new CpcSketch(_lgNominalEntries);
+ IntStream.range(0, 1000).forEach(sketch1::update);
+ CpcSketch sketch2 = new CpcSketch(_lgNominalEntries);
+ IntStream.range(1000, 2000).forEach(sketch2::update);
+
+ CpcSketchAccumulator accumulator1 = new
CpcSketchAccumulator(_lgNominalEntries, 3);
+ accumulator1.apply(sketch1);
+ CpcSketchAccumulator accumulator2 = new
CpcSketchAccumulator(_lgNominalEntries, 3);
+ accumulator2.apply(sketch2);
+ accumulator1.merge(accumulator2);
+
+ Assert.assertEquals(accumulator1.getResult().getEstimate(),
sketch1.getEstimate() + sketch2.getEstimate(),
+ _epsilon);
+ }
+
+ @Test
+ public void testThresholdBehavior() {
+ CpcSketch sketch1 = new CpcSketch(_lgNominalEntries);
+ IntStream.range(0, 1000).forEach(sketch1::update);
+ CpcSketch sketch2 = new CpcSketch(_lgNominalEntries);
+ IntStream.range(1000, 2000).forEach(sketch2::update);
+
+ CpcSketchAccumulator accumulator = new
CpcSketchAccumulator(_lgNominalEntries, 3);
+ accumulator.apply(sketch1);
+ accumulator.apply(sketch2);
+
+ Assert.assertEquals(accumulator.getResult().getEstimate(),
sketch1.getEstimate() + sketch2.getEstimate(), _epsilon);
+ }
+
+ @Test
+ public void testUnionWithEmptyInput() {
+ CpcSketchAccumulator accumulator = new
CpcSketchAccumulator(_lgNominalEntries, 3);
+ CpcSketchAccumulator emptyAccumulator = new
CpcSketchAccumulator(_lgNominalEntries, 3);
+
+ accumulator.merge(emptyAccumulator);
+
+ Assert.assertTrue(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
new file mode 100644
index 0000000000..7755416211
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TupleIntSketchAccumulatorTest {
+ private IntegerSummarySetOperations _setOps;
+ private final int _lgK = 12;
+ private final int _nominalEntries = (int) Math.pow(2, _lgK);
+
+ @BeforeMethod
+ public void setUp() {
+ _setOps = new IntegerSummarySetOperations(IntegerSummary.Mode.Sum,
IntegerSummary.Mode.Sum);
+ }
+
+ @Test
+ public void testEmptyAccumulator() {
+ TupleIntSketchAccumulator accumulator = new
TupleIntSketchAccumulator(_setOps, _nominalEntries, 2);
+ Assert.assertTrue(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+ }
+
+ @Test
+ public void testAccumulatorWithSingleSketch() {
+ IntegerSketch input = new IntegerSketch(_lgK, IntegerSummary.Mode.Sum);
+ IntStream.range(0, 1000).forEach(i -> input.update(i, 1));
+ CompactSketch<IntegerSummary> sketch = input.compact();
+
+ TupleIntSketchAccumulator accumulator = new
TupleIntSketchAccumulator(_setOps, _nominalEntries, 2);
+ accumulator.apply(sketch);
+
+ Assert.assertFalse(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(),
sketch.getEstimate());
+ }
+
+ @Test
+ public void testAccumulatorMerge() {
+ IntegerSketch input1 = new IntegerSketch(_lgK, IntegerSummary.Mode.Sum);
+ IntStream.range(0, 1000).forEach(i -> input1.update(i, 1));
+ CompactSketch<IntegerSummary> sketch1 = input1.compact();
+ IntegerSketch input2 = new IntegerSketch(_lgK, IntegerSummary.Mode.Sum);
+ IntStream.range(1000, 2000).forEach(i -> input2.update(i, 1));
+ CompactSketch<IntegerSummary> sketch2 = input2.compact();
+
+ TupleIntSketchAccumulator accumulator1 = new
TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+ accumulator1.apply(sketch1);
+ TupleIntSketchAccumulator accumulator2 = new
TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+ accumulator2.apply(sketch2);
+ accumulator1.merge(accumulator2);
+
+ Assert.assertEquals(accumulator1.getResult().getEstimate(),
sketch1.getEstimate() + sketch2.getEstimate());
+ }
+
+ @Test
+ public void testThresholdBehavior() {
+ IntegerSketch input1 = new IntegerSketch(_lgK, IntegerSummary.Mode.Sum);
+ IntStream.range(0, 1000).forEach(i -> input1.update(i, 1));
+ CompactSketch<IntegerSummary> sketch1 = input1.compact();
+ IntegerSketch input2 = new IntegerSketch(_lgK, IntegerSummary.Mode.Sum);
+ IntStream.range(1000, 2000).forEach(i -> input2.update(i, 1));
+ CompactSketch<IntegerSummary> sketch2 = input2.compact();
+
+ TupleIntSketchAccumulator accumulator = new
TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+ accumulator.apply(sketch1);
+ accumulator.apply(sketch2);
+
+ Assert.assertEquals(accumulator.getResult().getEstimate(),
sketch1.getEstimate() + sketch2.getEstimate());
+ }
+
+ @Test
+ public void testUnionWithEmptyInput() {
+ TupleIntSketchAccumulator accumulator = new
TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+ TupleIntSketchAccumulator emptyAccumulator = new
TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+
+ accumulator.merge(emptyAccumulator);
+
+ Assert.assertTrue(accumulator.isEmpty());
+ Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]