This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 555e5769ed0 Add distinctCountSmartULL aggregation (set→ULL smart
promotion) (#16605)
555e5769ed0 is described below
commit 555e5769ed0054634de7f5ebe07ea6bb04457dca
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Aug 16 09:51:58 2025 -0700
Add distinctCountSmartULL aggregation (set→ULL smart promotion) (#16605)
* core: add DistinctCountSmartULL aggregation
- Implement DistinctCountSmartULLAggregationFunction (set→ULL promotion
with threshold+p)
- Wire DISTINCTCOUNTSMARTULL in AggregationFunctionType and
AggregationFunctionFactory
- Non-scan operator + planner support (dictionary-based paths)
- Tests: query coverage and enum recognition
Also adds ULL handling (DISTINCTCOUNTULL / DISTINCTCOUNTRAWULL) to the non-scan operator, alongside the existing HLL paths.
* core: introduce BaseDistinctCountSmartSketchAggregationFunction for
shared raw-value handling; refactor DistinctCountSmartHLL/ULL to extend it;
clean up duplicates and unused imports
---
.../function/AggregationFunctionTypeTest.java | 2 +
.../query/NonScanBasedAggregationOperator.java | 63 +-
.../pinot/core/plan/AggregationPlanNode.java | 3 +-
.../function/AggregationFunctionFactory.java | 2 +
...stinctCountSmartSketchAggregationFunction.java} | 628 +++++---------------
.../DistinctCountSmartHLLAggregationFunction.java | 643 +--------------------
.../DistinctCountSmartULLAggregationFunction.java | 496 ++++++++++++++++
.../DistinctCountULLAggregationFunction.java | 10 +-
.../pinot/queries/DistinctCountQueriesTest.java | 44 ++
.../tests/MultiStageEngineIntegrationTest.java | 34 +-
.../pinot/segment/spi/AggregationFunctionType.java | 2 +
11 files changed, 798 insertions(+), 1129 deletions(-)
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index dc631eb346c..c069c5d152e 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -87,6 +87,8 @@ public class AggregationFunctionTypeTest {
AggregationFunctionType.DISTINCTCOUNTULL);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTrAwUll"),
AggregationFunctionType.DISTINCTCOUNTRAWULL);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTsMaRtUlL"),
+ AggregationFunctionType.DISTINCTCOUNTSMARTULL);
}
@Test(expectedExceptions = IllegalArgumentException.class)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 04f388cd464..42394a19283 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator.query;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.dynatrace.hash4j.distinctcount.UltraLogLog;
import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -42,8 +43,11 @@ import
org.apache.pinot.core.query.aggregation.function.DistinctCountOffHeapAggr
import
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartULLAggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctCountULLAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
+import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec;
@@ -54,10 +58,11 @@ import org.apache.pinot.spi.utils.ByteArray;
* Aggregation operator that utilizes dictionary or column metadata for
serving aggregation queries to avoid scanning.
* The scanless operator is selected in the plan maker, if the query is of
aggregation type min, max, minmaxrange,
* distinctcount, distinctcounthll, distinctcountrawhll,
segmentpartitioneddistinctcount, distinctcountsmarthll,
- * distinctcounthllplus, distinctcountrawhllplus, and the column has a
dictionary, or has column metadata with min and
- * max value defined. It also supports count(*) if the query has no filter.
- * We don't use this operator if the segment has star tree,
- * as the dictionary will have aggregated values for the metrics, and
dimensions will have star node value.
+ * distinctcounthllplus, distinctcountrawhllplus, distinctcountull,
distinctcountrawull, distinctcountsmartull, and
+ * the column has a dictionary, or has column metadata with min and max value
defined. It also supports count(*) if
+ * the query has no filter.
+ * We don't use this operator if the segment has star tree, as the dictionary
will have aggregated values for the
+ * metrics, and dimensions will have star node value.
*
* For min value, we use the first value from the dictionary, falling back to
the column metadata min value if there
* is no dictionary.
@@ -144,6 +149,18 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
result =
getDistinctCountSmartHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
(DistinctCountSmartHLLAggregationFunction) aggregationFunction);
break;
+ case DISTINCTCOUNTULL:
+ result =
getDistinctCountULLResult(Objects.requireNonNull(dataSource.getDictionary()),
+ (DistinctCountULLAggregationFunction) aggregationFunction);
+ break;
+ case DISTINCTCOUNTSMARTULL:
+ result =
getDistinctCountSmartULLResult(Objects.requireNonNull(dataSource.getDictionary()),
+ (DistinctCountSmartULLAggregationFunction) aggregationFunction);
+ break;
+ case DISTINCTCOUNTRAWULL:
+ result =
getDistinctCountULLResult(Objects.requireNonNull(dataSource.getDictionary()),
+ (DistinctCountULLAggregationFunction) aggregationFunction);
+ break;
default:
throw new IllegalStateException(
"Non-scan based aggregation operator does not support function
type: " + aggregationFunction.getType());
@@ -234,6 +251,16 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
return hll;
}
+ private static UltraLogLog getDistinctValueULL(Dictionary dictionary, int p)
{
+ UltraLogLog ull = UltraLogLog.create(p);
+ int length = dictionary.length();
+ for (int i = 0; i < length; i++) {
+ Object value = dictionary.get(i);
+ UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+ }
+ return ull;
+ }
+
private static HyperLogLogPlus getDistinctValueHLLPlus(Dictionary
dictionary, int p, int sp) {
HyperLogLogPlus hllPlus = new HyperLogLogPlus(p, sp);
int length = dictionary.length();
@@ -291,6 +318,34 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
}
}
+ private static UltraLogLog getDistinctCountULLResult(Dictionary dictionary,
+ DistinctCountULLAggregationFunction function) {
+ if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+ // Treat BYTES value as serialized UltraLogLog and merge
+ try {
+ UltraLogLog ull =
ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(dictionary.getBytesValue(0));
+ int length = dictionary.length();
+ for (int i = 1; i < length; i++) {
+
ull.add(ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(dictionary.getBytesValue(i)));
+ }
+ return ull;
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging
UltraLogLogs", e);
+ }
+ } else {
+ return getDistinctValueULL(dictionary, function.getP());
+ }
+ }
+
+ private static Object getDistinctCountSmartULLResult(Dictionary dictionary,
+ DistinctCountSmartULLAggregationFunction function) {
+ if (dictionary.length() > function.getThreshold()) {
+ return getDistinctValueULL(dictionary, function.getP());
+ } else {
+ return getDistinctValueSet(dictionary);
+ }
+ }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 7db0aefd296..f60de0466cc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -52,7 +52,8 @@ public class AggregationPlanNode implements PlanNode {
EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV,
DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTSUM,
DISTINCTSUMMV, DISTINCTAVG, DISTINCTAVGMV, DISTINCTCOUNTOFFHEAP,
DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV,
DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, DISTINCTCOUNTHLLPLUS,
DISTINCTCOUNTHLLPLUSMV,
- DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTRAWHLLPLUSMV,
SEGMENTPARTITIONEDDISTINCTCOUNT, DISTINCTCOUNTSMARTHLL);
+ DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTRAWHLLPLUSMV,
DISTINCTCOUNTULL, DISTINCTCOUNTRAWULL,
+ SEGMENTPARTITIONEDDISTINCTCOUNT, DISTINCTCOUNTSMARTHLL,
DISTINCTCOUNTSMARTULL);
// DISTINCTCOUNT excluded because consuming segment metadata contains
unknown cardinality when there is no dictionary
private static final EnumSet<AggregationFunctionType>
METADATA_BASED_FUNCTIONS =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index d05316b5480..092fe60c7f3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -376,6 +376,8 @@ public class AggregationFunctionFactory {
return new DistinctCountRawHLLAggregationFunction(arguments);
case DISTINCTCOUNTSMARTHLL:
return new DistinctCountSmartHLLAggregationFunction(arguments);
+ case DISTINCTCOUNTSMARTULL:
+ return new DistinctCountSmartULLAggregationFunction(arguments);
case FASTHLL:
return new FastHLLAggregationFunction(arguments);
case DISTINCTCOUNTTHETASKETCH:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctCountSmartSketchAggregationFunction.java
similarity index 52%
copy from
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctCountSmartSketchAggregationFunction.java
index e93b48def80..0f12378b929 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctCountSmartSketchAggregationFunction.java
@@ -18,30 +18,22 @@
*/
package org.apache.pinot.core.query.aggregation.function;
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
-import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
-import it.unimi.dsi.fastutil.objects.ObjectSet;
-import java.util.List;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
@@ -50,239 +42,102 @@ import org.roaringbitmap.RoaringBitmap;
/**
- * The {@code DistinctCountSmartHLLAggregationFunction} calculates the number
of distinct values for a given expression
- * (both single-valued and multi-valued are supported).
+ * Base class that encapsulates the common raw-value handling for Smart
distinct-count functions
+ * that switch from exact Set aggregation to a sketch when a threshold is
exceeded.
*
- * For aggregation-only queries, the distinct values are stored in a Set
initially. Once the number of distinct values
- * exceeds a threshold, the Set will be converted into a HyperLogLog, and
approximate result will be returned.
- *
- * The function takes an optional second argument for parameters:
- * - threshold: Threshold of the number of distinct values to trigger the
conversion, 100_000 by default. Non-positive
- * value means never convert.
- * - log2m: Log2m for the converted HyperLogLog, 12 by default.
- * Example of second argument: 'threshold=10;log2m=8'
+ * Subclasses provide the sketch-specific behavior (conversion and dictionary
handling).
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class DistinctCountSmartHLLAggregationFunction extends
BaseSingleInputAggregationFunction<Object, Integer> {
+abstract class BaseDistinctCountSmartSketchAggregationFunction
+ extends BaseSingleInputAggregationFunction<Object, Integer> {
// Use empty IntOpenHashSet as a placeholder for empty result
- private static final IntSet EMPTY_PLACEHOLDER = new IntOpenHashSet();
-
- private final int _threshold;
- private final int _log2m;
+ protected static final IntSet EMPTY_PLACEHOLDER = new IntOpenHashSet();
- public DistinctCountSmartHLLAggregationFunction(List<ExpressionContext>
arguments) {
- super(arguments.get(0));
-
- if (arguments.size() > 1) {
- Parameters parameters = new
Parameters(arguments.get(1).getLiteral().getStringValue());
- _threshold = parameters._threshold;
- _log2m = parameters._log2m;
- } else {
- _threshold = Parameters.DEFAULT_THRESHOLD;
- _log2m = Parameters.DEFAULT_LOG2M;
- }
+ protected BaseDistinctCountSmartSketchAggregationFunction(ExpressionContext
expression) {
+ super(expression);
}
- public int getThreshold() {
- return _threshold;
- }
+ protected abstract int getThreshold();
- public int getLog2m() {
- return _log2m;
- }
+ protected abstract Object convertSetToSketch(Set valueSet, DataType
storedType);
- @Override
- public AggregationFunctionType getType() {
- return AggregationFunctionType.DISTINCTCOUNTSMARTHLL;
- }
+ protected abstract Object convertToSketch(DictIdsWrapper dictIdsWrapper);
+
+ protected abstract IllegalStateException
getIllegalDataTypeException(DataType dataType, boolean singleValue);
@Override
- public AggregationResultHolder createAggregationResultHolder() {
+ public final AggregationResultHolder createAggregationResultHolder() {
return new ObjectAggregationResultHolder();
}
@Override
- public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ public final GroupByResultHolder createGroupByResultHolder(int
initialCapacity, int maxCapacity) {
return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
}
- @Override
- public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- BlockValSet blockValSet = blockValSetMap.get(_expression);
-
- // For dictionary-encoded expression, store dictionary ids into the bitmap
- Dictionary dictionary = blockValSet.getDictionary();
- if (dictionary != null) {
- RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder,
dictionary);
- if (blockValSet.isSingleValue()) {
- int[] dictIds = blockValSet.getDictionaryIdsSV();
- dictIdBitmap.addN(dictIds, 0, length);
- } else {
- int[][] dictIds = blockValSet.getDictionaryIdsMV();
- for (int i = 0; i < length; i++) {
- dictIdBitmap.add(dictIds[i]);
- }
- }
- return;
- }
-
- // For non-dictionary-encoded expression, store values into the value set
or HLL
- if (aggregationResultHolder.getResult() instanceof HyperLogLog) {
- aggregateIntoHLL(length, aggregationResultHolder, blockValSet);
- } else {
- aggregateIntoSet(length, aggregationResultHolder, blockValSet);
- }
- }
-
- private void aggregateIntoHLL(int length, AggregationResultHolder
aggregationResultHolder, BlockValSet blockValSet) {
- DataType valueType = blockValSet.getValueType();
- DataType storedType = valueType.getStoredType();
- HyperLogLog hll = aggregationResultHolder.getResult();
- if (blockValSet.isSingleValue()) {
- switch (storedType) {
- case INT:
- int[] intValues = blockValSet.getIntValuesSV();
- for (int i = 0; i < length; i++) {
- hll.offer(intValues[i]);
- }
- break;
- case LONG:
- long[] longValues = blockValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- hll.offer(longValues[i]);
- }
- break;
- case FLOAT:
- float[] floatValues = blockValSet.getFloatValuesSV();
- for (int i = 0; i < length; i++) {
- hll.offer(floatValues[i]);
- }
- break;
- case DOUBLE:
- double[] doubleValues = blockValSet.getDoubleValuesSV();
- for (int i = 0; i < length; i++) {
- hll.offer(doubleValues[i]);
- }
- break;
- case STRING:
- String[] stringValues = blockValSet.getStringValuesSV();
- for (int i = 0; i < length; i++) {
- hll.offer(stringValues[i]);
- }
- break;
- case BYTES:
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- hll.offer(bytesValues[i]);
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, true);
- }
- } else {
- switch (storedType) {
- case INT:
- int[][] intValues = blockValSet.getIntValuesMV();
- for (int i = 0; i < length; i++) {
- for (int value : intValues[i]) {
- hll.offer(value);
- }
- }
- break;
- case LONG:
- long[][] longValues = blockValSet.getLongValuesMV();
- for (int i = 0; i < length; i++) {
- for (long value : longValues[i]) {
- hll.offer(value);
- }
- }
- break;
- case FLOAT:
- float[][] floatValues = blockValSet.getFloatValuesMV();
- for (int i = 0; i < length; i++) {
- for (float value : floatValues[i]) {
- hll.offer(value);
- }
- }
- break;
- case DOUBLE:
- double[][] doubleValues = blockValSet.getDoubleValuesMV();
- for (int i = 0; i < length; i++) {
- for (double value : doubleValues[i]) {
- hll.offer(value);
- }
- }
- break;
- case STRING:
- String[][] stringValues = blockValSet.getStringValuesMV();
- for (int i = 0; i < length; i++) {
- for (String value : stringValues[i]) {
- hll.offer(value);
- }
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, false);
- }
- }
- }
-
- private void aggregateIntoSet(int length, AggregationResultHolder
aggregationResultHolder, BlockValSet blockValSet) {
+ /**
+ * Common aggregation for non-dictionary-encoded expressions into a
per-segment Set.
+ * Subclasses can call this from their aggregate(...) when the current
result is not a sketch.
+ */
+ protected final void aggregateIntoSet(int length, AggregationResultHolder
aggregationResultHolder,
+ BlockValSet blockValSet) {
DataType valueType = blockValSet.getValueType();
DataType storedType = valueType.getStoredType();
Set valueSet = getValueSet(aggregationResultHolder, storedType);
if (blockValSet.isSingleValue()) {
switch (storedType) {
- case INT:
+ case INT: {
IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
int[] intValues = blockValSet.getIntValuesSV();
for (int i = 0; i < length; i++) {
intSet.add(intValues[i]);
}
break;
- case LONG:
+ }
+ case LONG: {
LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
long[] longValues = blockValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
longSet.add(longValues[i]);
}
break;
- case FLOAT:
+ }
+ case FLOAT: {
FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
float[] floatValues = blockValSet.getFloatValuesSV();
for (int i = 0; i < length; i++) {
floatSet.add(floatValues[i]);
}
break;
- case DOUBLE:
+ }
+ case DOUBLE: {
DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
double[] doubleValues = blockValSet.getDoubleValuesSV();
for (int i = 0; i < length; i++) {
doubleSet.add(doubleValues[i]);
}
break;
- case STRING:
+ }
+ case STRING: {
ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>)
valueSet;
String[] stringValues = blockValSet.getStringValuesSV();
- //noinspection ManualArrayToCollectionCopy
- for (int i = 0; i < length; i++) {
- stringSet.add(stringValues[i]);
- }
+ stringSet.addAll(Arrays.asList(stringValues).subList(0, length));
break;
- case BYTES:
+ }
+ case BYTES: {
ObjectOpenHashSet<ByteArray> bytesSet =
(ObjectOpenHashSet<ByteArray>) valueSet;
byte[][] bytesValues = blockValSet.getBytesValuesSV();
for (int i = 0; i < length; i++) {
bytesSet.add(new ByteArray(bytesValues[i]));
}
break;
+ }
default:
throw getIllegalDataTypeException(valueType, true);
}
} else {
switch (storedType) {
- case INT:
+ case INT: {
IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
int[][] intValues = blockValSet.getIntValuesMV();
for (int i = 0; i < length; i++) {
@@ -291,7 +146,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case LONG:
+ }
+ case LONG: {
LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
long[][] longValues = blockValSet.getLongValuesMV();
for (int i = 0; i < length; i++) {
@@ -300,7 +156,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case FLOAT:
+ }
+ case FLOAT: {
FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
float[][] floatValues = blockValSet.getFloatValuesMV();
for (int i = 0; i < length; i++) {
@@ -309,7 +166,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case DOUBLE:
+ }
+ case DOUBLE: {
DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
double[][] doubleValues = blockValSet.getDoubleValuesMV();
for (int i = 0; i < length; i++) {
@@ -318,58 +176,30 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case STRING:
+ }
+ case STRING: {
ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>)
valueSet;
String[][] stringValues = blockValSet.getStringValuesMV();
for (int i = 0; i < length; i++) {
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
+ Collections.addAll(stringSet, stringValues[i]);
}
break;
+ }
default:
throw getIllegalDataTypeException(valueType, false);
}
}
- // Convert to HLL if the set size exceeds the threshold
- if (valueSet.size() > _threshold) {
- aggregationResultHolder.setValue(convertSetToHLL(valueSet, storedType));
- }
- }
-
- protected HyperLogLog convertSetToHLL(Set valueSet, DataType storedType) {
- if (storedType == DataType.BYTES) {
- return convertByteArraySetToHLL((ObjectSet<ByteArray>) valueSet);
- } else {
- return convertNonByteArraySetToHLL(valueSet);
+ if (valueSet.size() > getThreshold()) {
+ aggregationResultHolder.setValue(convertSetToSketch(valueSet,
storedType));
}
}
- protected HyperLogLog convertByteArraySetToHLL(ObjectSet<ByteArray>
valueSet) {
- HyperLogLog hll = new HyperLogLog(_log2m);
- for (ByteArray value : valueSet) {
- hll.offer(value.getBytes());
- }
- return hll;
- }
-
- protected HyperLogLog convertNonByteArraySetToHLL(Set valueSet) {
- HyperLogLog hll = new HyperLogLog(_log2m);
- for (Object value : valueSet) {
- hll.offer(value);
- }
- return hll;
- }
-
@Override
- public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ public final void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- // For dictionary-encoded expression, store dictionary ids into the bitmap
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
if (blockValSet.isSingleValue()) {
@@ -386,56 +216,61 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return;
}
- // For non-dictionary-encoded expression, store values into the value set
DataType valueType = blockValSet.getValueType();
DataType storedType = valueType.getStoredType();
if (blockValSet.isSingleValue()) {
switch (storedType) {
- case INT:
+ case INT: {
int[] intValues = blockValSet.getIntValuesSV();
for (int i = 0; i < length; i++) {
((IntOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.INT)).add(intValues[i]);
}
break;
- case LONG:
+ }
+ case LONG: {
long[] longValues = blockValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
((LongOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.LONG)).add(longValues[i]);
}
break;
- case FLOAT:
+ }
+ case FLOAT: {
float[] floatValues = blockValSet.getFloatValuesSV();
for (int i = 0; i < length; i++) {
((FloatOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.FLOAT)).add(floatValues[i]);
}
break;
- case DOUBLE:
+ }
+ case DOUBLE: {
double[] doubleValues = blockValSet.getDoubleValuesSV();
for (int i = 0; i < length; i++) {
((DoubleOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.DOUBLE)).add(
doubleValues[i]);
}
break;
- case STRING:
+ }
+ case STRING: {
String[] stringValues = blockValSet.getStringValuesSV();
for (int i = 0; i < length; i++) {
((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.STRING)).add(
stringValues[i]);
}
break;
- case BYTES:
+ }
+ case BYTES: {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
for (int i = 0; i < length; i++) {
((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.BYTES)).add(
new ByteArray(bytesValues[i]));
}
break;
+ }
default:
throw getIllegalDataTypeException(valueType, true);
}
} else {
switch (storedType) {
- case INT:
+ case INT: {
int[][] intValues = blockValSet.getIntValuesMV();
for (int i = 0; i < length; i++) {
IntOpenHashSet intSet = (IntOpenHashSet)
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
@@ -444,7 +279,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case LONG:
+ }
+ case LONG: {
long[][] longValues = blockValSet.getLongValuesMV();
for (int i = 0; i < length; i++) {
LongOpenHashSet longSet =
@@ -454,7 +290,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case FLOAT:
+ }
+ case FLOAT: {
float[][] floatValues = blockValSet.getFloatValuesMV();
for (int i = 0; i < length; i++) {
FloatOpenHashSet floatSet =
@@ -464,7 +301,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case DOUBLE:
+ }
+ case DOUBLE: {
double[][] doubleValues = blockValSet.getDoubleValuesMV();
for (int i = 0; i < length; i++) {
DoubleOpenHashSet doubleSet =
@@ -474,18 +312,16 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case STRING:
+ }
+ case STRING: {
String[][] stringValues = blockValSet.getStringValuesMV();
for (int i = 0; i < length; i++) {
ObjectOpenHashSet<String> stringSet =
(ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.STRING);
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
+ Collections.addAll(stringSet, stringValues[i]);
}
break;
+ }
default:
throw getIllegalDataTypeException(valueType, false);
}
@@ -493,11 +329,10 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
@Override
- public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ public final void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- // For dictionary-encoded expression, store dictionary ids into the bitmap
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
if (blockValSet.isSingleValue()) {
@@ -516,53 +351,58 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return;
}
- // For non-dictionary-encoded expression, store values into the value set
DataType valueType = blockValSet.getValueType();
DataType storedType = valueType.getStoredType();
if (blockValSet.isSingleValue()) {
switch (storedType) {
- case INT:
+ case INT: {
int[] intValues = blockValSet.getIntValuesSV();
for (int i = 0; i < length; i++) {
setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
intValues[i]);
}
break;
- case LONG:
+ }
+ case LONG: {
long[] longValues = blockValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
longValues[i]);
}
break;
- case FLOAT:
+ }
+ case FLOAT: {
float[] floatValues = blockValSet.getFloatValuesSV();
for (int i = 0; i < length; i++) {
setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
floatValues[i]);
}
break;
- case DOUBLE:
+ }
+ case DOUBLE: {
double[] doubleValues = blockValSet.getDoubleValuesSV();
for (int i = 0; i < length; i++) {
setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
doubleValues[i]);
}
break;
- case STRING:
+ }
+ case STRING: {
String[] stringValues = blockValSet.getStringValuesSV();
for (int i = 0; i < length; i++) {
setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
stringValues[i]);
}
break;
- case BYTES:
+ }
+ case BYTES: {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
for (int i = 0; i < length; i++) {
setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], new
ByteArray(bytesValues[i]));
}
break;
+ }
default:
throw getIllegalDataTypeException(valueType, true);
}
} else {
switch (storedType) {
- case INT:
+ case INT: {
int[][] intValues = blockValSet.getIntValuesMV();
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
@@ -573,7 +413,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case LONG:
+ }
+ case LONG: {
long[][] longValues = blockValSet.getLongValuesMV();
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
@@ -584,7 +425,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case FLOAT:
+ }
+ case FLOAT: {
float[][] floatValues = blockValSet.getFloatValuesMV();
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
@@ -595,7 +437,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case DOUBLE:
+ }
+ case DOUBLE: {
double[][] doubleValues = blockValSet.getDoubleValuesMV();
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
@@ -607,20 +450,18 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
break;
- case STRING:
+ }
+ case STRING: {
String[][] stringValues = blockValSet.getStringValuesMV();
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
ObjectOpenHashSet<String> stringSet =
(ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKey, DataType.STRING);
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
+ Collections.addAll(stringSet, stringValues[i]);
}
}
break;
+ }
default:
throw getIllegalDataTypeException(valueType, false);
}
@@ -628,138 +469,39 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
@Override
- public Object extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ public final Object extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
Object result = aggregationResultHolder.getResult();
if (result == null) {
return EMPTY_PLACEHOLDER;
}
if (result instanceof DictIdsWrapper) {
- // For dictionary-encoded expression, convert dictionary ids to values
DictIdsWrapper dictIdsWrapper = (DictIdsWrapper) result;
- if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(_threshold)) {
- return convertToHLL(dictIdsWrapper);
+ if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(getThreshold())) {
+ return convertToSketch(dictIdsWrapper);
} else {
return convertToValueSet(dictIdsWrapper);
}
} else {
- // For non-dictionary-encoded expression, directly return the value set
return result;
}
}
@Override
- public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int
groupKey) {
+ public final Set extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
Object result = groupByResultHolder.getResult(groupKey);
if (result == null) {
return EMPTY_PLACEHOLDER;
}
if (result instanceof DictIdsWrapper) {
- // For dictionary-encoded expression, convert dictionary ids to values
return convertToValueSet((DictIdsWrapper) result);
} else {
- // For non-dictionary-encoded expression, directly return the value set
return (Set) result;
}
}
- @Override
- public Object merge(Object intermediateResult1, Object intermediateResult2) {
- if (intermediateResult1 instanceof HyperLogLog) {
- return mergeIntoHLL((HyperLogLog) intermediateResult1,
intermediateResult2);
- }
- if (intermediateResult2 instanceof HyperLogLog) {
- return mergeIntoHLL((HyperLogLog) intermediateResult2,
intermediateResult1);
- }
-
- Set valueSet1 = (Set) intermediateResult1;
- Set valueSet2 = (Set) intermediateResult2;
- if (valueSet1.isEmpty()) {
- return valueSet2;
- }
- if (valueSet2.isEmpty()) {
- return valueSet1;
- }
- valueSet1.addAll(valueSet2);
-
- // Convert to HLL if the set size exceeds the threshold
- if (valueSet1.size() > _threshold) {
- if (valueSet1 instanceof ObjectSet && valueSet1.iterator().next()
instanceof ByteArray) {
- return convertByteArraySetToHLL((ObjectSet<ByteArray>) valueSet1);
- } else {
- return convertNonByteArraySetToHLL(valueSet1);
- }
- } else {
- return valueSet1;
- }
- }
-
- private static HyperLogLog mergeIntoHLL(HyperLogLog hll, Object
intermediateResult) {
- if (intermediateResult instanceof HyperLogLog) {
- try {
- hll.addAll((HyperLogLog) intermediateResult);
- } catch (CardinalityMergeException e) {
- throw new RuntimeException("Caught exception while merging
HyperLogLog", e);
- }
- } else {
- Set valueSet = (Set) intermediateResult;
- if (!valueSet.isEmpty()) {
- if (valueSet instanceof ObjectSet && valueSet.iterator().next()
instanceof ByteArray) {
- for (Object value : valueSet) {
- hll.offer(((ByteArray) value).getBytes());
- }
- } else {
- for (Object value : valueSet) {
- hll.offer(value);
- }
- }
- }
- }
- return hll;
- }
-
- @Override
- public ColumnDataType getIntermediateResultColumnType() {
- return ColumnDataType.OBJECT;
- }
-
- @Override
- public SerializedIntermediateResult serializeIntermediateResult(Object o) {
- if (o instanceof HyperLogLog) {
- return new
SerializedIntermediateResult(ObjectSerDeUtils.ObjectType.HyperLogLog.getValue(),
- ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize((HyperLogLog) o));
- }
- return BaseDistinctAggregateAggregationFunction.serializeSet((Set) o);
- }
-
- @Override
- public Object deserializeIntermediateResult(CustomObject customObject) {
- return ObjectSerDeUtils.deserialize(customObject);
- }
-
- @Override
- public ColumnDataType getFinalResultColumnType() {
- return ColumnDataType.INT;
- }
-
- @Override
- public Integer extractFinalResult(Object intermediateResult) {
- if (intermediateResult instanceof HyperLogLog) {
- return (int) ((HyperLogLog) intermediateResult).cardinality();
- } else {
- return ((Set) intermediateResult).size();
- }
- }
-
- @Override
- public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) {
- return finalResult1 + finalResult2;
- }
-
- /**
- * Returns the dictionary id bitmap from the result holder or creates a new
one if it does not exist.
- */
+ /** Returns the dictionary id bitmap from the result holder or creates a new
one if it does not exist. */
protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder
aggregationResultHolder,
Dictionary dictionary) {
DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
@@ -770,9 +512,7 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return dictIdsWrapper._dictIdBitmap;
}
- /**
- * Returns the value set from the result holder or creates a new one if it
does not exist.
- */
+ /** Returns the value set from the result holder or creates a new one if it
does not exist. */
protected static Set getValueSet(AggregationResultHolder
aggregationResultHolder, DataType valueType) {
Set valueSet = aggregationResultHolder.getResult();
if (valueSet == null) {
@@ -782,10 +522,8 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return valueSet;
}
- /**
- * Helper method to create a value set for the given value type.
- */
- private static Set getValueSet(DataType valueType) {
+ /** Helper method to create a value set for the given value type. */
+ protected static Set getValueSet(DataType valueType) {
switch (valueType) {
case INT:
return new IntOpenHashSet();
@@ -799,13 +537,12 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
case BYTES:
return new ObjectOpenHashSet();
default:
- throw new IllegalStateException("Illegal data type for DISTINCT_COUNT
aggregation function: " + valueType);
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_SMART aggregation function:
" + valueType);
}
}
- /**
- * Returns the dictionary id bitmap for the given group key or creates a new
one if it does not exist.
- */
+ /** Returns the dictionary id bitmap for the given group key or creates a
new one if it does not exist. */
protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder
groupByResultHolder, int groupKey,
Dictionary dictionary) {
DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
@@ -816,9 +553,7 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return dictIdsWrapper._dictIdBitmap;
}
- /**
- * Returns the value set for the given group key or creates a new one if it
does not exist.
- */
+ /** Returns the value set for the given group key or creates a new one if it
does not exist. */
protected static Set getValueSet(GroupByResultHolder groupByResultHolder,
int groupKey, DataType valueType) {
Set valueSet = groupByResultHolder.getResult(groupKey);
if (valueSet == null) {
@@ -828,198 +563,121 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return valueSet;
}
- /**
- * Helper method to set dictionary id for the given group keys into the
result holder.
- */
- private static void setDictIdForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys,
+ /** Helper method to set dictionary id for the given group keys into the
result holder. */
+ protected static void setDictIdForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys,
Dictionary dictionary, int dictId) {
for (int groupKey : groupKeys) {
getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
}
}
- /**
- * Helper method to set INT value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, int value) {
+ /** Helper method to set INT value for the given group keys into the result
holder. */
+ protected static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, int value) {
for (int groupKey : groupKeys) {
((IntOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.INT)).add(value);
}
}
- /**
- * Helper method to set LONG value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, long value) {
+ /** Helper method to set LONG value for the given group keys into the result
holder. */
+ protected static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, long value) {
for (int groupKey : groupKeys) {
((LongOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.LONG)).add(value);
}
}
- /**
- * Helper method to set FLOAT value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, float value) {
+ /** Helper method to set FLOAT value for the given group keys into the
result holder. */
+ protected static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, float value) {
for (int groupKey : groupKeys) {
((FloatOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.FLOAT)).add(value);
}
}
- /**
- * Helper method to set DOUBLE value for the given group keys into the
result holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, double value) {
+ /** Helper method to set DOUBLE value for the given group keys into the
result holder. */
+ protected static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, double value) {
for (int groupKey : groupKeys) {
((DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.DOUBLE)).add(value);
}
}
- /**
- * Helper method to set STRING value for the given group keys into the
result holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, String value) {
+ /** Helper method to set STRING value for the given group keys into the
result holder. */
+ protected static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, String value) {
for (int groupKey : groupKeys) {
((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKey,
DataType.STRING)).add(value);
}
}
- /**
- * Helper method to set BYTES value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, ByteArray value) {
+ /** Helper method to set BYTES value for the given group keys into the
result holder. */
+ protected static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys,
+ ByteArray value) {
for (int groupKey : groupKeys) {
((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder,
groupKey, DataType.BYTES)).add(value);
}
}
- /**
- * Helper method to read dictionary and convert dictionary ids to a value
set for dictionary-encoded expression.
- */
- private static Set convertToValueSet(DictIdsWrapper dictIdsWrapper) {
+ /** Helper method to read dictionary and convert dictionary ids to a value
set for dictionary-encoded expression. */
+ protected static Set convertToValueSet(DictIdsWrapper dictIdsWrapper) {
Dictionary dictionary = dictIdsWrapper._dictionary;
RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
int numValues = dictIdBitmap.getCardinality();
PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
DataType storedType = dictionary.getValueType();
switch (storedType) {
- case INT:
+ case INT: {
IntOpenHashSet intSet = new IntOpenHashSet(numValues);
while (iterator.hasNext()) {
intSet.add(dictionary.getIntValue(iterator.next()));
}
return intSet;
- case LONG:
+ }
+ case LONG: {
LongOpenHashSet longSet = new LongOpenHashSet(numValues);
while (iterator.hasNext()) {
longSet.add(dictionary.getLongValue(iterator.next()));
}
return longSet;
- case FLOAT:
+ }
+ case FLOAT: {
FloatOpenHashSet floatSet = new FloatOpenHashSet(numValues);
while (iterator.hasNext()) {
floatSet.add(dictionary.getFloatValue(iterator.next()));
}
return floatSet;
- case DOUBLE:
+ }
+ case DOUBLE: {
DoubleOpenHashSet doubleSet = new DoubleOpenHashSet(numValues);
while (iterator.hasNext()) {
doubleSet.add(dictionary.getDoubleValue(iterator.next()));
}
return doubleSet;
- case STRING:
+ }
+ case STRING: {
ObjectOpenHashSet<String> stringSet = new
ObjectOpenHashSet<>(numValues);
while (iterator.hasNext()) {
stringSet.add(dictionary.getStringValue(iterator.next()));
}
return stringSet;
- case BYTES:
+ }
+ case BYTES: {
ObjectOpenHashSet<ByteArray> bytesSet = new
ObjectOpenHashSet<>(numValues);
while (iterator.hasNext()) {
bytesSet.add(new
ByteArray(dictionary.getBytesValue(iterator.next())));
}
return bytesSet;
+ }
default:
- throw new IllegalStateException("Illegal data type for DISTINCT_COUNT
aggregation function: " + storedType);
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_SMART aggregation function:
" + storedType);
}
}
- /**
- * Helper method to read dictionary and convert dictionary ids to a
HyperLogLog for dictionary-encoded expression.
- */
- private HyperLogLog convertToHLL(DictIdsWrapper dictIdsWrapper) {
- HyperLogLog hyperLogLog = new HyperLogLog(_log2m);
- Dictionary dictionary = dictIdsWrapper._dictionary;
- RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
- PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
- while (iterator.hasNext()) {
- hyperLogLog.offer(dictionary.get(iterator.next()));
- }
- return hyperLogLog;
- }
-
- private static IllegalStateException getIllegalDataTypeException(DataType
dataType, boolean singleValue) {
- return new IllegalStateException(
- "Illegal data type for DISTINCT_COUNT_SMART_HLL aggregation function:
" + dataType + (singleValue ? ""
- : "_MV"));
- }
-
- private static final class DictIdsWrapper {
+ /** Wrapper of dictionary and dict-id bitmap used during aggregation. */
+ protected static final class DictIdsWrapper {
final Dictionary _dictionary;
final RoaringBitmap _dictIdBitmap;
- private DictIdsWrapper(Dictionary dictionary) {
+ DictIdsWrapper(Dictionary dictionary) {
_dictionary = dictionary;
_dictIdBitmap = new RoaringBitmap();
}
}
-
- /**
- * Helper class to wrap the parameters.
- */
- private static class Parameters {
- static final char PARAMETER_DELIMITER = ';';
- static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
-
- static final String THRESHOLD_KEY = "THRESHOLD";
- // 100K values to trigger HLL conversion by default
- static final int DEFAULT_THRESHOLD = 100_000;
- @Deprecated
- static final String DEPRECATED_THRESHOLD_KEY = "HLLCONVERSIONTHRESHOLD";
-
- static final String LOG2M_KEY = "LOG2M";
- // Use 12 by default to get good accuracy for DistinctCount
- static final int DEFAULT_LOG2M = 12;
- @Deprecated
- static final String DEPRECATED_LOG2M_KEY = "HLLLOG2M";
-
- int _threshold = DEFAULT_THRESHOLD;
- int _log2m = DEFAULT_LOG2M;
-
- Parameters(String parametersString) {
- StringUtils.deleteWhitespace(parametersString);
- String[] keyValuePairs = StringUtils.split(parametersString,
PARAMETER_DELIMITER);
- for (String keyValuePair : keyValuePairs) {
- String[] keyAndValue = StringUtils.split(keyValuePair,
PARAMETER_KEY_VALUE_SEPARATOR);
- Preconditions.checkArgument(keyAndValue.length == 2, "Invalid
parameter: %s", keyValuePair);
- String key = keyAndValue[0];
- String value = keyAndValue[1];
- switch (key.toUpperCase()) {
- case THRESHOLD_KEY:
- case DEPRECATED_THRESHOLD_KEY:
- _threshold = Integer.parseInt(value);
- // Treat non-positive threshold as unlimited
- if (_threshold <= 0) {
- _threshold = Integer.MAX_VALUE;
- }
- break;
- case LOG2M_KEY:
- case DEPRECATED_LOG2M_KEY:
- _log2m = Integer.parseInt(value);
- break;
- default:
- throw new IllegalArgumentException("Invalid parameter key: " +
key);
- }
- }
- }
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
index e93b48def80..5656212398f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartHLLAggregationFunction.java
@@ -21,12 +21,6 @@ package org.apache.pinot.core.query.aggregation.function;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
-import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
-import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectSet;
import java.util.List;
import java.util.Map;
@@ -38,9 +32,6 @@ import
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
-import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -63,9 +54,7 @@ import org.roaringbitmap.RoaringBitmap;
* Example of second argument: 'threshold=10;log2m=8'
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class DistinctCountSmartHLLAggregationFunction extends
BaseSingleInputAggregationFunction<Object, Integer> {
- // Use empty IntOpenHashSet as a placeholder for empty result
- private static final IntSet EMPTY_PLACEHOLDER = new IntOpenHashSet();
+public class DistinctCountSmartHLLAggregationFunction extends
BaseDistinctCountSmartSketchAggregationFunction {
private final int _threshold;
private final int _log2m;
@@ -96,15 +85,7 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return AggregationFunctionType.DISTINCTCOUNTSMARTHLL;
}
- @Override
- public AggregationResultHolder createAggregationResultHolder() {
- return new ObjectAggregationResultHolder();
- }
-
- @Override
- public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
- return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
- }
+ // Result holder creators are provided by the base class
@Override
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
@@ -228,117 +209,7 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
}
}
- private void aggregateIntoSet(int length, AggregationResultHolder
aggregationResultHolder, BlockValSet blockValSet) {
- DataType valueType = blockValSet.getValueType();
- DataType storedType = valueType.getStoredType();
- Set valueSet = getValueSet(aggregationResultHolder, storedType);
- if (blockValSet.isSingleValue()) {
- switch (storedType) {
- case INT:
- IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
- int[] intValues = blockValSet.getIntValuesSV();
- for (int i = 0; i < length; i++) {
- intSet.add(intValues[i]);
- }
- break;
- case LONG:
- LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
- long[] longValues = blockValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- longSet.add(longValues[i]);
- }
- break;
- case FLOAT:
- FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
- float[] floatValues = blockValSet.getFloatValuesSV();
- for (int i = 0; i < length; i++) {
- floatSet.add(floatValues[i]);
- }
- break;
- case DOUBLE:
- DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
- double[] doubleValues = blockValSet.getDoubleValuesSV();
- for (int i = 0; i < length; i++) {
- doubleSet.add(doubleValues[i]);
- }
- break;
- case STRING:
- ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>)
valueSet;
- String[] stringValues = blockValSet.getStringValuesSV();
- //noinspection ManualArrayToCollectionCopy
- for (int i = 0; i < length; i++) {
- stringSet.add(stringValues[i]);
- }
- break;
- case BYTES:
- ObjectOpenHashSet<ByteArray> bytesSet =
(ObjectOpenHashSet<ByteArray>) valueSet;
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- bytesSet.add(new ByteArray(bytesValues[i]));
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, true);
- }
- } else {
- switch (storedType) {
- case INT:
- IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
- int[][] intValues = blockValSet.getIntValuesMV();
- for (int i = 0; i < length; i++) {
- for (int value : intValues[i]) {
- intSet.add(value);
- }
- }
- break;
- case LONG:
- LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
- long[][] longValues = blockValSet.getLongValuesMV();
- for (int i = 0; i < length; i++) {
- for (long value : longValues[i]) {
- longSet.add(value);
- }
- }
- break;
- case FLOAT:
- FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
- float[][] floatValues = blockValSet.getFloatValuesMV();
- for (int i = 0; i < length; i++) {
- for (float value : floatValues[i]) {
- floatSet.add(value);
- }
- }
- break;
- case DOUBLE:
- DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
- double[][] doubleValues = blockValSet.getDoubleValuesMV();
- for (int i = 0; i < length; i++) {
- for (double value : doubleValues[i]) {
- doubleSet.add(value);
- }
- }
- break;
- case STRING:
- ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>)
valueSet;
- String[][] stringValues = blockValSet.getStringValuesMV();
- for (int i = 0; i < length; i++) {
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, false);
- }
- }
-
- // Convert to HLL if the set size exceeds the threshold
- if (valueSet.size() > _threshold) {
- aggregationResultHolder.setValue(convertSetToHLL(valueSet, storedType));
- }
- }
+ // aggregateIntoSet is handled by the base class
protected HyperLogLog convertSetToHLL(Set valueSet, DataType storedType) {
if (storedType == DataType.BYTES) {
@@ -364,305 +235,13 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return hll;
}
- @Override
- public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- BlockValSet blockValSet = blockValSetMap.get(_expression);
+ // group-by SV handled by the base class
- // For dictionary-encoded expression, store dictionary ids into the bitmap
- Dictionary dictionary = blockValSet.getDictionary();
- if (dictionary != null) {
- if (blockValSet.isSingleValue()) {
- int[] dictIds = blockValSet.getDictionaryIdsSV();
- for (int i = 0; i < length; i++) {
- getDictIdBitmap(groupByResultHolder, groupKeyArray[i],
dictionary).add(dictIds[i]);
- }
- } else {
- int[][] dictIds = blockValSet.getDictionaryIdsMV();
- for (int i = 0; i < length; i++) {
- getDictIdBitmap(groupByResultHolder, groupKeyArray[i],
dictionary).add(dictIds[i]);
- }
- }
- return;
- }
+ // group-by MV handled by the base class
- // For non-dictionary-encoded expression, store values into the value set
- DataType valueType = blockValSet.getValueType();
- DataType storedType = valueType.getStoredType();
- if (blockValSet.isSingleValue()) {
- switch (storedType) {
- case INT:
- int[] intValues = blockValSet.getIntValuesSV();
- for (int i = 0; i < length; i++) {
- ((IntOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.INT)).add(intValues[i]);
- }
- break;
- case LONG:
- long[] longValues = blockValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- ((LongOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.LONG)).add(longValues[i]);
- }
- break;
- case FLOAT:
- float[] floatValues = blockValSet.getFloatValuesSV();
- for (int i = 0; i < length; i++) {
- ((FloatOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.FLOAT)).add(floatValues[i]);
- }
- break;
- case DOUBLE:
- double[] doubleValues = blockValSet.getDoubleValuesSV();
- for (int i = 0; i < length; i++) {
- ((DoubleOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.DOUBLE)).add(
- doubleValues[i]);
- }
- break;
- case STRING:
- String[] stringValues = blockValSet.getStringValuesSV();
- for (int i = 0; i < length; i++) {
- ((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.STRING)).add(
- stringValues[i]);
- }
- break;
- case BYTES:
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- ((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.BYTES)).add(
- new ByteArray(bytesValues[i]));
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, true);
- }
- } else {
- switch (storedType) {
- case INT:
- int[][] intValues = blockValSet.getIntValuesMV();
- for (int i = 0; i < length; i++) {
- IntOpenHashSet intSet = (IntOpenHashSet)
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
- for (int value : intValues[i]) {
- intSet.add(value);
- }
- }
- break;
- case LONG:
- long[][] longValues = blockValSet.getLongValuesMV();
- for (int i = 0; i < length; i++) {
- LongOpenHashSet longSet =
- (LongOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.LONG);
- for (long value : longValues[i]) {
- longSet.add(value);
- }
- }
- break;
- case FLOAT:
- float[][] floatValues = blockValSet.getFloatValuesMV();
- for (int i = 0; i < length; i++) {
- FloatOpenHashSet floatSet =
- (FloatOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.FLOAT);
- for (float value : floatValues[i]) {
- floatSet.add(value);
- }
- }
- break;
- case DOUBLE:
- double[][] doubleValues = blockValSet.getDoubleValuesMV();
- for (int i = 0; i < length; i++) {
- DoubleOpenHashSet doubleSet =
- (DoubleOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.DOUBLE);
- for (double value : doubleValues[i]) {
- doubleSet.add(value);
- }
- }
- break;
- case STRING:
- String[][] stringValues = blockValSet.getStringValuesMV();
- for (int i = 0; i < length; i++) {
- ObjectOpenHashSet<String> stringSet =
- (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.STRING);
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, false);
- }
- }
- }
+ // extractAggregationResult is handled by the base class
- @Override
- public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- BlockValSet blockValSet = blockValSetMap.get(_expression);
-
- // For dictionary-encoded expression, store dictionary ids into the bitmap
- Dictionary dictionary = blockValSet.getDictionary();
- if (dictionary != null) {
- if (blockValSet.isSingleValue()) {
- int[] dictIds = blockValSet.getDictionaryIdsSV();
- for (int i = 0; i < length; i++) {
- setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i],
dictionary, dictIds[i]);
- }
- } else {
- int[][] dictIds = blockValSet.getDictionaryIdsMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- getDictIdBitmap(groupByResultHolder, groupKey,
dictionary).add(dictIds[i]);
- }
- }
- }
- return;
- }
-
- // For non-dictionary-encoded expression, store values into the value set
- DataType valueType = blockValSet.getValueType();
- DataType storedType = valueType.getStoredType();
- if (blockValSet.isSingleValue()) {
- switch (storedType) {
- case INT:
- int[] intValues = blockValSet.getIntValuesSV();
- for (int i = 0; i < length; i++) {
- setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
intValues[i]);
- }
- break;
- case LONG:
- long[] longValues = blockValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
longValues[i]);
- }
- break;
- case FLOAT:
- float[] floatValues = blockValSet.getFloatValuesSV();
- for (int i = 0; i < length; i++) {
- setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
floatValues[i]);
- }
- break;
- case DOUBLE:
- double[] doubleValues = blockValSet.getDoubleValuesSV();
- for (int i = 0; i < length; i++) {
- setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
doubleValues[i]);
- }
- break;
- case STRING:
- String[] stringValues = blockValSet.getStringValuesSV();
- for (int i = 0; i < length; i++) {
- setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
stringValues[i]);
- }
- break;
- case BYTES:
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], new
ByteArray(bytesValues[i]));
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, true);
- }
- } else {
- switch (storedType) {
- case INT:
- int[][] intValues = blockValSet.getIntValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- IntOpenHashSet intSet = (IntOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.INT);
- for (int value : intValues[i]) {
- intSet.add(value);
- }
- }
- }
- break;
- case LONG:
- long[][] longValues = blockValSet.getLongValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- LongOpenHashSet longSet = (LongOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.LONG);
- for (long value : longValues[i]) {
- longSet.add(value);
- }
- }
- }
- break;
- case FLOAT:
- float[][] floatValues = blockValSet.getFloatValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- FloatOpenHashSet floatSet = (FloatOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.FLOAT);
- for (float value : floatValues[i]) {
- floatSet.add(value);
- }
- }
- }
- break;
- case DOUBLE:
- double[][] doubleValues = blockValSet.getDoubleValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- DoubleOpenHashSet doubleSet =
- (DoubleOpenHashSet) getValueSet(groupByResultHolder,
groupKey, DataType.DOUBLE);
- for (double value : doubleValues[i]) {
- doubleSet.add(value);
- }
- }
- }
- break;
- case STRING:
- String[][] stringValues = blockValSet.getStringValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- ObjectOpenHashSet<String> stringSet =
- (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKey, DataType.STRING);
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
- }
- }
- break;
- default:
- throw getIllegalDataTypeException(valueType, false);
- }
- }
- }
-
- @Override
- public Object extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
- Object result = aggregationResultHolder.getResult();
- if (result == null) {
- return EMPTY_PLACEHOLDER;
- }
-
- if (result instanceof DictIdsWrapper) {
- // For dictionary-encoded expression, convert dictionary ids to values
- DictIdsWrapper dictIdsWrapper = (DictIdsWrapper) result;
- if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(_threshold)) {
- return convertToHLL(dictIdsWrapper);
- } else {
- return convertToValueSet(dictIdsWrapper);
- }
- } else {
- // For non-dictionary-encoded expression, directly return the value set
- return result;
- }
- }
-
- @Override
- public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int
groupKey) {
- Object result = groupByResultHolder.getResult(groupKey);
- if (result == null) {
- return EMPTY_PLACEHOLDER;
- }
-
- if (result instanceof DictIdsWrapper) {
- // For dictionary-encoded expression, convert dictionary ids to values
- return convertToValueSet((DictIdsWrapper) result);
- } else {
- // For non-dictionary-encoded expression, directly return the value set
- return (Set) result;
- }
- }
+ // extractGroupByResult is handled by the base class
@Override
public Object merge(Object intermediateResult1, Object intermediateResult2) {
@@ -760,193 +339,12 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
/**
* Returns the dictionary id bitmap from the result holder or creates a new
one if it does not exist.
*/
- protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder
aggregationResultHolder,
- Dictionary dictionary) {
- DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
- if (dictIdsWrapper == null) {
- dictIdsWrapper = new DictIdsWrapper(dictionary);
- aggregationResultHolder.setValue(dictIdsWrapper);
- }
- return dictIdsWrapper._dictIdBitmap;
- }
-
- /**
- * Returns the value set from the result holder or creates a new one if it
does not exist.
- */
- protected static Set getValueSet(AggregationResultHolder
aggregationResultHolder, DataType valueType) {
- Set valueSet = aggregationResultHolder.getResult();
- if (valueSet == null) {
- valueSet = getValueSet(valueType);
- aggregationResultHolder.setValue(valueSet);
- }
- return valueSet;
- }
-
- /**
- * Helper method to create a value set for the given value type.
- */
- private static Set getValueSet(DataType valueType) {
- switch (valueType) {
- case INT:
- return new IntOpenHashSet();
- case LONG:
- return new LongOpenHashSet();
- case FLOAT:
- return new FloatOpenHashSet();
- case DOUBLE:
- return new DoubleOpenHashSet();
- case STRING:
- case BYTES:
- return new ObjectOpenHashSet();
- default:
- throw new IllegalStateException("Illegal data type for DISTINCT_COUNT
aggregation function: " + valueType);
- }
- }
-
- /**
- * Returns the dictionary id bitmap for the given group key or creates a new
one if it does not exist.
- */
- protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder
groupByResultHolder, int groupKey,
- Dictionary dictionary) {
- DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
- if (dictIdsWrapper == null) {
- dictIdsWrapper = new DictIdsWrapper(dictionary);
- groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
- }
- return dictIdsWrapper._dictIdBitmap;
- }
-
- /**
- * Returns the value set for the given group key or creates a new one if it
does not exist.
- */
- protected static Set getValueSet(GroupByResultHolder groupByResultHolder,
int groupKey, DataType valueType) {
- Set valueSet = groupByResultHolder.getResult(groupKey);
- if (valueSet == null) {
- valueSet = getValueSet(valueType);
- groupByResultHolder.setValueForKey(groupKey, valueSet);
- }
- return valueSet;
- }
-
- /**
- * Helper method to set dictionary id for the given group keys into the
result holder.
- */
- private static void setDictIdForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys,
- Dictionary dictionary, int dictId) {
- for (int groupKey : groupKeys) {
- getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
- }
- }
-
- /**
- * Helper method to set INT value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, int value) {
- for (int groupKey : groupKeys) {
- ((IntOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.INT)).add(value);
- }
- }
-
- /**
- * Helper method to set LONG value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, long value) {
- for (int groupKey : groupKeys) {
- ((LongOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.LONG)).add(value);
- }
- }
-
- /**
- * Helper method to set FLOAT value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, float value) {
- for (int groupKey : groupKeys) {
- ((FloatOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.FLOAT)).add(value);
- }
- }
-
- /**
- * Helper method to set DOUBLE value for the given group keys into the
result holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, double value) {
- for (int groupKey : groupKeys) {
- ((DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.DOUBLE)).add(value);
- }
- }
-
- /**
- * Helper method to set STRING value for the given group keys into the
result holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, String value) {
- for (int groupKey : groupKeys) {
- ((ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKey,
DataType.STRING)).add(value);
- }
- }
-
- /**
- * Helper method to set BYTES value for the given group keys into the result
holder.
- */
- private static void setValueForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys, ByteArray value) {
- for (int groupKey : groupKeys) {
- ((ObjectOpenHashSet<ByteArray>) getValueSet(groupByResultHolder,
groupKey, DataType.BYTES)).add(value);
- }
- }
-
- /**
- * Helper method to read dictionary and convert dictionary ids to a value
set for dictionary-encoded expression.
- */
- private static Set convertToValueSet(DictIdsWrapper dictIdsWrapper) {
- Dictionary dictionary = dictIdsWrapper._dictionary;
- RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
- int numValues = dictIdBitmap.getCardinality();
- PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
- DataType storedType = dictionary.getValueType();
- switch (storedType) {
- case INT:
- IntOpenHashSet intSet = new IntOpenHashSet(numValues);
- while (iterator.hasNext()) {
- intSet.add(dictionary.getIntValue(iterator.next()));
- }
- return intSet;
- case LONG:
- LongOpenHashSet longSet = new LongOpenHashSet(numValues);
- while (iterator.hasNext()) {
- longSet.add(dictionary.getLongValue(iterator.next()));
- }
- return longSet;
- case FLOAT:
- FloatOpenHashSet floatSet = new FloatOpenHashSet(numValues);
- while (iterator.hasNext()) {
- floatSet.add(dictionary.getFloatValue(iterator.next()));
- }
- return floatSet;
- case DOUBLE:
- DoubleOpenHashSet doubleSet = new DoubleOpenHashSet(numValues);
- while (iterator.hasNext()) {
- doubleSet.add(dictionary.getDoubleValue(iterator.next()));
- }
- return doubleSet;
- case STRING:
- ObjectOpenHashSet<String> stringSet = new
ObjectOpenHashSet<>(numValues);
- while (iterator.hasNext()) {
- stringSet.add(dictionary.getStringValue(iterator.next()));
- }
- return stringSet;
- case BYTES:
- ObjectOpenHashSet<ByteArray> bytesSet = new
ObjectOpenHashSet<>(numValues);
- while (iterator.hasNext()) {
- bytesSet.add(new
ByteArray(dictionary.getBytesValue(iterator.next())));
- }
- return bytesSet;
- default:
- throw new IllegalStateException("Illegal data type for DISTINCT_COUNT
aggregation function: " + storedType);
- }
- }
+ // helper methods for dict/value set conversions are provided by the base
class
/**
* Helper method to read dictionary and convert dictionary ids to a
HyperLogLog for dictionary-encoded expression.
*/
- private HyperLogLog convertToHLL(DictIdsWrapper dictIdsWrapper) {
+ private HyperLogLog
convertToHLL(BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper
dictIdsWrapper) {
HyperLogLog hyperLogLog = new HyperLogLog(_log2m);
Dictionary dictionary = dictIdsWrapper._dictionary;
RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
@@ -957,20 +355,21 @@ public class DistinctCountSmartHLLAggregationFunction
extends BaseSingleInputAgg
return hyperLogLog;
}
- private static IllegalStateException getIllegalDataTypeException(DataType
dataType, boolean singleValue) {
- return new IllegalStateException(
- "Illegal data type for DISTINCT_COUNT_SMART_HLL aggregation function:
" + dataType + (singleValue ? ""
- : "_MV"));
+ @Override
+ protected Object convertSetToSketch(Set valueSet, DataType storedType) {
+ return convertSetToHLL(valueSet, storedType);
}
- private static final class DictIdsWrapper {
- final Dictionary _dictionary;
- final RoaringBitmap _dictIdBitmap;
+ @Override
+ protected Object
convertToSketch(BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper
dictIdsWrapper) {
+ return convertToHLL(dictIdsWrapper);
+ }
- private DictIdsWrapper(Dictionary dictionary) {
- _dictionary = dictionary;
- _dictIdBitmap = new RoaringBitmap();
- }
+ @Override
+ protected IllegalStateException getIllegalDataTypeException(DataType
dataType, boolean singleValue) {
+ return new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_SMART_HLL aggregation function:
" + dataType + (singleValue ? ""
+ : "_MV"));
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartULLAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartULLAggregationFunction.java
new file mode 100644
index 00000000000..de639f4b464
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountSmartULLAggregationFunction.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.dynatrace.hash4j.distinctcount.UltraLogLog;
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.CustomObject;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountSmartULLAggregationFunction} calculates the number
of distinct values for a given expression
+ * (both single-valued and multi-valued are supported).
+ *
+ * For aggregation-only queries, the distinct values are stored in a Set
initially. Once the number of distinct values
+ * exceeds a threshold, the Set will be converted into an UltraLogLog, and
approximate result will be returned.
+ *
+ * The function takes an optional second argument for parameters:
+ * - threshold: Threshold of the number of distinct values to trigger the
conversion, 100_000 by default. Non-positive
+ * value means never convert.
+ * - p: Parameter p for UltraLogLog, default defined by
CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P.
+ * Example of second argument: 'threshold=10;p=16'
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountSmartULLAggregationFunction extends
BaseDistinctCountSmartSketchAggregationFunction {
+ // placeholder handled by base class
+
+ private final int _threshold;
+ private final int _p;
+
+  /**
+   * Creates the function from its argument list.
+   *
+   * @param arguments the expression to aggregate, optionally followed by a literal parameter string such as
+   *                  {@code 'threshold=10;p=16'}
+   * @throws IllegalArgumentException if more than 2 arguments are given or the parameter string is invalid
+   */
+  public DistinctCountSmartULLAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int argCount = arguments.size();
+    // Only the expression plus an optional parameter literal are accepted
+    Preconditions.checkArgument(argCount <= 2, "DistinctCountSmartULL expects 1 or 2 arguments, got: %s",
+        argCount);
+    if (argCount <= 1) {
+      // No parameter literal: use the defaults
+      _threshold = Parameters.DEFAULT_THRESHOLD;
+      _p = org.apache.pinot.spi.utils.CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P;
+      return;
+    }
+    Parameters parsed = new Parameters(arguments.get(1).getLiteral().getStringValue());
+    _threshold = parsed._threshold;
+    _p = parsed._p;
+  }
+
+ /**
+ * Returns the number of distinct values above which a value set is
+ * converted into an UltraLogLog.
+ */
+ public int getThreshold() {
+ return _threshold;
+ }
+
+ /**
+ * Returns the precision parameter p used when this function creates an
+ * UltraLogLog.
+ */
+ public int getP() {
+ return _p;
+ }
+
+ /**
+ * Identifies this function as DISTINCTCOUNTSMARTULL.
+ */
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTSMARTULL;
+ }
+
+ // Result holder creators are provided by the base class
+
+  /**
+   * Aggregates one block of values for an aggregation-only (no group-by) query.
+   *
+   * <p>Dictionary-encoded expressions only record their dictionary ids in a bitmap. Raw expressions are fed into
+   * the UltraLogLog when the holder already contains one, and into the value set otherwise.
+   *
+   * @param length number of rows of the block to process
+   * @param aggregationResultHolder holder of the running result
+   * @param blockValSetMap block values keyed by expression
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueBlock = blockValSetMap.get(_expression);
+    Dictionary dict = valueBlock.getDictionary();
+
+    if (dict == null) {
+      // Non-dictionary-encoded expression: update whichever representation the holder currently has
+      if (aggregationResultHolder.getResult() instanceof UltraLogLog) {
+        aggregateIntoULL(length, aggregationResultHolder, valueBlock);
+      } else {
+        aggregateIntoSet(length, aggregationResultHolder, valueBlock);
+      }
+      return;
+    }
+
+    // Dictionary-encoded expression: store dictionary ids into the bitmap
+    RoaringBitmap bitmap = getDictIdBitmap(aggregationResultHolder, dict);
+    if (!valueBlock.isSingleValue()) {
+      int[][] mvDictIds = valueBlock.getDictionaryIdsMV();
+      for (int row = 0; row < length; row++) {
+        bitmap.add(mvDictIds[row]);
+      }
+    } else {
+      bitmap.addN(valueBlock.getDictionaryIdsSV(), 0, length);
+    }
+  }
+
+  /**
+   * Hashes every value of a non-dictionary-encoded block into the UltraLogLog stored in the result holder.
+   *
+   * @param length number of rows of the block to process
+   * @param aggregationResultHolder holder whose current result is the UltraLogLog to update
+   * @param blockValSet values of the aggregated expression
+   * @throws IllegalStateException if the stored type of the values is not handled
+   */
+  private void aggregateIntoULL(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet) {
+    DataType valueType = blockValSet.getValueType();
+    UltraLogLog sketch = aggregationResultHolder.getResult();
+    if (blockValSet.isSingleValue()) {
+      addSingleValuesToULL(length, sketch, blockValSet, valueType);
+    } else {
+      addMultiValuesToULL(length, sketch, blockValSet, valueType);
+    }
+  }
+
+  /** Hashes the first {@code length} rows of a single-value block into the sketch. */
+  private void addSingleValuesToULL(int length, UltraLogLog sketch, BlockValSet blockValSet, DataType valueType) {
+    switch (valueType.getStoredType()) {
+      case INT:
+        int[] ints = blockValSet.getIntValuesSV();
+        for (int row = 0; row < length; row++) {
+          UltraLogLogUtils.hashObject(ints[row]).ifPresent(sketch::add);
+        }
+        return;
+      case LONG:
+        long[] longs = blockValSet.getLongValuesSV();
+        for (int row = 0; row < length; row++) {
+          UltraLogLogUtils.hashObject(longs[row]).ifPresent(sketch::add);
+        }
+        return;
+      case FLOAT:
+        float[] floats = blockValSet.getFloatValuesSV();
+        for (int row = 0; row < length; row++) {
+          UltraLogLogUtils.hashObject(floats[row]).ifPresent(sketch::add);
+        }
+        return;
+      case DOUBLE:
+        double[] doubles = blockValSet.getDoubleValuesSV();
+        for (int row = 0; row < length; row++) {
+          UltraLogLogUtils.hashObject(doubles[row]).ifPresent(sketch::add);
+        }
+        return;
+      case STRING:
+        String[] strings = blockValSet.getStringValuesSV();
+        for (int row = 0; row < length; row++) {
+          UltraLogLogUtils.hashObject(strings[row]).ifPresent(sketch::add);
+        }
+        return;
+      case BYTES:
+        byte[][] bytes = blockValSet.getBytesValuesSV();
+        for (int row = 0; row < length; row++) {
+          UltraLogLogUtils.hashObject(bytes[row]).ifPresent(sketch::add);
+        }
+        return;
+      default:
+        throw getIllegalDataTypeException(valueType, true);
+    }
+  }
+
+  /** Hashes every entry of the first {@code length} rows of a multi-value block into the sketch. */
+  private void addMultiValuesToULL(int length, UltraLogLog sketch, BlockValSet blockValSet, DataType valueType) {
+    switch (valueType.getStoredType()) {
+      case INT:
+        int[][] ints = blockValSet.getIntValuesMV();
+        for (int row = 0; row < length; row++) {
+          for (int entry : ints[row]) {
+            UltraLogLogUtils.hashObject(entry).ifPresent(sketch::add);
+          }
+        }
+        return;
+      case LONG:
+        long[][] longs = blockValSet.getLongValuesMV();
+        for (int row = 0; row < length; row++) {
+          for (long entry : longs[row]) {
+            UltraLogLogUtils.hashObject(entry).ifPresent(sketch::add);
+          }
+        }
+        return;
+      case FLOAT:
+        float[][] floats = blockValSet.getFloatValuesMV();
+        for (int row = 0; row < length; row++) {
+          for (float entry : floats[row]) {
+            UltraLogLogUtils.hashObject(entry).ifPresent(sketch::add);
+          }
+        }
+        return;
+      case DOUBLE:
+        double[][] doubles = blockValSet.getDoubleValuesMV();
+        for (int row = 0; row < length; row++) {
+          for (double entry : doubles[row]) {
+            UltraLogLogUtils.hashObject(entry).ifPresent(sketch::add);
+          }
+        }
+        return;
+      case STRING:
+        String[][] strings = blockValSet.getStringValuesMV();
+        for (int row = 0; row < length; row++) {
+          for (String entry : strings[row]) {
+            UltraLogLogUtils.hashObject(entry).ifPresent(sketch::add);
+          }
+        }
+        return;
+      case BYTES:
+        byte[][][] bytes = blockValSet.getBytesValuesMV();
+        for (int row = 0; row < length; row++) {
+          for (byte[] entry : bytes[row]) {
+            UltraLogLogUtils.hashObject(entry).ifPresent(sketch::add);
+          }
+        }
+        return;
+      default:
+        throw getIllegalDataTypeException(valueType, false);
+    }
+  }
+
+ // aggregateIntoSet is handled by the base class
+
+  /**
+   * Converts a value set into an UltraLogLog, choosing the BYTES-specific conversion when the stored type is BYTES.
+   *
+   * @param valueSet set of distinct values
+   * @param storedType stored type of the values in the set
+   * @return a new UltraLogLog containing the hashed values
+   */
+  protected UltraLogLog convertSetToULL(Set valueSet, DataType storedType) {
+    return storedType == DataType.BYTES
+        ? convertByteArraySetToULL((ObjectSet<ByteArray>) valueSet)
+        : convertNonByteArraySetToULL(valueSet);
+  }
+
+  /**
+   * Builds a new UltraLogLog (precision {@code _p}) from a set of {@link ByteArray} values, hashing the wrapped
+   * {@code byte[]} of each element.
+   */
+  protected UltraLogLog convertByteArraySetToULL(ObjectSet<ByteArray> valueSet) {
+    UltraLogLog sketch = UltraLogLog.create(_p);
+    valueSet.forEach(element -> UltraLogLogUtils.hashObject(element.getBytes()).ifPresent(sketch::add));
+    return sketch;
+  }
+
+  /**
+   * Builds a new UltraLogLog (precision {@code _p}) from a set whose elements can be hashed as they are.
+   */
+  protected UltraLogLog convertNonByteArraySetToULL(Set valueSet) {
+    UltraLogLog sketch = UltraLogLog.create(_p);
+    valueSet.forEach(element -> UltraLogLogUtils.hashObject(element).ifPresent(sketch::add));
+    return sketch;
+  }
+
+ // group-by SV handled by the base class
+
+ // group-by MV handled by the base class
+
+ // extraction is handled by the base class
+
+  /**
+   * Merges two intermediate results, each of which is either an UltraLogLog or a value set.
+   *
+   * <p>If either side is an UltraLogLog the result is an UltraLogLog. Two sets are unioned into the first one, and
+   * the union is converted into an UltraLogLog once its size exceeds the threshold.
+   */
+  @Override
+  public Object merge(Object intermediateResult1, Object intermediateResult2) {
+    // A sketch on either side absorbs the other side
+    if (intermediateResult1 instanceof UltraLogLog) {
+      return mergeIntoULL((UltraLogLog) intermediateResult1, intermediateResult2);
+    }
+    if (intermediateResult2 instanceof UltraLogLog) {
+      return mergeIntoULL((UltraLogLog) intermediateResult2, intermediateResult1);
+    }
+
+    // Both sides are exact value sets
+    Set left = (Set) intermediateResult1;
+    Set right = (Set) intermediateResult2;
+    if (left.isEmpty()) {
+      return right;
+    }
+    if (right.isEmpty()) {
+      return left;
+    }
+    left.addAll(right);
+    if (left.size() <= _threshold) {
+      return left;
+    }
+
+    // Over the threshold: ByteArray elements have to be unwrapped before hashing
+    boolean holdsByteArrays = left instanceof ObjectSet && left.iterator().next() instanceof ByteArray;
+    return holdsByteArrays ? convertByteArraySetToULL((ObjectSet<ByteArray>) left)
+        : convertNonByteArraySetToULL(left);
+  }
+
+  /**
+   * Merges an intermediate result (another UltraLogLog or an exact value set) into the given sketch.
+   *
+   * <p>When both sides are sketches, the one with the higher (or equal) precision is added into the one with the
+   * lower precision: {@code UltraLogLog#add(UltraLogLog)} only accepts an argument whose precision is not smaller
+   * than the receiver's, so adding in the other direction (or into a fresh sketch of the larger precision) fails
+   * with an {@link IllegalArgumentException} whenever the precisions differ.
+   *
+   * @param ull sketch to merge into; may be mutated
+   * @param intermediateResult an {@link UltraLogLog} or a {@link Set} of values; a sketch passed here may be
+   *                           mutated and returned
+   * @return the merged sketch, which is one of the two inputs
+   */
+  private UltraLogLog mergeIntoULL(UltraLogLog ull, Object intermediateResult) {
+    if (intermediateResult instanceof UltraLogLog) {
+      UltraLogLog other = (UltraLogLog) intermediateResult;
+      if (other.getP() >= ull.getP()) {
+        // Same or higher precision on the right: fold it into the left sketch
+        ull.add(other);
+        return ull;
+      }
+      // The right sketch has the lower precision, so it must be the receiver
+      other.add(ull);
+      return other;
+    }
+
+    Set valueSet = (Set) intermediateResult;
+    if (valueSet.isEmpty()) {
+      return ull;
+    }
+    if (valueSet instanceof ObjectSet && valueSet.iterator().next() instanceof ByteArray) {
+      // ByteArray elements are hashed through their underlying byte[]
+      for (Object value : valueSet) {
+        UltraLogLogUtils.hashObject(((ByteArray) value).getBytes()).ifPresent(ull::add);
+      }
+    } else {
+      for (Object value : valueSet) {
+        UltraLogLogUtils.hashObject(value).ifPresent(ull::add);
+      }
+    }
+    return ull;
+  }
+
+ /**
+ * Intermediate results (a value set or an UltraLogLog) are transferred
+ * as OBJECT columns.
+ */
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+  /**
+   * Serializes an intermediate result: value sets go through
+   * {@code BaseDistinctAggregateAggregationFunction.serializeSet}, UltraLogLog sketches through the UltraLogLog
+   * ser/de.
+   */
+  @Override
+  public SerializedIntermediateResult serializeIntermediateResult(Object o) {
+    if (!(o instanceof UltraLogLog)) {
+      return BaseDistinctAggregateAggregationFunction.serializeSet((Set) o);
+    }
+    return new SerializedIntermediateResult(ObjectSerDeUtils.ObjectType.UltraLogLog.getValue(),
+        ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.serialize((UltraLogLog) o));
+  }
+
+ /**
+ * Deserializes an intermediate result by delegating to
+ * ObjectSerDeUtils.deserialize.
+ */
+ @Override
+ public Object deserializeIntermediateResult(CustomObject customObject) {
+ return ObjectSerDeUtils.deserialize(customObject);
+ }
+
+ /**
+ * The final distinct count is reported as an INT column.
+ */
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.INT;
+ }
+
+  /**
+   * Produces the final distinct count: the exact size for a value set, or the rounded estimate for an UltraLogLog.
+   */
+  @Override
+  public Integer extractFinalResult(Object intermediateResult) {
+    if (!(intermediateResult instanceof UltraLogLog)) {
+      return ((Set) intermediateResult).size();
+    }
+    UltraLogLog sketch = (UltraLogLog) intermediateResult;
+    return (int) Math.round(sketch.getDistinctCountEstimate());
+  }
+
+ /**
+ * Merges two final results by adding them together.
+ * NOTE(review): plain addition presumably assumes the two counts were
+ * computed over disjoint sets of values - confirm against the callers.
+ */
+ @Override
+ public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) {
+ return finalResult1 + finalResult2;
+ }
+
+  // The result-holder accessors (dictionary id bitmap and value set, with and without a group key), the
+  // setDictIdForGroupKeys / typed setValueForGroupKeys helpers and the dictionary-ids-to-value-set conversion
+  // are not defined in this class: they are provided by the base class.
+
+  /**
+   * Builds an UltraLogLog for a dictionary-encoded expression by looking up every collected dictionary id in the
+   * dictionary and hashing the resulting value.
+   */
+  private UltraLogLog convertToULL(BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper dictIdsWrapper) {
+    Dictionary dict = dictIdsWrapper._dictionary;
+    UltraLogLog sketch = UltraLogLog.create(_p);
+    for (PeekableIntIterator ids = dictIdsWrapper._dictIdBitmap.getIntIterator(); ids.hasNext(); ) {
+      UltraLogLogUtils.hashObject(dict.get(ids.next())).ifPresent(sketch::add);
+    }
+    return sketch;
+  }
+
+  /**
+   * Creates (without throwing) the exception used to report an unsupported data type; multi-value types get an
+   * {@code _MV} suffix in the message.
+   */
+  @Override
+  protected IllegalStateException getIllegalDataTypeException(DataType dataType, boolean singleValue) {
+    String suffix = singleValue ? "" : "_MV";
+    return new IllegalStateException(
+        "Illegal data type for DISTINCT_COUNT_SMART_ULL aggregation function: " + dataType + suffix);
+  }
+
+ // DictIdsWrapper is provided by the base class
+
+ // threshold accessor for base class is provided by getThreshold()
+
+ /**
+ * Sketch conversion hook: delegates to convertSetToULL, so the sketch
+ * produced for a value set is an UltraLogLog.
+ */
+ @Override
+ protected Object convertSetToSketch(Set valueSet, DataType storedType) {
+ return convertSetToULL(valueSet, storedType);
+ }
+
+ /**
+ * Sketch conversion hook for dictionary ids: delegates to convertToULL,
+ * so the sketch produced is an UltraLogLog.
+ */
+ @Override
+ protected Object
convertToSketch(BaseDistinctCountSmartSketchAggregationFunction.DictIdsWrapper
dictIdsWrapper) {
+ return convertToULL(dictIdsWrapper);
+ }
+
+  /**
+   * Parses the optional parameter string of the function, e.g. {@code 'threshold=10;p=16'}.
+   *
+   * <p>Entries are separated by {@code ';'}, keys and values by {@code '='}; whitespace is ignored and keys are
+   * matched case-insensitively. A non-positive threshold is stored as {@link Integer#MAX_VALUE}.
+   */
+  private static class Parameters {
+    static final char PARAMETER_DELIMITER = ';';
+    static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+
+    static final String THRESHOLD_KEY = "THRESHOLD";
+    static final int DEFAULT_THRESHOLD = 100_000;
+
+    static final String P_KEY = "P";
+
+    // Values in effect when the corresponding key is absent from the parameter string
+    int _threshold = DEFAULT_THRESHOLD;
+    int _p = org.apache.pinot.spi.utils.CommonConstants.Helix.DEFAULT_ULTRALOGLOG_P;
+
+    Parameters(String parametersString) {
+      String compact = StringUtils.deleteWhitespace(parametersString);
+      for (String entry : StringUtils.split(compact, PARAMETER_DELIMITER)) {
+        String[] parts = StringUtils.split(entry, PARAMETER_KEY_VALUE_SEPARATOR);
+        Preconditions.checkArgument(parts.length == 2, "Invalid parameter: %s", entry);
+        apply(parts[0], parts[1]);
+      }
+    }
+
+    /** Applies a single {@code key=value} entry to this instance. */
+    private void apply(String key, String value) {
+      switch (key.toUpperCase()) {
+        case THRESHOLD_KEY: {
+          int parsed = Integer.parseInt(value);
+          // Non-positive threshold: use the largest possible value instead
+          _threshold = parsed > 0 ? parsed : Integer.MAX_VALUE;
+          break;
+        }
+        case P_KEY:
+          _p = Integer.parseInt(value);
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid parameter key: " + key);
+      }
+    }
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
index e4045a9a02f..1a036ae5342 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
@@ -47,8 +47,8 @@ public class DistinctCountULLAggregationFunction extends
BaseSingleInputAggregat
public DistinctCountULLAggregationFunction(List<ExpressionContext>
arguments) {
super(arguments.get(0));
int numExpressions = arguments.size();
- // This function expects 1 or 2 or 3 arguments.
- Preconditions.checkArgument(numExpressions <= 2, "DistinctCountHLLPlus
expects 1 or 2 arguments, got: %s",
+ // This function expects 1 or 2 arguments.
+ Preconditions.checkArgument(numExpressions <= 2, "DistinctCountULL expects
1 or 2 arguments, got: %s",
numExpressions);
if (arguments.size() == 2) {
_p = arguments.get(1).getLiteral().getIntValue();
@@ -128,19 +128,19 @@ public class DistinctCountULLAggregationFunction extends
BaseSingleInputAggregat
case FLOAT:
float[] floatValues = blockValSet.getFloatValuesSV();
for (int i = 0; i < length; i++) {
- UltraLogLogUtils.hashObject(floatValues[i]).ifPresent(ull::add);
+ UltraLogLogUtils.hashObject(floatValues[i]).ifPresent(ull::add);
}
break;
case DOUBLE:
double[] doubleValues = blockValSet.getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- UltraLogLogUtils.hashObject(doubleValues[i]).ifPresent(ull::add);
+ UltraLogLogUtils.hashObject(doubleValues[i]).ifPresent(ull::add);
}
break;
case STRING:
String[] stringValues = blockValSet.getStringValuesSV();
for (int i = 0; i < length; i++) {
- UltraLogLogUtils.hashObject(stringValues[i]).ifPresent(ull::add);
+ UltraLogLogUtils.hashObject(stringValues[i]).ifPresent(ull::add);
}
break;
default:
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index e3c6b6240fc..be1666a9b6a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -521,6 +521,50 @@ public class DistinctCountQueriesTest extends
BaseQueriesTest {
assertEquals(function.getLog2m(), 8);
}
+ /**
+ * Exercises DISTINCTCOUNTSMARTULL with 'threshold=10' on the int, long,
+ * float, double and string columns.
+ * Inner-segment: each result must be an UltraLogLog whose rounded
+ * estimate is within 5% of _values.size().
+ * Inter-segment: the broker responses are compared against the rounded
+ * estimates recorded from the inner-segment run.
+ * Finally checks that a 'threshold=10;p=16' parameter string also yields
+ * an UltraLogLog.
+ */
+ @Test
+ public void testSmartULL() {
+ String query = "SELECT DISTINCTCOUNTSMARTULL(intColumn, 'threshold=10'), "
+ + "DISTINCTCOUNTSMARTULL(longColumn, 'threshold=10'),
DISTINCTCOUNTSMARTULL(floatColumn, 'threshold=10'), "
+ + "DISTINCTCOUNTSMARTULL(doubleColumn, 'threshold=10'),
DISTINCTCOUNTSMARTULL(stringColumn, 'threshold=10') "
+ + "FROM testTable";
+
+ Object[] interSegmentsExpectedResults = new Object[5];
+ for (Object operator : Arrays.asList(getOperator(query),
getOperatorWithFilter(query))) {
+ assertTrue(operator instanceof NonScanBasedAggregationOperator);
+ AggregationResultsBlock resultsBlock =
((NonScanBasedAggregationOperator) operator).nextBlock();
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator)
operator).getExecutionStatistics(), NUM_RECORDS,
+ 0, 0, NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getResults();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 5);
+ for (int i = 0; i < 5; i++) {
+ // After threshold promotion, result should be ULL object
+ assertTrue(aggregationResult.get(i) instanceof
com.dynatrace.hash4j.distinctcount.UltraLogLog);
+ com.dynatrace.hash4j.distinctcount.UltraLogLog ull =
+ (com.dynatrace.hash4j.distinctcount.UltraLogLog)
aggregationResult.get(i);
+ int actual = (int) Math.round(ull.getDistinctCountEstimate());
+ int expected = _values.size();
+ // ULL with default p provides high accuracy; allow 5% error similar
to HLL(log2m=12)
+ assertEquals(actual, expected, expected * 0.05);
+ interSegmentsExpectedResults[i] = actual;
+ }
+ }
+
+ for (BrokerResponseNative brokerResponse :
Arrays.asList(getBrokerResponse(query),
+ getBrokerResponseWithFilter(query))) {
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 *
NUM_RECORDS, 0, 0, 4 * NUM_RECORDS,
+ interSegmentsExpectedResults);
+ }
+
+ // Change p via parameters
+ query = "SELECT DISTINCTCOUNTSMARTULL(intColumn, 'threshold=10;p=16') FROM
testTable";
+ NonScanBasedAggregationOperator nonScanOperator = getOperator(query);
+ List<Object> aggregationResult = nonScanOperator.nextBlock().getResults();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 1);
+ assertTrue(aggregationResult.get(0) instanceof
com.dynatrace.hash4j.distinctcount.UltraLogLog);
+ }
+
@AfterClass
public void tearDown()
throws IOException {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index f74807f9f29..111c57c7b16 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -68,7 +68,10 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.apache.pinot.common.function.scalar.StringFunctions.*;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestSet {
@@ -259,11 +262,12 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String[] numericResultFunctions = new String[]{
"distinctCount", "distinctCountBitmap", "distinctCountHLL",
"segmentPartitionedDistinctCount",
- "distinctCountSmartHLL", "distinctCountThetaSketch", "distinctSum",
"distinctAvg"
+ "distinctCountSmartHLL", "distinctCountULL", "distinctCountSmartULL",
"distinctCountThetaSketch",
+ "distinctSum", "distinctAvg"
};
double[] expectedNumericResults = new double[]{
- 364, 364, 355, 364, 364, 364, 5915969, 16252.662087912087
+ 364, 364, 355, 364, 364, 364, 364, 364, 5915969, 16252.662087912087
};
Assert.assertEquals(numericResultFunctions.length,
expectedNumericResults.length);
@@ -377,8 +381,7 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
@Test
public void testUnsupportedUdfOnIntermediateStage()
throws Exception {
- String sqlQuery = ""
- + "SET timeoutMs=10000;\n" // In older versions we timeout in this case, but we should fail fast now
+ String sqlQuery = "SET timeoutMs=10000;\n" // In older versions we timeout in this case, but we should fail fast now
+ "WITH fakeTable AS (\n" // this table is used to make sure the call is made on an intermediate stage
+ " SELECT \n"
+ " t1.DaysSinceEpoch + t2.DaysSinceEpoch as DaysSinceEpoch"
@@ -1555,7 +1558,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
- public void testCaseInsensitiveNames() throws Exception {
+ public void testCaseInsensitiveNames()
+ throws Exception {
String query = "select ACTualELAPsedTIMe from mYtABLE where
actUALelAPSedTIMe > 0 limit 1";
JsonNode jsonNode = postQuery(query);
long result =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -1564,7 +1568,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
- public void testCaseInsensitiveNamesAgainstController() throws Exception {
+ public void testCaseInsensitiveNamesAgainstController()
+ throws Exception {
String query = "select ACTualELAPsedTIMe from mYtABLE where
actUALelAPSedTIMe > 0 limit 1";
JsonNode jsonNode = postQueryToController(query);
long result =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -1573,7 +1578,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
- public void testQueryCompileBrokerTimeout() throws Exception {
+ public void testQueryCompileBrokerTimeout()
+ throws Exception {
// The sleep function is called with a literal value so it should be
evaluated during the query compile phase
String query = "SET timeoutMs=100; SELECT sleep(1000) FROM mytable";
@@ -1598,7 +1604,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
- public void testNumServersQueried() throws Exception {
+ public void testNumServersQueried()
+ throws Exception {
String query = "select * from mytable limit 10";
JsonNode jsonNode = postQuery(query);
JsonNode numServersQueried = jsonNode.get("numServersQueried");
@@ -1608,7 +1615,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
- public void testLookupJoin() throws Exception {
+ public void testLookupJoin()
+ throws Exception {
Schema lookupTableSchema = createSchema(DIM_TABLE_SCHEMA_PATH);
addSchema(lookupTableSchema);
@@ -1645,7 +1653,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
dropOfflineTable(tableConfig.getTableName());
}
- public void testSearchLiteralFilter() throws Exception {
+ public void testSearchLiteralFilter()
+ throws Exception {
String sqlQuery =
"WITH CTE_B AS (SELECT 1692057600000 AS __ts FROM mytable GROUP BY
__ts) SELECT 1692057600000 AS __ts FROM "
+ "CTE_B WHERE __ts >= 1692057600000 AND __ts < 1693267200000
GROUP BY __ts";
@@ -1670,7 +1679,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
- public void testPolymorphicScalarArrayFunctions() throws Exception {
+ public void testPolymorphicScalarArrayFunctions()
+ throws Exception {
String query = "select ARRAY_LENGTH(ARRAY[1,2,3]);";
JsonNode jsonNode = postQuery(query);
assertNoError(jsonNode);
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index ede51582739..4203f891069 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -95,6 +95,8 @@ public enum AggregationFunctionType {
OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), i
-> i == 1), SqlTypeName.OTHER),
DISTINCTCOUNTRAWULL("distinctCountRawULL", ReturnTypes.VARCHAR,
OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), i
-> i == 1), SqlTypeName.OTHER),
+ DISTINCTCOUNTSMARTULL("distinctCountSmartULL", ReturnTypes.BIGINT,
+ OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER), i -> i == 1), SqlTypeName.OTHER),
DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", ReturnTypes.BIGINT,
OperandTypes.ONE_OR_MORE, SqlTypeName.OTHER),
DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch",
ReturnTypes.VARCHAR, OperandTypes.ONE_OR_MORE,
SqlTypeName.OTHER),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]