This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3501b869c7 Add hyperLogLogPlus aggregation function for distinct count
(#11346)
3501b869c7 is described below
commit 3501b869c7ece2ddd088daf34c68a15ad6740a3a
Author: deemoliu <[email protected]>
AuthorDate: Thu Sep 21 16:48:17 2023 -0700
Add hyperLogLogPlus aggregation function for distinct count (#11346)
* Add hyperLogLogPlus aggregation function for distinct count
* address code comments
* address code comments
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 36 +-
.../query/NonScanBasedAggregationOperator.java | 46 +-
.../pinot/core/plan/AggregationPlanNode.java | 3 +-
.../function/AggregationFunctionFactory.java | 8 +
.../DistinctCountHLLPlusAggregationFunction.java | 471 +++++++++++++++++++++
.../DistinctCountHLLPlusMVAggregationFunction.java | 265 ++++++++++++
...DistinctCountRawHLLPlusAggregationFunction.java | 115 +++++
...stinctCountRawHLLPlusMVAggregationFunction.java | 36 ++
.../function/AggregationFunctionFactoryTest.java | 24 ++
...terSegmentAggregationMultiValueQueriesTest.java | 75 ++++
...SegmentAggregationMultiValueRawQueriesTest.java | 75 ++++
...erSegmentAggregationSingleValueQueriesTest.java | 60 +++
.../pinot/queries/SerializedBytesQueriesTest.java | 121 +++++-
.../tests/OfflineClusterIntegrationTest.java | 28 +-
.../DistinctCountHLLPlusValueAggregator.java | 125 ++++++
.../local/aggregator/ValueAggregatorFactory.java | 6 +
.../local/customobject/SerializedHLLPlus.java | 42 ++
.../segment/local/utils/CustomSerDeUtils.java | 29 ++
.../segment/local/utils/HyperLogLogPlusUtils.java | 43 ++
.../pinot/segment/spi/AggregationFunctionType.java | 16 +
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
21 files changed, 1613 insertions(+), 13 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 69c00ea5b8..4f05bad761 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.common;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.RegisterSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -139,7 +140,9 @@ public class ObjectSerDeUtils {
KllDataSketch(36),
IntegerTupleSketch(37),
FrequentStringsSketch(38),
- FrequentLongsSketch(39);
+ FrequentLongsSketch(39),
+ HyperLogLogPlus(40);
+
private final int _value;
@@ -235,6 +238,8 @@ public class ObjectSerDeUtils {
return ObjectType.FrequentStringsSketch;
} else if (value instanceof LongsSketch) {
return ObjectType.FrequentLongsSketch;
+ } else if (value instanceof HyperLogLogPlus) {
+ return ObjectType.HyperLogLogPlus;
} else {
throw new IllegalArgumentException("Unsupported type of value: " +
value.getClass().getSimpleName());
}
@@ -563,6 +568,34 @@ public class ObjectSerDeUtils {
}
};
+ public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE =
new ObjectSerDe<HyperLogLogPlus>() {
+
+ @Override
+ public byte[] serialize(HyperLogLogPlus hyperLogLogPlus) {
+ try {
+ return hyperLogLogPlus.getBytes();
+ } catch (IOException e) {
+ throw new RuntimeException("Caught exception while serializing
HyperLogLogPlus", e);
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus deserialize(byte[] bytes) {
+ try {
+ return HyperLogLogPlus.Builder.build(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException("Caught exception while serializing
HyperLogLogPlus", e);
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return deserialize(bytes);
+ }
+ };
+
public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new
ObjectSerDe<DistinctTable>() {
@Override
@@ -1377,6 +1410,7 @@ public class ObjectSerDeUtils {
DATA_SKETCH_INT_TUPLE_SER_DE,
FREQUENT_STRINGS_SKETCH_SER_DE,
FREQUENT_LONGS_SKETCH_SER_DE,
+ HYPER_LOG_LOG_PLUS_SER_DE,
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 2060135d38..fb6e8d02f6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.query;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -36,7 +37,9 @@ import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctCountHLLPlusAggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
@@ -50,8 +53,8 @@ import org.apache.pinot.spi.utils.ByteArray;
* Aggregation operator that utilizes dictionary or column metadata for
serving aggregation queries to avoid scanning.
* The scanless operator is selected in the plan maker, if the query is of
aggregation type min, max, minmaxrange,
* distinctcount, distinctcounthll, distinctcountrawhll,
segmentpartitioneddistinctcount, distinctcountsmarthll,
- * and the column has a dictionary, or has column metadata with min and max
value defined. It also supports count(*) if
- * the query has no filter.
+ * distinctcounthllplus, distinctcountrawhllplus, and the column has a
dictionary, or has column metadata with min and
+ * max value defined. It also supports count(*) if the query has no filter.
* We don't use this operator if the segment has star tree,
* as the dictionary will have aggregated values for the metrics, and
dimensions will have star node value.
*
@@ -118,6 +121,17 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
result =
getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
((DistinctCountRawHLLAggregationFunction)
aggregationFunction).getDistinctCountHLLAggregationFunction());
break;
+ case DISTINCTCOUNTHLLPLUS:
+ case DISTINCTCOUNTHLLPLUSMV:
+ result =
getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
+ (DistinctCountHLLPlusAggregationFunction) aggregationFunction);
+ break;
+ case DISTINCTCOUNTRAWHLLPLUS:
+ case DISTINCTCOUNTRAWHLLPLUSMV:
+ result =
getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
+ ((DistinctCountRawHLLPlusAggregationFunction)
aggregationFunction)
+ .getDistinctCountHLLPlusAggregationFunction());
+ break;
case SEGMENTPARTITIONEDDISTINCTCOUNT:
result = (long)
Objects.requireNonNull(dataSource.getDictionary()).length();
break;
@@ -215,6 +229,15 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
return hll;
}
+ private static HyperLogLogPlus getDistinctValueHLLPlus(Dictionary
dictionary, int p, int sp) {
+ HyperLogLogPlus hllPlus = new HyperLogLogPlus(p, sp);
+ int length = dictionary.length();
+ for (int i = 0; i < length; i++) {
+ hllPlus.offer(dictionary.get(i));
+ }
+ return hllPlus;
+ }
+
private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
DistinctCountHLLAggregationFunction function) {
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
@@ -234,6 +257,25 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
}
}
+ private static HyperLogLogPlus getDistinctCountHLLPlusResult(Dictionary
dictionary,
+ DistinctCountHLLPlusAggregationFunction function) {
+ if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+ // Treat BYTES value as serialized HyperLogLogPlus
+ try {
+ HyperLogLogPlus hllplus =
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(0));
+ int length = dictionary.length();
+ for (int i = 1; i < length; i++) {
+
hllplus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(i)));
+ }
+ return hllplus;
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging
HyperLogLogPluses", e);
+ }
+ } else {
+ return getDistinctValueHLLPlus(dictionary, function.getP(),
function.getSp());
+ }
+ }
+
private static Object getDistinctCountSmartHLLResult(Dictionary dictionary,
DistinctCountSmartHLLAggregationFunction function) {
if (dictionary.length() > function.getThreshold()) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 2f53b0d238..95a61c10be 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -56,7 +56,8 @@ public class AggregationPlanNode implements PlanNode {
private static final EnumSet<AggregationFunctionType>
DICTIONARY_BASED_FUNCTIONS =
EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV,
DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL,
DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV,
SEGMENTPARTITIONEDDISTINCTCOUNT,
- DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV,
DISTINCTAVGMV);
+ DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV,
DISTINCTAVGMV, DISTINCTCOUNTHLLPLUS,
+ DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS,
DISTINCTCOUNTRAWHLLPLUSMV);
// DISTINCTCOUNT excluded because consuming segment metadata contains
unknown cardinality when there is no dictionary
private static final EnumSet<AggregationFunctionType>
METADATA_BASED_FUNCTIONS =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 2d69c093f2..c426ed472f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -298,6 +298,14 @@ public class AggregationFunctionFactory {
return new DistinctCountHLLMVAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLMV:
return new DistinctCountRawHLLMVAggregationFunction(arguments);
+ case DISTINCTCOUNTHLLPLUS:
+ return new DistinctCountHLLPlusAggregationFunction(arguments);
+ case DISTINCTCOUNTRAWHLLPLUS:
+ return new DistinctCountRawHLLPlusAggregationFunction(arguments);
+ case DISTINCTCOUNTHLLPLUSMV:
+ return new DistinctCountHLLPlusMVAggregationFunction(arguments);
+ case DISTINCTCOUNTRAWHLLPLUSMV:
+ return new DistinctCountRawHLLPlusMVAggregationFunction(arguments);
case DISTINCTSUMMV:
return new DistinctSumMVAggregationFunction(arguments);
case DISTINCTAVGMV:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java
new file mode 100644
index 0000000000..2ca7d4eec3
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public class DistinctCountHLLPlusAggregationFunction extends
BaseSingleInputAggregationFunction<HyperLogLogPlus, Long> {
+ // The parameter "p" determines the precision of the sparse list in
HyperLogLogPlus.
+ protected final int _p;
+ // The "sp" parameter specifies the number of standard deviations that the
sparse list's precision should be set to.
+ protected final int _sp;
+
+ public DistinctCountHLLPlusAggregationFunction(List<ExpressionContext>
arguments) {
+ super(arguments.get(0));
+ int numExpressions = arguments.size();
+ // This function expects 1 or 2 or 3 arguments.
+ Preconditions.checkArgument(numExpressions <= 3, "DistinctCountHLLPlus
expects 2 or 3 arguments, got: %s",
+ numExpressions);
+ if (arguments.size() == 2) {
+ _p = arguments.get(1).getLiteral().getIntValue();
+ _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+ } else if (arguments.size() == 3) {
+ _p = arguments.get(1).getLiteral().getIntValue();
+ _sp = arguments.get(2).getLiteral().getIntValue();
+ } else {
+ _p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P;
+ _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+ }
+ }
+
+ public int getP() {
+ return _p;
+ }
+
+ public int getSp() {
+ return _sp;
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTHLLPLUS;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized HyperLogLogPlus
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ if (storedType == DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ HyperLogLogPlus hyperLogLogPlus = aggregationResultHolder.getResult();
+ if (hyperLogLogPlus == null) {
+ hyperLogLogPlus =
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[0]);
+ aggregationResultHolder.setValue(hyperLogLogPlus);
+ } else {
+
hyperLogLogPlus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[0]));
+ }
+ for (int i = 1; i < length; i++) {
+
hyperLogLogPlus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging
HyperLogLogPlus", e);
+ }
+ return;
+ }
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ getDictIdBitmap(aggregationResultHolder, dictionary).addN(dictIds, 0,
length);
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the
HyperLogLogPlus
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(aggregationResultHolder);
+ switch (storedType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ hyperLogLogPlus.offer(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ hyperLogLogPlus.offer(longValues[i]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ hyperLogLogPlus.offer(floatValues[i]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ hyperLogLogPlus.offer(doubleValues[i]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ hyperLogLogPlus.offer(stringValues[i]);
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation
function: " + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized HyperLogLogPlus
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ if (storedType == DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ for (int i = 0; i < length; i++) {
+ HyperLogLogPlus value =
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]);
+ int groupKey = groupKeyArray[i];
+ HyperLogLogPlus hyperLogLogPlus =
groupByResultHolder.getResult(groupKey);
+ if (hyperLogLogPlus != null) {
+ hyperLogLogPlus.addAll(value);
+ } else {
+ groupByResultHolder.setValueForKey(groupKey, value);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging
HyperLogLogPlus", e);
+ }
+ return;
+ }
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int i = 0; i < length; i++) {
+ getDictIdBitmap(groupByResultHolder, groupKeyArray[i],
dictionary).add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the
HyperLogLogPlus
+ switch (storedType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ getHyperLogLogPlus(groupByResultHolder,
groupKeyArray[i]).offer(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ getHyperLogLogPlus(groupByResultHolder,
groupKeyArray[i]).offer(longValues[i]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ getHyperLogLogPlus(groupByResultHolder,
groupKeyArray[i]).offer(floatValues[i]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ getHyperLogLogPlus(groupByResultHolder,
groupKeyArray[i]).offer(doubleValues[i]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ getHyperLogLogPlus(groupByResultHolder,
groupKeyArray[i]).offer(stringValues[i]);
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation
function: " + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized HyperLogLogPlus
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ if (storedType == DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ for (int i = 0; i < length; i++) {
+ HyperLogLogPlus value =
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]);
+ for (int groupKey : groupKeysArray[i]) {
+ HyperLogLogPlus hyperLogLogPlus =
groupByResultHolder.getResult(groupKey);
+ if (hyperLogLogPlus != null) {
+ hyperLogLogPlus.addAll(value);
+ } else {
+ // Create a new HyperLogLogPlus for the group
+ groupByResultHolder.setValueForKey(groupKey,
+
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]));
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging
HyperLogLogPlus", e);
+ }
+ return;
+ }
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int i = 0; i < length; i++) {
+ setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i],
dictionary, dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the
HyperLogLogPlus
+ switch (storedType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
intValues[i]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
longValues[i]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
floatValues[i]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
doubleValues[i]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
stringValues[i]);
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation
function: " + storedType);
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ Object result = aggregationResultHolder.getResult();
+ if (result == null) {
+ return new HyperLogLogPlus(_p, _sp);
+ }
+
+ if (result instanceof DictIdsWrapper) {
+ // For dictionary-encoded expression, convert dictionary ids to
HyperLogLogPlus
+ return convertToHyperLogLogPlus((DictIdsWrapper) result);
+ } else {
+ // For non-dictionary-encoded expression, directly return the
HyperLogLogPlus
+ return (HyperLogLogPlus) result;
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ Object result = groupByResultHolder.getResult(groupKey);
+ if (result == null) {
+ return new HyperLogLogPlus(_p, _sp);
+ }
+
+ if (result instanceof DictIdsWrapper) {
+ // For dictionary-encoded expression, convert dictionary ids to
HyperLogLogPlus
+ return convertToHyperLogLogPlus((DictIdsWrapper) result);
+ } else {
+ // For non-dictionary-encoded expression, directly return the
HyperLogLogPlus
+ return (HyperLogLogPlus) result;
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus merge(HyperLogLogPlus intermediateResult1,
HyperLogLogPlus intermediateResult2) {
+ // Can happen when aggregating serialized HyperLogLogPlus with non-default
p, sp values
+ if (intermediateResult1.sizeof() != intermediateResult2.sizeof()) {
+ if (intermediateResult1.cardinality() == 0) {
+ return intermediateResult2;
+ } else {
+ Preconditions.checkState(intermediateResult2.cardinality() == 0,
+ "Cannot merge HyperLogLogPlus of different sizes");
+ return intermediateResult1;
+ }
+ }
+ try {
+ intermediateResult1.addAll(intermediateResult2);
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging
HyperLogLogPlus", e);
+ }
+ return intermediateResult1;
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+
+ @Override
+ public Long extractFinalResult(HyperLogLogPlus intermediateResult) {
+ return intermediateResult.cardinality();
+ }
+
+ /**
+ * Returns the dictionary id bitmap from the result holder or creates a new
one if it does not exist.
+ */
+ protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder
aggregationResultHolder,
+ Dictionary dictionary) {
+ DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
+ if (dictIdsWrapper == null) {
+ dictIdsWrapper = new DictIdsWrapper(dictionary);
+ aggregationResultHolder.setValue(dictIdsWrapper);
+ }
+ return dictIdsWrapper._dictIdBitmap;
+ }
+
+ /**
+ * Returns the HyperLogLogPlus from the result holder or creates a new one
if it does not exist.
+ */
+ protected HyperLogLogPlus getHyperLogLogPlus(AggregationResultHolder
aggregationResultHolder) {
+ HyperLogLogPlus hyperLogLogPlus = aggregationResultHolder.getResult();
+ if (hyperLogLogPlus == null) {
+ hyperLogLogPlus = new HyperLogLogPlus(_p, _sp);
+ aggregationResultHolder.setValue(hyperLogLogPlus);
+ }
+ return hyperLogLogPlus;
+ }
+
+ /**
+ * Returns the dictionary id bitmap for the given group key or creates a new
one if it does not exist.
+ */
+ protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder
groupByResultHolder, int groupKey,
+ Dictionary dictionary) {
+ DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
+ if (dictIdsWrapper == null) {
+ dictIdsWrapper = new DictIdsWrapper(dictionary);
+ groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
+ }
+ return dictIdsWrapper._dictIdBitmap;
+ }
+
+ /**
+ * Returns the HyperLogLogPlus for the given group key or creates a new one
if it does not exist.
+ */
+ protected HyperLogLogPlus getHyperLogLogPlus(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ HyperLogLogPlus hyperLogLogPlus = groupByResultHolder.getResult(groupKey);
+ if (hyperLogLogPlus == null) {
+ hyperLogLogPlus = new HyperLogLogPlus(_p, _sp);
+ groupByResultHolder.setValueForKey(groupKey, hyperLogLogPlus);
+ }
+ return hyperLogLogPlus;
+ }
+
+ /**
+ * Helper method to set dictionary id for the given group keys into the
result holder.
+ */
+ private static void setDictIdForGroupKeys(GroupByResultHolder
groupByResultHolder, int[] groupKeys,
+ Dictionary dictionary, int dictId) {
+ for (int groupKey : groupKeys) {
+ getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
+ }
+ }
+
+ /**
+ * Helper method to set value for the given group keys into the result
holder.
+ */
+ private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder,
int[] groupKeys, Object value) {
+ for (int groupKey : groupKeys) {
+ getHyperLogLogPlus(groupByResultHolder, groupKey).offer(value);
+ }
+ }
+
+ /**
+ * Helper method to read dictionary and convert dictionary ids to
HyperLogLogPlus for dictionary-encoded expression.
+ */
+ private HyperLogLogPlus convertToHyperLogLogPlus(DictIdsWrapper
dictIdsWrapper) {
+ HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(_p, _sp);
+ Dictionary dictionary = dictIdsWrapper._dictionary;
+ RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
+ PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
+ while (iterator.hasNext()) {
+ hyperLogLogPlus.offer(dictionary.get(iterator.next()));
+ }
+ return hyperLogLogPlus;
+ }
+
+ private static final class DictIdsWrapper {
+ final Dictionary _dictionary;
+ final RoaringBitmap _dictIdBitmap;
+
+ private DictIdsWrapper(Dictionary dictionary) {
+ _dictionary = dictionary;
+ _dictIdBitmap = new RoaringBitmap();
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java
new file mode 100644
index 0000000000..00abb1f5d2
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public class DistinctCountHLLPlusMVAggregationFunction extends
DistinctCountHLLPlusAggregationFunction {
+
+ public DistinctCountHLLPlusMVAggregationFunction(List<ExpressionContext>
arguments) {
+ super(arguments);
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTHLLPLUSMV;
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder,
dictionary);
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ dictIdBitmap.add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the HyperLogLog
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(aggregationResultHolder);
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ switch (storedType) {
+ case INT:
+ int[][] intValuesArray = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int value : intValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValuesArray = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (long value : longValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValuesArray = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (float value : floatValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValuesArray = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (double value : doubleValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValuesArray = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (String value : stringValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_HLL_MV aggregation function:
" + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ getDictIdBitmap(groupByResultHolder, groupKeyArray[i],
dictionary).add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the HyperLogLog
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ switch (storedType) {
+ case INT:
+ int[][] intValuesArray = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+ for (int value : intValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValuesArray = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+ for (long value : longValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValuesArray = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+ for (float value : floatValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValuesArray = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+ for (double value : doubleValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValuesArray = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]);
+ for (String value : stringValuesArray[i]) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_HLL_MV aggregation function:
" + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getDictIdBitmap(groupByResultHolder, groupKey,
dictionary).add(dictIds[i]);
+ }
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the HyperLogLog
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ switch (storedType) {
+ case INT:
+ int[][] intValuesArray = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ int[] intValues = intValuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKey);
+ for (int value : intValues) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValuesArray = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ long[] longValues = longValuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKey);
+ for (long value : longValues) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValuesArray = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ float[] floatValues = floatValuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKey);
+ for (float value : floatValues) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValuesArray = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ double[] doubleValues = doubleValuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKey);
+ for (double value : doubleValues) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValuesArray = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ String[] stringValues = stringValuesArray[i];
+ for (int groupKey : groupKeysArray[i]) {
+ HyperLogLogPlus hyperLogLogPlus =
getHyperLogLogPlus(groupByResultHolder, groupKey);
+ for (String value : stringValues) {
+ hyperLogLogPlus.offer(value);
+ }
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_HLL_MV aggregation function:
" + storedType);
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java
new file mode 100644
index 0000000000..facef6a222
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.SerializedHLLPlus;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class DistinctCountRawHLLPlusAggregationFunction
+ extends BaseSingleInputAggregationFunction<HyperLogLogPlus,
SerializedHLLPlus> {
+ private final DistinctCountHLLPlusAggregationFunction
_distinctCountHLLPlusAggregationFunction;
+
+ public DistinctCountRawHLLPlusAggregationFunction(List<ExpressionContext>
arguments) {
+ this(arguments.get(0), new
DistinctCountHLLPlusAggregationFunction(arguments));
+ }
+
+ DistinctCountRawHLLPlusAggregationFunction(ExpressionContext expression,
+ DistinctCountHLLPlusAggregationFunction
distinctCountHLLPlusAggregationFunction) {
+ super(expression);
+ _distinctCountHLLPlusAggregationFunction =
distinctCountHLLPlusAggregationFunction;
+ }
+
+ public DistinctCountHLLPlusAggregationFunction
getDistinctCountHLLPlusAggregationFunction() {
+ return _distinctCountHLLPlusAggregationFunction;
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUS;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return
_distinctCountHLLPlusAggregationFunction.createAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return
_distinctCountHLLPlusAggregationFunction.createGroupByResultHolder(initialCapacity,
maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _distinctCountHLLPlusAggregationFunction.aggregate(length,
aggregationResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _distinctCountHLLPlusAggregationFunction.aggregateGroupBySV(length,
groupKeyArray, groupByResultHolder,
+ blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _distinctCountHLLPlusAggregationFunction.aggregateGroupByMV(length,
groupKeysArray, groupByResultHolder,
+ blockValSetMap);
+ }
+
+ @Override
+ public HyperLogLogPlus extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ return
_distinctCountHLLPlusAggregationFunction.extractAggregationResult(aggregationResultHolder);
+ }
+
+ @Override
+ public HyperLogLogPlus extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ return
_distinctCountHLLPlusAggregationFunction.extractGroupByResult(groupByResultHolder,
groupKey);
+ }
+
+ @Override
+ public HyperLogLogPlus merge(HyperLogLogPlus intermediateResult1,
HyperLogLogPlus intermediateResult2) {
+ return _distinctCountHLLPlusAggregationFunction.merge(intermediateResult1,
intermediateResult2);
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return
_distinctCountHLLPlusAggregationFunction.getIntermediateResultColumnType();
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ @Override
+ public SerializedHLLPlus extractFinalResult(HyperLogLogPlus
intermediateResult) {
+ return new SerializedHLLPlus(intermediateResult);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java
new file mode 100644
index 0000000000..6ae2d04996
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class DistinctCountRawHLLPlusMVAggregationFunction extends
DistinctCountRawHLLPlusAggregationFunction {
+
+ public DistinctCountRawHLLPlusMVAggregationFunction(List<ExpressionContext>
arguments) {
+ super(arguments.get(0), new
DistinctCountHLLPlusMVAggregationFunction(arguments));
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUSMV;
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index de8abca960..0caf40536b 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -172,6 +172,18 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getType(),
AggregationFunctionType.DISTINCTCOUNTRAWHLL);
assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+ function = getFunction("DiStInCtCoUnThLlPlUs");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function, false);
+ assertTrue(aggregationFunction instanceof
DistinctCountHLLPlusAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.DISTINCTCOUNTHLLPLUS);
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
+ function = getFunction("DiStInCtCoUnTrAwHlLpLuS");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function, false);
+ assertTrue(aggregationFunction instanceof
DistinctCountRawHLLPlusAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUS);
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
function = getFunction("FaStHlL");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function, false);
assertTrue(aggregationFunction instanceof FastHLLAggregationFunction);
@@ -358,6 +370,18 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getType(),
AggregationFunctionType.DISTINCTCOUNTRAWHLLMV);
assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+ function = getFunction("DiStInCt_CoUnT_hLl_PlUs_Mv");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function, false);
+ assertTrue(aggregationFunction instanceof
DistinctCountHLLPlusMVAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.DISTINCTCOUNTHLLPLUSMV);
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
+ function = getFunction("DiStInCtCoUnTrAwHlLpLuS_mV");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function, false);
+ assertTrue(aggregationFunction instanceof
DistinctCountRawHLLPlusMVAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUSMV);
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
function = getFunction("PeRcEnTiLe10Mv");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function, false);
assertTrue(aggregationFunction instanceof PercentileMVAggregationFunction);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 33c32db70a..f974aa8542 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -408,6 +408,39 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
}
+ @Test
+ public void testDistinctCountHLLPlusMV() {
+ String query = "SELECT DISTINCTCOUNTHLLPLUSMV(column6) AS value FROM
testTable";
+
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.LONG});
+ Object[] expectedResults = new Object[]{18651L};
+ ResultTable expectedResultTable = new ResultTable(expectedDataSchema,
Collections.singletonList(expectedResults));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L,
400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
62480L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+ expectedResults[0] = 4796L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+ expectedResults[0] = 3457L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+ expectedResults[0] = 579L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
+ }
+
@Test
public void testDistinctCountRawHLLMV() {
String query = "SELECT DISTINCTCOUNTRAWHLLMV(column6) AS value FROM
testTable";
@@ -449,6 +482,48 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
cardinalityExtractor);
}
+ @Test
+ public void testDistinctCountRawHLLPLUSMV() {
+ String query = "SELECT DISTINCTCOUNTRAWHLLPLUSMV(column6) AS value FROM
testTable";
+ Function<Object, Object> cardinalityExtractor =
+ value ->
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String)
value))
+ .cardinality();
+
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.LONG});
+ Object[] expectedResults = new Object[]{18651L};
+ ResultTable expectedResultTable = new ResultTable(expectedDataSchema,
Collections.singletonList(expectedResults));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L,
400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
62480L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+ expectedResults[0] = 4796L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+ expectedResults[0] = 3457L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+ expectedResults[0] = 579L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+ }
+
@Test
public void testPercentileMV() {
List<String> queries = Arrays.asList("SELECT PERCENTILE50MV(column6) AS
value FROM testTable",
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index c9f0c444af..591b5ffffa 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -350,6 +350,39 @@ public class
InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L,
124960L, 400000L, expectedResultTable);
}
+ @Test
+ public void testDistinctCountHLLPLUSMV() {
+ String query = "SELECT DISTINCTCOUNTHLLPLUSMV(column6) AS value FROM
testTable";
+
+ // Without filter, query should be answered by
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
DataSchema.ColumnDataType[]
+ {DataSchema.ColumnDataType.LONG});
+ Object[] expectedResults = new Object[]{18651L};
+ ResultTable expectedResultTable = new ResultTable(expectedDataSchema,
Collections.singletonList(expectedResults));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
400000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L,
62480L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+ expectedResults[0] = 4796L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L,
124960L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+ expectedResults[0] = 3457L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+ expectedResults[0] = 579L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L,
124960L, 400000L, expectedResultTable);
+ }
+
@Test
public void testDistinctCountRawHLLMV() {
String query = "SELECT DISTINCTCOUNTRAWHLLMV(column6) AS value FROM
testTable";
@@ -391,6 +424,48 @@ public class
InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
cardinalityExtractor);
}
+ @Test
+ public void testDistinctCountRawHLLPLUSMV() {
+ String query = "SELECT DISTINCTCOUNTRAWHLLPLUSMV(column6) AS value FROM testTable";
+ Function<Object, Object> cardinalityExtractor =
+ value -> ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+ .cardinality();
+
+ // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new DataSchema.ColumnDataType[]
+ {DataSchema.ColumnDataType.LONG});
+ Object[] expectedResults = new Object[]{18651L};
+ ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 400000L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 62480L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+ expectedResults[0] = 4796L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+ expectedResults[0] = 1176L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+ expectedResults[0] = 3457L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+ expectedResults[0] = 579L;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable,
+ cardinalityExtractor);
+ }
+
@Test
public void testPercentileMV() {
List<String> queries = Arrays.asList("SELECT PERCENTILE50MV(column6) AS value FROM testTable",
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index a6e8320962..d852a8c626 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -283,6 +283,32 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 73548L, 120000L, expectedResultTable);
}
+ @Test
+ public void testDistinctCountHLLPlus() {
+ String query = "SELECT DISTINCTCOUNTHLLPLUS(column1) AS v1, DISTINCTCOUNTHLLPLUS(column3) AS v2 FROM testTable";
+
+ // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema =
+ new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.LONG});
+ ResultTable expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{6595L, 21822L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 0L, 120000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1885L, 4545L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 49032L, 120000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + GROUP_BY);
+ expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{3495L, 12022L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 360000L, 120000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+ expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1273L, 3284L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 73548L, 120000L, expectedResultTable);
+ }
+
@Test
public void testDistinctCountRawHLL() {
String query = "SELECT DISTINCTCOUNTRAWHLL(column1) AS v1,
DISTINCTCOUNTRAWHLL(column3) AS v2 FROM testTable";
@@ -315,6 +341,40 @@ public class InterSegmentAggregationSingleValueQueriesTest
extends BaseSingleVal
cardinalityExtractor);
}
+ @Test
+ public void testDistinctCountRawHLLPlus() {
+ String query =
+ "SELECT DISTINCTCOUNTRAWHLLPLUS(column1) AS v1, DISTINCTCOUNTRAWHLLPLUS(column3) AS v2 FROM testTable";
+ Function<Object, Object> cardinalityExtractor =
+ value -> ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+ .cardinality();
+
+ // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema =
+ new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.LONG});
+ ResultTable expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{6595L, 21822L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 0L, 120000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1885L, 4545L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 49032L, 120000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + GROUP_BY);
+ expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{3495L, 12022L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 360000L, 120000L, expectedResultTable,
+ cardinalityExtractor);
+
+ brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+ expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1273L, 3284L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 73548L, 120000L, expectedResultTable,
+ cardinalityExtractor);
+ }
+
@Test
public void testPercentile() {
List<String> queries =
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index bba665f9e8..17bd6494a7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.queries;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;
import java.io.File;
@@ -82,6 +83,8 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
private static final String DISTINCT_COUNT_HLL_COLUMN = "distinctCountHLLColumn";
// Use non-default log2m
private static final int DISTINCT_COUNT_HLL_LOG2M = 9;
+ private static final String DISTINCT_COUNT_HLL_PLUS_COLUMN = "distinctCountHLLPlusColumn";
+ private static final int DISTINCT_COUNT_HLL_PLUS_P = 14;
private static final String MIN_MAX_RANGE_COLUMN = "minMaxRangeColumn";
private static final String PERCENTILE_EST_COLUMN = "percentileEstColumn";
// Use non-default max error
@@ -101,6 +104,7 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
private final int[][] _valuesArray = new int[NUM_ROWS][MAX_NUM_VALUES_TO_PRE_AGGREGATE];
private final AvgPair[] _avgPairs = new AvgPair[NUM_ROWS];
private final HyperLogLog[] _hyperLogLogs = new HyperLogLog[NUM_ROWS];
+ private final HyperLogLogPlus[] _hyperLogLogPluses = new HyperLogLogPlus[NUM_ROWS];
private final MinMaxRangePair[] _minMaxRangePairs = new MinMaxRangePair[NUM_ROWS];
private final QuantileDigest[] _quantileDigests = new QuantileDigest[NUM_ROWS];
private final TDigest[] _tDigests = new TDigest[NUM_ROWS];
@@ -193,6 +197,14 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
_tDigests[i] = tDigest;
valueMap.put(PERCENTILE_TDIGEST_COLUMN, ObjectSerDeUtils.TDIGEST_SER_DE.serialize(tDigest));
+ HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ for (int value : values) {
+ hyperLogLogPlus.offer(value);
+ }
+ _hyperLogLogPluses[i] = hyperLogLogPlus;
+ valueMap.put(DISTINCT_COUNT_HLL_PLUS_COLUMN,
+ ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus));
+
GenericRow genericRow = new GenericRow();
genericRow.init(valueMap);
rows.add(genericRow);
@@ -201,8 +213,9 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension(GROUP_BY_SV_COLUMN, DataType.STRING)
.addMultiValueDimension(GROUP_BY_MV_COLUMN, DataType.STRING).addMetric(AVG_COLUMN, DataType.BYTES)
- .addMetric(DISTINCT_COUNT_HLL_COLUMN, DataType.BYTES).addMetric(MIN_MAX_RANGE_COLUMN, DataType.BYTES)
- .addMetric(PERCENTILE_EST_COLUMN, DataType.BYTES).addMetric(PERCENTILE_TDIGEST_COLUMN, DataType.BYTES).build();
+ .addMetric(DISTINCT_COUNT_HLL_COLUMN, DataType.BYTES).addMetric(DISTINCT_COUNT_HLL_PLUS_COLUMN, DataType.BYTES)
+ .addMetric(MIN_MAX_RANGE_COLUMN, DataType.BYTES).addMetric(PERCENTILE_EST_COLUMN, DataType.BYTES)
+ .addMetric(PERCENTILE_TDIGEST_COLUMN, DataType.BYTES).build();
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
@@ -224,7 +237,7 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
AggregationOperator aggregationOperator = getOperator(getAggregationQuery());
List<Object> aggregationResult = aggregationOperator.nextBlock().getResults();
assertNotNull(aggregationResult);
- assertEquals(aggregationResult.size(), 5);
+ assertEquals(aggregationResult.size(), 6);
// Avg
AvgPair avgPair = (AvgPair) aggregationResult.get(0);
@@ -277,13 +290,24 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
expectedTDigest.add(_tDigests[i]);
}
assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), PERCENTILE_TDIGEST_DELTA);
+
+ // DistinctCountHLLPlus
+ HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) aggregationResult.get(5);
+ HyperLogLogPlus expectedHyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ for (int value : _valuesArray[0]) {
+ expectedHyperLogLogPlus.offer(value);
+ }
+ for (int i = 1; i < NUM_ROWS; i++) {
+ expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]);
+ }
+ assertEquals(hyperLogLogPlus.cardinality(), expectedHyperLogLogPlus.cardinality());
}
@Test
public void testInterSegmentsAggregation()
throws Exception {
Object[] aggregationResults = getBrokerResponse(getAggregationQuery()).getResultTable().getRows().get(0);
- assertEquals(aggregationResults.length, 5);
+ assertEquals(aggregationResults.length, 6);
// Simulate the process of server side merge and broker side merge
@@ -372,11 +396,31 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
tDigest1.add(tDigest2);
double expectedPercentileTDigestResult = tDigest1.quantile(0.5);
+ // DistinctCountHLLPlus
+ HyperLogLogPlus hyperLogLogPlus1 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ HyperLogLogPlus hyperLogLogPlus2 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ for (int value : _valuesArray[0]) {
+ hyperLogLogPlus1.offer(value);
+ hyperLogLogPlus2.offer(value);
+ }
+ for (int i = 1; i < NUM_ROWS; i++) {
+ hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]);
+ hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]);
+ }
+ hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+ hyperLogLogPlus1 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+ ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+ hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+ ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+ hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+ long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality();
+
assertEquals((Double) aggregationResults[0], expectedAvgResult, 1e-5);
assertEquals((long) aggregationResults[1], expectedDistinctCountHllResult);
assertEquals((Double) aggregationResults[2], expectedMinMaxRangeResult, 1e-5);
assertEquals((long) aggregationResults[3], expectedPercentileEstResult);
assertEquals((Double) aggregationResults[4], expectedPercentileTDigestResult, PERCENTILE_TDIGEST_DELTA);
+ assertEquals((long) aggregationResults[5], expectedDistinctCountHllPlusResult);
}
@Test
@@ -442,6 +486,17 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
expectedTDigest.add(_tDigests[i]);
}
assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), PERCENTILE_TDIGEST_DELTA);
+
+ // DistinctCountHLLPlus
+ HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) groupByResult.getResultForGroupId(5, groupKey._groupId);
+ HyperLogLogPlus expectedHyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ for (int value : _valuesArray[groupId]) {
+ expectedHyperLogLogPlus.offer(value);
+ }
+ for (int i = groupId + NUM_GROUPS; i < NUM_ROWS; i += NUM_GROUPS) {
+ expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]);
+ }
+ assertEquals(hyperLogLogPlus.cardinality(), expectedHyperLogLogPlus.cardinality());
}
}
@@ -539,12 +594,32 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
tDigest1.add(tDigest2);
double expectedPercentileTDigestResult = tDigest1.quantile(0.5);
+ // DistinctCountHLLPlus
+ HyperLogLogPlus hyperLogLogPlus1 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ HyperLogLogPlus hyperLogLogPlus2 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ for (int value : _valuesArray[groupId]) {
+ hyperLogLogPlus1.offer(value);
+ hyperLogLogPlus2.offer(value);
+ }
+ for (int i = groupId + NUM_GROUPS; i < NUM_ROWS; i += NUM_GROUPS) {
+ hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]);
+ hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]);
+ }
+ hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+ hyperLogLogPlus1 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+ ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+ hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+ ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+ hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+ long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality();
+
Object[] row = rows.get(groupId);
assertEquals((Double) row[0], expectedAvgResult, 1e-5);
assertEquals((long) row[1], expectedDistinctCountHllResult);
assertEquals((Double) row[2], expectedMinMaxRangeResult, 1e-5);
assertEquals((long) row[3], expectedPercentileEstResult);
assertEquals((Double) row[4], expectedPercentileTDigestResult, PERCENTILE_TDIGEST_DELTA);
+ assertEquals((long) row[5], expectedDistinctCountHllPlusResult);
}
}
@@ -595,6 +670,15 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
expectedTDigest.add(_tDigests[i]);
}
+ // DistinctCountHLLPlus
+ HyperLogLogPlus expectedHyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ for (int value : _valuesArray[0]) {
+ expectedHyperLogLogPlus.offer(value);
+ }
+ for (int i = 1; i < NUM_ROWS; i++) {
+ expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]);
+ }
+
Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator();
while (groupKeyIterator.hasNext()) {
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
@@ -620,6 +704,10 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
// PercentileTDigest
TDigest tDigest = (TDigest) groupByResult.getResultForGroupId(4, groupKey._groupId);
assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), PERCENTILE_TDIGEST_DELTA);
+
+ // DistinctCountHLLPlus
+ HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) groupByResult.getResultForGroupId(5, groupKey._groupId);
+ assertEquals(hyperLogLogPlus.cardinality(), expectedHyperLogLogPlus.cardinality());
}
}
@@ -716,20 +804,41 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
tDigest1.add(tDigest2);
double expectedPercentileTDigestResult = tDigest1.quantile(0.5);
+ // DistinctCountHLLPlus
+ HyperLogLogPlus hyperLogLogPlus1 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ HyperLogLogPlus hyperLogLogPlus2 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P);
+ for (int value : _valuesArray[0]) {
+ hyperLogLogPlus1.offer(value);
+ hyperLogLogPlus2.offer(value);
+ }
+ for (int i = 1; i < NUM_ROWS; i++) {
+ hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]);
+ hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]);
+ }
+ hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+ hyperLogLogPlus1 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+ ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+ hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
+ ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1));
+ hyperLogLogPlus1.addAll(hyperLogLogPlus2);
+ long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality();
+
for (Object[] row : rows) {
assertEquals((Double) row[0], expectedAvgResult, 1e-5);
assertEquals((long) row[1], expectedDistinctCountHllResult);
assertEquals((Double) row[2], expectedMinMaxRangeResult, 1e-5);
assertEquals((long) row[3], expectedPercentileEstResult);
assertEquals((Double) row[4], expectedPercentileTDigestResult, PERCENTILE_TDIGEST_DELTA);
+ assertEquals((long) row[5], expectedDistinctCountHllPlusResult);
}
}
private String getAggregationQuery() {
return String.format(
- "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s),
PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s) FROM %s",
+ "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s),
PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s), "
+ + "DISTINCTCOUNTHLLPLUS(%s) FROM %s",
AVG_COLUMN, DISTINCT_COUNT_HLL_COLUMN, MIN_MAX_RANGE_COLUMN,
PERCENTILE_EST_COLUMN, PERCENTILE_TDIGEST_COLUMN,
- RAW_TABLE_NAME);
+ DISTINCT_COUNT_HLL_PLUS_COLUMN, RAW_TABLE_NAME);
}
private String getGroupBySVQuery() {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index df045e1d5b..d9d645c065 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2632,7 +2632,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// The Accurate value is 6538.
query = "SELECT distinctCount(FlightNum) FROM mytable ";
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), 6538);
- assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), 6538);
// Expected distinctCountHll with different log2m value from 2 to 19. The Accurate value is 6538.
long[] expectedResults = new long[]{
@@ -2642,7 +2641,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
for (int i = 2; i < 20; i++) {
query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM mytable ", i);
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]);
- assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]);
}
// Default log2m for HLL is set to 12 in V1 and 8 in V2
@@ -2654,7 +2652,31 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
expectedDefault = expectedResults[10];
}
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedDefault);
- assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedDefault);
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testDistinctCountHllPlus(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query;
+
+ // The accurate value is 6538.
+ query = "SELECT distinctCount(FlightNum) FROM mytable ";
+ assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), 6538);
+
+ // Expected distinctCountHllPlus with different p values from 4 (the minimum) to 19. The accurate value is 6538.
+ long[] expectedResults = new long[]{
+ 4901, 5755, 6207, 5651, 6318, 6671, 6559, 6425, 6490, 6486, 6489, 6516, 6532, 6526, 6525, 6534
+ };
+
+ for (int i = 4; i < 20; i++) {
+ query = String.format("SELECT distinctCountHLLPlus(FlightNum, %d) FROM mytable ", i);
+ assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 4]);
+ }
+
+ // The default HLL++ precision is p = 14
+ query = "SELECT distinctCountHLLPlus(FlightNum) FROM mytable ";
+ assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[10]);
}
@Test
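A note on the expected values in the test above: HLL++'s standard error is roughly 1.04 / sqrt(2^p), so the estimates tighten toward the true count of 6538 as p grows. The following is a minimal, self-contained sketch of that tradeoff using the same clearspring class this patch builds on; the class name, value count, and string keys are illustrative, not taken from the test data:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

public class HllPlusPrecisionDemo {
  public static void main(String[] args) {
    for (int p = 4; p <= 16; p += 4) {
      HyperLogLogPlus hll = new HyperLogLogPlus(p);
      for (int i = 0; i < 100_000; i++) {
        hll.offer("value-" + i);  // 100000 distinct values
      }
      // The estimate converges toward 100000 as p (and the register count 2^p) grows
      System.out.println("p=" + p + " estimate=" + hll.cardinality());
    }
  }
}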
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
new file mode 100644
index 0000000000..66e38cd151
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.local.utils.HyperLogLogPlusUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class DistinctCountHLLPlusValueAggregator implements ValueAggregator<Object, HyperLogLogPlus> {
+ public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+ private final int _p;
+ private final int _sp;
+
+ // Byte size won't change once we get the initial aggregated value
+ private int _maxByteSize;
+
+ public DistinctCountHLLPlusValueAggregator(List<ExpressionContext> arguments) {
+ // An argument list of length 1 (the column only) means we use the default _p and _sp
+ if (arguments.size() == 2) {
+ _p = arguments.get(1).getLiteral().getIntValue();
+ _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+ } else if (arguments.size() == 3) {
+ _p = arguments.get(1).getLiteral().getIntValue();
+ _sp = arguments.get(2).getLiteral().getIntValue();
+ } else {
+ _p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P;
+ _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP;
+ }
+ }
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.DISTINCTCOUNTHLLPLUS;
+ }
+
+ @Override
+ public DataType getAggregatedValueType() {
+ return AGGREGATED_VALUE_TYPE;
+ }
+
+ @Override
+ public HyperLogLogPlus getInitialAggregatedValue(Object rawValue) {
+ HyperLogLogPlus initialValue;
+ if (rawValue instanceof byte[]) {
+ byte[] bytes = (byte[]) rawValue;
+ initialValue = deserializeAggregatedValue(bytes);
+ _maxByteSize = bytes.length;
+ } else {
+ initialValue = new HyperLogLogPlus(_p, _sp);
+ initialValue.offer(rawValue);
+ _maxByteSize = HyperLogLogPlusUtils.byteSize(_p, _sp);
+ }
+ return initialValue;
+ }
+
+ @Override
+ public HyperLogLogPlus applyRawValue(HyperLogLogPlus value, Object rawValue) {
+ if (rawValue instanceof byte[]) {
+ try {
+ value.addAll(deserializeAggregatedValue((byte[]) rawValue));
+ } catch (CardinalityMergeException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ value.offer(rawValue);
+ }
+ return value;
+ }
+
+ @Override
+ public HyperLogLogPlus applyAggregatedValue(HyperLogLogPlus value, HyperLogLogPlus aggregatedValue) {
+ try {
+ value.addAll(aggregatedValue);
+ return value;
+ } catch (CardinalityMergeException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus cloneAggregatedValue(HyperLogLogPlus value) {
+ return deserializeAggregatedValue(serializeAggregatedValue(value));
+ }
+
+ @Override
+ public int getMaxAggregatedValueByteSize() {
+ // NOTE: For aggregated metrics, the initial aggregated value might not have been generated yet. In that case,
+ // return the byte size computed from p and sp.
+ return _maxByteSize > 0 ? _maxByteSize : HyperLogLogPlusUtils.byteSize(_p, _sp);
+ }
+
+ @Override
+ public byte[] serializeAggregatedValue(HyperLogLogPlus value) {
+ return CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(value);
+ }
+
+ @Override
+ public HyperLogLogPlus deserializeAggregatedValue(byte[] bytes) {
+ return CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytes);
+ }
+}
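For orientation, this is roughly how the aggregator is exercised during ingestion-time aggregation. The driver below is a hypothetical sketch of the ValueAggregator call sequence, not code from this patch; the literal values are made up, and the empty argument list falls through to the default p and sp per the constructor above:

// Hypothetical usage sketch; defaults apply since the argument list is empty.
DistinctCountHLLPlusValueAggregator aggregator =
    new DistinctCountHLLPlusValueAggregator(java.util.Collections.emptyList());
HyperLogLogPlus acc = aggregator.getInitialAggregatedValue(42L);  // first raw value seen
acc = aggregator.applyRawValue(acc, 43L);                         // subsequent raw values
byte[] stored = aggregator.serializeAggregatedValue(acc);         // persisted as a BYTES metric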
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index b348b1ff4c..16dc0328f7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -70,6 +70,9 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTTHETASKETCH:
case DISTINCTCOUNTRAWTHETASKETCH:
return new DistinctCountThetaSketchValueAggregator();
+ case DISTINCTCOUNTHLLPLUS:
+ case DISTINCTCOUNTRAWHLLPLUS:
+ return new DistinctCountHLLPlusValueAggregator(arguments);
case DISTINCTCOUNTTUPLESKETCH:
case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
case AVGVALUEINTEGERSUMTUPLESKETCH:
@@ -116,6 +119,9 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTTHETASKETCH:
case DISTINCTCOUNTRAWTHETASKETCH:
return DistinctCountThetaSketchValueAggregator.AGGREGATED_VALUE_TYPE;
+ case DISTINCTCOUNTHLLPLUS:
+ case DISTINCTCOUNTRAWHLLPLUS:
+ return DistinctCountHLLPlusValueAggregator.AGGREGATED_VALUE_TYPE;
case DISTINCTCOUNTTUPLESKETCH:
case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
case AVGVALUEINTEGERSUMTUPLESKETCH:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java
new file mode 100644
index 0000000000..bee3e7884e
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+
+public class SerializedHLLPlus implements Comparable<SerializedHLLPlus> {
+ private final HyperLogLogPlus _hyperLogLogPlus;
+
+ public SerializedHLLPlus(HyperLogLogPlus hyperLogLogPlus) {
+ _hyperLogLogPlus = hyperLogLogPlus;
+ }
+
+ @Override
+ public int compareTo(SerializedHLLPlus other) {
+ return Long.compare(_hyperLogLogPlus.cardinality(), other._hyperLogLogPlus.cardinality());
+ }
+
+ @Override
+ public String toString() {
+ return BytesUtils.toHexString(CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(_hyperLogLogPlus));
+ }
+}
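A quick illustration of what this wrapper is for: the raw HLL++ functions return the sketch itself, hex-encoded via toString(), while ordering (e.g. for ORDER BY) falls back to the estimated cardinality via compareTo(). A hedged sketch against the classes above; the variable names and offered values are made up:

HyperLogLogPlus small = new HyperLogLogPlus(14);
HyperLogLogPlus large = new HyperLogLogPlus(14);
small.offer("x");
large.offer("x");
large.offer("y");
SerializedHLLPlus a = new SerializedHLLPlus(small);
SerializedHLLPlus b = new SerializedHLLPlus(large);
System.out.println(a.compareTo(b) < 0);  // true: "small" estimates fewer distinct values
System.out.println(b);                   // hex string of the serialized sketch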
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index 1ed3a3e341..f5a6275a3b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.utils;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.google.common.primitives.Longs;
import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;
@@ -188,6 +189,34 @@ public class CustomSerDeUtils {
}
};
+ public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE = new ObjectSerDe<HyperLogLogPlus>() {
+
+ @Override
+ public byte[] serialize(HyperLogLogPlus hyperLogLogPlus) {
+ try {
+ return hyperLogLogPlus.getBytes();
+ } catch (IOException e) {
+ throw new RuntimeException("Caught exception while serializing
HyperLogLogPlus", e);
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus deserialize(byte[] bytes) {
+ try {
+ return HyperLogLogPlus.Builder.build(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException("Caught exception while de-serializing
HyperLogLogPlus", e);
+ }
+ }
+
+ @Override
+ public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return deserialize(bytes);
+ }
+ };
+
public static final ObjectSerDe<TDigest> TDIGEST_SER_DE = new ObjectSerDe<TDigest>() {
@Override
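The serde above simply delegates to clearspring's own wire format (getBytes()/Builder.build()), so a round trip reproduces the registers and therefore the estimate exactly. A small sanity-check sketch, assuming the serde added in this patch; the loop bound is arbitrary:

HyperLogLogPlus original = new HyperLogLogPlus(14);
for (int i = 0; i < 1000; i++) {
  original.offer(i);
}
byte[] bytes = CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(original);
HyperLogLogPlus copy = CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytes);
// Identical registers imply an identical estimate
assert copy.cardinality() == original.cardinality();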
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java
new file mode 100644
index 0000000000..8614961511
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
+
+public class HyperLogLogPlusUtils {
+ private HyperLogLogPlusUtils() {
+ }
+
+ /**
+ * Returns the byte size of the given HyperLogLogPlus.
+ */
+ public static int byteSize(HyperLogLogPlus value) {
+ // 12 bytes header (p, sp & register set size) & register set data
+ return value.sizeof() + 3 * Integer.BYTES;
+ }
+
+ /**
+ * Returns the byte size of HyperLogLogPlus of a given p and sp.
+ */
+ public static int byteSize(int p, int sp) {
+ // 12 bytes header (p, sp & register set size) & register set data
+ return new HyperLogLogPlus(p, sp).sizeof() + 3 * Integer.BYTES;
+ }
+}
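The 3 * Integer.BYTES term reserves 12 bytes for the serialized header on top of the register data, so byteSize() behaves as an upper-bound estimate of the serialized form (clearspring varint-encodes parts of its header, so the actual bytes can come out slightly smaller). A hedged sanity check; call it from a method that declares throws IOException, since getBytes() can throw:

HyperLogLogPlus hll = new HyperLogLogPlus(14, 0);  // dense mode, matching Pinot's defaults
int estimated = HyperLogLogPlusUtils.byteSize(14, 0);
int actual = hll.getBytes().length;
System.out.println(estimated >= actual);  // expected to print true for a fresh dense sketch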
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 4ac3b32af9..33313366a6 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -141,6 +141,15 @@ public enum AggregationFunctionType {
PERCENTILERAWKLL("percentileRawKLL", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC,
SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000,
ReturnTypes.explicit(SqlTypeName.OTHER)),
+ // hyper log log plus plus functions
+ DISTINCTCOUNTHLLPLUS("distinctCountHLLPlus", ImmutableList.of("DISTINCT_COUNT_HLL_PLUS"), SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC), ordinal -> ordinal > 0),
+ ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ DISTINCTCOUNTRAWHLLPLUS("distinctCountRawHLLPlus", ImmutableList.of("DISTINCT_COUNT_RAW_HLL_PLUS"),
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), ordinal -> ordinal > 0),
+ ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
// DEPRECATED in v2
@Deprecated
@@ -248,6 +257,13 @@ public enum AggregationFunctionType {
PERCENTILERAWKLLMV("percentileRawKLLMV", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ARRAY,
SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
ordinal -> ordinal > 1 && ordinal < 4), ReturnTypes.VARCHAR_2000,
ReturnTypes.explicit(SqlTypeName.OTHER)),
+ // hyper log log plus plus functions
+ DISTINCTCOUNTHLLPLUSMV("distinctCountHLLPlusMV", null, SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.family(SqlTypeFamily.ARRAY), ReturnTypes.BIGINT,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
+ DISTINCTCOUNTRAWHLLPLUSMV("distinctCountRawHLLPlusMV", null, SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.family(SqlTypeFamily.ARRAY), ReturnTypes.VARCHAR_2000,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
// boolean aggregate functions
BOOLAND("boolAnd", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 91755df8fa..713bbb2fe3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -92,6 +92,8 @@ public class CommonConstants {
public static final String DEFAULT_HYPERLOGLOG_LOG2M_KEY = "default.hyperloglog.log2m";
public static final int DEFAULT_HYPERLOGLOG_LOG2M = 8;
+ public static final int DEFAULT_HYPERLOGLOG_PLUS_P = 14;
+ public static final int DEFAULT_HYPERLOGLOG_PLUS_SP = 0;
// 2 to the power of 16, for tradeoffs see datasketches library documentation:
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
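With these defaults, an un-parameterized DISTINCTCOUNTHLLPLUS sketch is equivalent to new HyperLogLogPlus(14, 0), i.e. a dense sketch with the sparse stage disabled. A sketch of the construction using the constants above (the Helix nesting matches how the aggregator in this patch references them):

HyperLogLogPlus defaultSketch = new HyperLogLogPlus(
    CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P,    // p = 14
    CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP);  // sp = 0 -> dense representation only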
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]