This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bd86e7bdc6 Add support for Apache Datasketches CPC sketch (#11774)
bd86e7bdc6 is described below
commit bd86e7bdc6df22846207b9f496ccab703e0ad462
Author: David Cromberge <[email protected]>
AuthorDate: Wed Oct 18 21:37:27 2023 +0100
Add support for Apache Datasketches CPC sketch (#11774)
---
.../function/AggregationFunctionTypeTest.java | 8 +-
.../apache/pinot/core/common/ObjectSerDeUtils.java | 31 +-
.../core/function/scalar/SketchFunctions.java | 116 ++++-
.../function/AggregationFunctionFactory.java | 4 +
.../DistinctCountCPCSketchAggregationFunction.java | 540 +++++++++++++++++++++
...stinctCountRawCPCSketchAggregationFunction.java | 53 ++
....java => DistinctCountCPCSketchAggregator.java} | 32 +-
.../DistinctCountThetaSketchAggregator.java | 6 +-
.../aggregator/ValueAggregatorFactory.java | 3 +
.../pinot/core/common/ObjectSerDeUtilsTest.java | 18 +
.../core/function/scalar/SketchFunctionsTest.java | 18 +-
.../v2/DistinctCountCPCSketchStarTreeV2Test.java | 52 ++
.../DistinctCountThetaSketchQueriesTest.java | 2 +-
.../integration/tests/custom/CpcSketchTest.java | 198 ++++++++
.../DistinctCountCPCSketchValueAggregator.java | 193 ++++++++
.../DistinctCountThetaSketchValueAggregator.java | 4 +-
.../local/aggregator/ValueAggregatorFactory.java | 6 +
.../local/customobject/SerializedCPCSketch.java | 46 ++
.../segment/local/utils/CustomSerDeUtils.java | 23 +-
.../segment/local/utils/TableConfigUtils.java | 10 +-
.../DistinctCountCPCSketchValueAggregatorTest.java | 161 ++++++
.../pinot/segment/spi/AggregationFunctionType.java | 103 ++--
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
.../org/apache/pinot/tools/SegmentDumpTool.java | 19 +-
24 files changed, 1554 insertions(+), 94 deletions(-)
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index f7a9ca16b0..b12cda67c9 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -34,9 +34,9 @@ public class AggregationFunctionTypeTest {
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"),
AggregationFunctionType.AVG);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"),
AggregationFunctionType.MODE);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("FiRsTwItHtImE"),
- AggregationFunctionType.FIRSTWITHTIME);
+ AggregationFunctionType.FIRSTWITHTIME);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"),
- AggregationFunctionType.LASTWITHTIME);
+ AggregationFunctionType.LASTWITHTIME);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"),
AggregationFunctionType.MINMAXRANGE);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnT"),
@@ -79,6 +79,10 @@ public class AggregationFunctionTypeTest {
AggregationFunctionType.PERCENTILERAWEST);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt99"),
AggregationFunctionType.PERCENTILERAWTDIGEST);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTcPcSkEtCh"),
+ AggregationFunctionType.DISTINCTCOUNTCPCSKETCH);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTrAwCpCsKeTcH"),
+ AggregationFunctionType.DISTINCTCOUNTRAWCPCSKETCH);
}
@Test(expectedExceptions = IllegalArgumentException.class)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 3b7d187bc7..45f1ebd489 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.datasketches.frequencies.LongsSketch;
import org.apache.datasketches.kll.KllDoublesSketch;
@@ -142,8 +143,8 @@ public class ObjectSerDeUtils {
IntegerTupleSketch(37),
FrequentStringsSketch(38),
FrequentLongsSketch(39),
- HyperLogLogPlus(40);
-
+ HyperLogLogPlus(40),
+ CompressedProbabilisticCounting(41);
private final int _value;
@@ -241,6 +242,8 @@ public class ObjectSerDeUtils {
return ObjectType.FrequentLongsSketch;
} else if (value instanceof HyperLogLogPlus) {
return ObjectType.HyperLogLogPlus;
+ } else if (value instanceof CpcSketch) {
+ return ObjectType.CompressedProbabilisticCounting;
} else {
throw new IllegalArgumentException("Unsupported type of value: " +
value.getClass().getSimpleName());
}
@@ -952,7 +955,7 @@ public class ObjectSerDeUtils {
}
};
- public static final ObjectSerDe<Sketch> DATA_SKETCH_SER_DE = new
ObjectSerDe<Sketch>() {
+ public static final ObjectSerDe<Sketch> DATA_SKETCH_THETA_SER_DE = new
ObjectSerDe<Sketch>() {
@Override
public byte[] serialize(Sketch value) {
@@ -1016,6 +1019,25 @@ public class ObjectSerDeUtils {
}
};
+ public static final ObjectSerDe<CpcSketch> DATA_SKETCH_CPC_SER_DE = new
ObjectSerDe<CpcSketch>() {
+ @Override
+ public byte[] serialize(CpcSketch value) {
+ return value.toByteArray();
+ }
+
+ @Override
+ public CpcSketch deserialize(byte[] bytes) {
+ return CpcSketch.heapify(Memory.wrap(bytes));
+ }
+
+ @Override
+ public CpcSketch deserialize(ByteBuffer byteBuffer) {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return CpcSketch.heapify(Memory.wrap(bytes));
+ }
+ };
+
public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new
ObjectSerDe<Geometry>() {
@Override
@@ -1383,7 +1405,7 @@ public class ObjectSerDeUtils {
INT_SET_SER_DE,
TDIGEST_SER_DE,
DISTINCT_TABLE_SER_DE,
- DATA_SKETCH_SER_DE,
+ DATA_SKETCH_THETA_SER_DE,
GEOMETRY_SER_DE,
ROARING_BITMAP_SER_DE,
LONG_SET_SER_DE,
@@ -1412,6 +1434,7 @@ public class ObjectSerDeUtils {
FREQUENT_STRINGS_SKETCH_SER_DE,
FREQUENT_LONGS_SKETCH_SER_DE,
HYPER_LOG_LOG_PLUS_SER_DE,
+ DATA_SKETCH_CPC_SER_DE,
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 8b0b72d1d1..f26a062b10 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -22,6 +22,8 @@ import
com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.math.BigDecimal;
import java.util.Base64;
import javax.annotation.Nullable;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.AnotB;
import org.apache.datasketches.theta.Intersection;
@@ -68,6 +70,14 @@ import org.apache.pinot.spi.utils.CommonConstants;
* {
* "columnName": "names",
* "transformFunction": "toHLL(playerName, 8)"
+ * },
+ * {
+ * "columnName": "players",
+ * "transformFunction": "toCpcSketch(playerID)"
+ * },
+ * {
+ * "columnName": "players",
+ * "transformFunction": "toCpcSketch(playerID, 11)"
* }
* ]
* }
@@ -119,7 +129,7 @@ public class SketchFunctions {
"Unrecognised input type for Theta sketch: " +
input.getClass().getSimpleName());
}
}
- return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(sketch.compact());
+ return
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(sketch.compact());
}
/**
@@ -274,8 +284,8 @@ public class SketchFunctions {
} else if (sketchObj instanceof byte[]) {
return Sketches.wrapSketch(Memory.wrap((byte[]) sketchObj));
} else {
- throw new RuntimeException("Exception occurred getting estimate from
Theta Sketch, unsupported Object type: "
- + sketchObj.getClass());
+ throw new RuntimeException(
+ "Exception occurred getting estimate from Theta Sketch, unsupported
Object type: " + sketchObj.getClass());
}
}
@@ -310,9 +320,8 @@ public class SketchFunctions {
}
private static byte[] intTupleSketchUnionVar(IntegerSummary.Mode mode, int
nomEntries, Object... sketchObjects) {
- org.apache.datasketches.tuple.Union<IntegerSummary>
- union = new org.apache.datasketches.tuple.Union<>(nomEntries,
- new IntegerSummarySetOperations(mode, mode));
+ org.apache.datasketches.tuple.Union<IntegerSummary> union =
+ new org.apache.datasketches.tuple.Union<>(nomEntries, new
IntegerSummarySetOperations(mode, mode));
for (Object sketchObj : sketchObjects) {
union.union(asIntegerSketch(sketchObj));
}
@@ -360,8 +369,8 @@ public class SketchFunctions {
} else if (sketchObj instanceof byte[]) {
return
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) sketchObj);
} else {
- throw new RuntimeException("Exception occurred getting reading Tuple
Sketch, unsupported Object type: "
- + sketchObj.getClass());
+ throw new RuntimeException(
+ "Exception occurred getting reading Tuple Sketch, unsupported Object
type: " + sketchObj.getClass());
}
}
@@ -369,4 +378,95 @@ public class SketchFunctions {
public static long getIntTupleSketchEstimate(Object o1) {
return Math.round(asIntegerSketch(o1).getEstimate());
}
+
+ /**
+ * Create a CPC Sketch containing the input
+ *
+ * @param input an Object we want to insert into the sketch, may be null to
return an empty sketch
+ * @return serialized CPC sketch as bytes
+ */
+ @ScalarFunction(nullableParameters = true)
+ public static byte[] toCpcSketch(@Nullable Object input) {
+ return toCpcSketch(input, CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+ }
+
+ @ScalarFunction(names = {"getCpcSketchEstimate", "get_cpc_sketch_estimate"})
+ public static long getCpcSketchEstimate(Object o1) {
+ return Math.round(asCpcSketch(o1).getEstimate());
+ }
+
+ @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+ public static byte[] cpcSketchUnion(Object o1, Object o2) {
+ return cpcSketchUnionVar(o1, o2);
+ }
+
+ @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+ public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3) {
+ return cpcSketchUnionVar(o1, o2, o3);
+ }
+
+ @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+ public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3, Object
o4) {
+ return cpcSketchUnionVar(o1, o2, o3, o4);
+ }
+
+ @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+ public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3, Object
o4, Object o5) {
+ return cpcSketchUnionVar(o1, o2, o3, o4, o5);
+ }
+
+ /**
+ * Create a CPC Sketch containing the input, with a configured number of nominal entries
+ *
+ * @param input an Object we want to insert into the sketch, may be null to
return an empty sketch
+ * @param lgK the given log_base2 of k, which is the nominal entries that
the sketch is configured to keep
+ * @return serialized CPC sketch as bytes
+ */
+ @ScalarFunction(nullableParameters = true)
+ public static byte[] toCpcSketch(@Nullable Object input, int lgK) {
+ CpcSketch sketch = new CpcSketch(lgK);
+ if (input != null) {
+ if (input instanceof Integer) {
+ sketch.update((Integer) input);
+ } else if (input instanceof Long) {
+ sketch.update((Long) input);
+ } else if (input instanceof Float) {
+ sketch.update((Float) input);
+ } else if (input instanceof Double) {
+ sketch.update((Double) input);
+ } else if (input instanceof BigDecimal) {
+ sketch.update(((BigDecimal) input).toString());
+ } else if (input instanceof String) {
+ sketch.update((String) input);
+ } else if (input instanceof byte[]) {
+ sketch.update((byte[]) input);
+ } else {
+ throw new IllegalArgumentException(
+ "Unrecognised input type for CPC sketch: " +
input.getClass().getSimpleName());
+ }
+ }
+ return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(sketch);
+ }
+
+ private static CpcSketch asCpcSketch(Object sketchObj) {
+ if (sketchObj instanceof CpcSketch) {
+ return (CpcSketch) sketchObj;
+ } else if (sketchObj instanceof byte[]) {
+ return CpcSketch.heapify(Memory.wrap((byte[]) sketchObj));
+ } else if (sketchObj instanceof String) {
+ byte[] decoded = Base64.getDecoder().decode((String) sketchObj);
+ return CpcSketch.heapify(Memory.wrap((decoded)));
+ } else {
+ throw new RuntimeException(
+ "Exception occurred getting estimate from CPC Sketch, unsupported
Object type: " + sketchObj.getClass());
+ }
+ }
+
+ private static byte[] cpcSketchUnionVar(Object... sketchObjects) {
+ CpcUnion union = new
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+ for (Object sketchObj : sketchObjects) {
+ union.update(asCpcSketch(sketchObj));
+ }
+ return union.getResult().toByteArray();
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index f83fe090c0..0480141f17 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -364,6 +364,10 @@ public class AggregationFunctionFactory {
return new FrequentStringsSketchAggregationFunction(arguments);
case FREQUENTLONGSSKETCH:
return new FrequentLongsSketchAggregationFunction(arguments);
+ case DISTINCTCOUNTCPCSKETCH:
+ return new DistinctCountCPCSketchAggregationFunction(arguments);
+ case DISTINCTCOUNTRAWCPCSKETCH:
+ return new DistinctCountRawCPCSketchAggregationFunction(arguments);
default:
throw new IllegalArgumentException("Unsupported aggregation
function type: " + functionType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
new file mode 100644
index 0000000000..1946200842
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountCPCSketchAggregationFunction} is used for
space-efficient cardinality estimation.
+ * The Apache Datasketches CPC sketch is a unique-counting sketch that
implements the
+ * <i>Compressed Probabilistic Counting (CPC, a.k.a. FM85)</i> algorithms
developed by Kevin Lang in his paper
+ * <a href="https://arxiv.org/abs/1708.06839">Back to the Future: an Even More
Nearly Optimal Cardinality Estimation
+ * Algorithm</a>.
+ * <br><br>
+ * The stored CPC sketch can consume about 40% less space than an HLL sketch
of comparable accuracy. CPC sketches have
+ * been intentionally designed to offer different tradeoffs to HLL sketches so
that they complement each
+ * other in many ways. For more information, see the Apache Datasketches
documentation.
+ * <br><br>
+ * The aggregation function supports both pre-aggregated sketches and raw
values, but no post-aggregation is supported.
+ * Usage examples:
+ * <ul>
+ * <li>
+ * Simple union (1 or 2 arguments): main expression to aggregate on,
followed by an optional CPC sketch size
+ * argument. The second argument is the sketch lgK – the given log_base2
of k, and defaults to 12.
+ * The "raw" equivalents return serialised sketches in base64-encoded
strings.
+ * <p>DISTINCT_COUNT_CPC_SKETCH(col)</p>
+ * <p>DISTINCT_COUNT_CPC_SKETCH(col, 12)</p>
+ * <p>DISTINCT_COUNT_RAW_CPC_SKETCH(col)</p>
+ * <p>DISTINCT_COUNT_RAW_CPC_SKETCH(col, 12)</p>
+ * <li>
+ * Extracting a cardinality estimate from a CPC sketch:
+ * <p>GET_CPC_SKETCH_ESTIMATE(sketch_bytes)</p>
+ * <p>GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(col))</p>
+ * </li>
+ * <li>
+ * Union between two sketches:
+ * <p>
+ * CPC_SKETCH_UNION(
+ * DISTINCT_COUNT_RAW_CPC_SKETCH(col1),
+ * DISTINCT_COUNT_RAW_CPC_SKETCH(col2)
+ * )
+ * </p>
+ * </li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes"})
+public class DistinctCountCPCSketchAggregationFunction
+ extends BaseSingleInputAggregationFunction<CpcSketch, Comparable> {
+ protected final int _lgK;
+
+ public DistinctCountCPCSketchAggregationFunction(List<ExpressionContext>
arguments) {
+ super(arguments.get(0));
+ int numExpressions = arguments.size();
+ // This function expects 1 or 2 arguments - it is a code smell to extend
the base for single
+ // input aggregation functions. Nevertheless, there are other functions
in the base class that
+ // apply here. See also: Theta sketch aggregation function.
+ Preconditions.checkArgument(numExpressions <= 2, "DistinctCountCPC expects
1 or 2 arguments, got: %s",
+ numExpressions);
+ if (arguments.size() == 2) {
+ _lgK = arguments.get(1).getLiteral().getIntValue();
+ } else {
+ _lgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
+ }
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTCPCSKETCH;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized CPC Sketch
+ FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+ if (storedType == DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ CpcSketch cpcSketch = aggregationResultHolder.getResult();
+ CpcUnion union = new CpcUnion(_lgK);
+ if (cpcSketch != null) {
+ union.update(cpcSketch);
+ }
+ for (int i = 0; i < length; i++) {
+
union.update(ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]));
+ }
+ aggregationResultHolder.setValue(union.getResult());
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging CPC
sketches", e);
+ }
+ return;
+ }
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ getDictIdBitmap(aggregationResultHolder, dictionary).addN(dictIds, 0,
length);
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the CpcSketch
+ CpcSketch cpcSketch = getCpcSketch(aggregationResultHolder);
+ switch (storedType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ cpcSketch.update(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ cpcSketch.update(longValues[i]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ cpcSketch.update(floatValues[i]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ cpcSketch.update(doubleValues[i]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ cpcSketch.update(stringValues[i]);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Illegal data type for
DISTINCT_COUNT_CPC aggregation function: " + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized CPC Sketch
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ if (storedType == DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ for (int i = 0; i < length; i++) {
+ CpcSketch value =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
+ int groupKey = groupKeyArray[i];
+ CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
+ if (cpcSketch != null) {
+ CpcUnion union = new CpcUnion(_lgK);
+ union.update(cpcSketch);
+ union.update(value);
+ groupByResultHolder.setValueForKey(groupKey, union.getResult());
+ } else {
+ groupByResultHolder.setValueForKey(groupKey, value);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging CPC
sketches", e);
+ }
+ return;
+ }
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int i = 0; i < length; i++) {
+ getDictIdBitmap(groupByResultHolder, groupKeyArray[i],
dictionary).add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the CpcSketch
+ switch (storedType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ getCpcSketch(groupByResultHolder,
groupKeyArray[i]).update(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ getCpcSketch(groupByResultHolder,
groupKeyArray[i]).update(longValues[i]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ getCpcSketch(groupByResultHolder,
groupKeyArray[i]).update(floatValues[i]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ getCpcSketch(groupByResultHolder,
groupKeyArray[i]).update(doubleValues[i]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ getCpcSketch(groupByResultHolder,
groupKeyArray[i]).update(stringValues[i]);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Illegal data type for
DISTINCT_COUNT_CPC aggregation function: " + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized CPC Sketch
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ if (storedType == DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ for (int i = 0; i < length; i++) {
+ CpcSketch value =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
+ for (int groupKey : groupKeysArray[i]) {
+ CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
+ if (cpcSketch != null) {
+ CpcUnion union = new CpcUnion(_lgK);
+ union.update(cpcSketch);
+ union.update(value);
+ groupByResultHolder.setValueForKey(groupKey, union.getResult());
+ } else {
+ groupByResultHolder.setValueForKey(groupKey, value);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging CPC
sketches", e);
+ }
+ return;
+ }
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int i = 0; i < length; i++) {
+ setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i],
dictionary, dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the CpcSketch
+ switch (storedType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getCpcSketch(groupByResultHolder, groupKey).update(intValues[i]);
+ }
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getCpcSketch(groupByResultHolder, groupKey).update(longValues[i]);
+ }
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getCpcSketch(groupByResultHolder, groupKey).update(floatValues[i]);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getCpcSketch(groupByResultHolder,
groupKey).update(doubleValues[i]);
+ }
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getCpcSketch(groupByResultHolder,
groupKey).update(stringValues[i]);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException("Illegal data type for
DISTINCT_COUNT_CPC aggregation function: " + storedType);
+ }
+ }
+
+ @Override
+ public CpcSketch extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ Object result = aggregationResultHolder.getResult();
+ if (result == null) {
+ return new CpcSketch(_lgK);
+ }
+
+ if (result instanceof DictIdsWrapper) {
+ // For dictionary-encoded expression, convert dictionary ids to CpcSketch
+ return convertToCpcSketch((DictIdsWrapper) result);
+ } else {
+ // For non-dictionary-encoded expression, directly return the CpcSketch
+ return (CpcSketch) result;
+ }
+ }
+
+ @Override
+ public CpcSketch extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ Object result = groupByResultHolder.getResult(groupKey);
+ if (result == null) {
+ return new CpcSketch(_lgK);
+ }
+
+ if (result instanceof DictIdsWrapper) {
+ // For dictionary-encoded expression, convert dictionary ids to CpcSketch
+ return convertToCpcSketch((DictIdsWrapper) result);
+ } else {
+ // For non-dictionary-encoded expression, directly return the CpcSketch
+ return (CpcSketch) result;
+ }
+ }
+
+ @Override
+ public CpcSketch merge(CpcSketch intermediateResult1, CpcSketch
intermediateResult2) {
+ if (intermediateResult1 == null && intermediateResult2 != null) {
+ return intermediateResult2;
+ } else if (intermediateResult1 != null && intermediateResult2 == null) {
+ return intermediateResult1;
+ } else if (intermediateResult1 == null) {
+ return new CpcSketch(_lgK);
+ }
+
+ CpcUnion union = new CpcUnion(_lgK);
+ union.update(intermediateResult1);
+ union.update(intermediateResult2);
+ return union.getResult();
+ }
+
+ @Override
+ public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+ return DataSchema.ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public DataSchema.ColumnDataType getFinalResultColumnType() {
+ return DataSchema.ColumnDataType.LONG;
+ }
+
+ @Override
+ public Comparable extractFinalResult(CpcSketch intermediateResult) {
+ return Math.round(intermediateResult.getEstimate());
+ }
+
+ /**
+ * Returns the dictionary id bitmap from the result holder or creates a new
one if it does not exist.
+ */
+ protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder
aggregationResultHolder,
+ Dictionary dictionary) {
+ DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
+ if (dictIdsWrapper == null) {
+ dictIdsWrapper = new DictIdsWrapper(dictionary);
+ aggregationResultHolder.setValue(dictIdsWrapper);
+ }
+ return dictIdsWrapper._dictIdBitmap;
+ }
+
+ /**
+ * Returns the CpcSketch from the result holder or creates a new one if it
does not exist.
+ */
+ protected CpcSketch getCpcSketch(AggregationResultHolder
aggregationResultHolder) {
+ CpcSketch cpcSketch = aggregationResultHolder.getResult();
+ if (cpcSketch == null) {
+ cpcSketch = new CpcSketch(_lgK);
+ aggregationResultHolder.setValue(cpcSketch);
+ }
+ return cpcSketch;
+ }
+
+ /**
+ * Returns the dictionary id bitmap for the given group key or creates a new
one if it does not exist.
+ */
+ protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder
groupByResultHolder, int groupKey,
+ Dictionary dictionary) {
+ DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
+ if (dictIdsWrapper == null) {
+ dictIdsWrapper = new DictIdsWrapper(dictionary);
+ groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
+ }
+ return dictIdsWrapper._dictIdBitmap;
+ }
+
+ /**
+  * Looks up the CpcSketch stored for the given group key. When the group has no entry yet, a new sketch built with
+  * this function's lgK is stored for that key first and returned.
+  */
+ protected CpcSketch getCpcSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+   CpcSketch existing = groupByResultHolder.getResult(groupKey);
+   if (existing != null) {
+     return existing;
+   }
+   CpcSketch created = new CpcSketch(_lgK);
+   groupByResultHolder.setValueForKey(groupKey, created);
+   return created;
+ }
+
+ /**
+  * Records one dictionary id in the bitmap of every listed group key, creating the per-group bitmap on first use.
+  */
+ private static void setDictIdForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys,
+     Dictionary dictionary, int dictId) {
+   for (int i = 0; i < groupKeys.length; i++) {
+     RoaringBitmap bitmap = getDictIdBitmap(groupByResultHolder, groupKeys[i], dictionary);
+     bitmap.add(dictId);
+   }
+ }
+
+ /**
+  * Builds a CpcSketch (with this function's lgK) from the collected dictionary ids by resolving each id through the
+  * wrapper's dictionary and feeding the resulting value into the sketch.
+  */
+ private CpcSketch convertToCpcSketch(DictIdsWrapper dictIdsWrapper) {
+   Dictionary dictionary = dictIdsWrapper._dictionary;
+   CpcSketch sketch = new CpcSketch(_lgK);
+   for (PeekableIntIterator dictIds = dictIdsWrapper._dictIdBitmap.getIntIterator(); dictIds.hasNext(); ) {
+     addObjectToSketch(dictionary.get(dictIds.next()), sketch);
+   }
+   return sketch;
+ }
+
+ /**
+  * Feeds a single raw value into the sketch. Strings and boxed Integer/Long/Double/Float values are added directly;
+  * an Object[] is delegated to {@code addObjectsToSketch}.
+  *
+  * @throws IllegalStateException if the value is of any other type
+  */
+ private void addObjectToSketch(Object rawValue, CpcSketch sketch) {
+   if (rawValue instanceof String) {
+     sketch.update((String) rawValue);
+     return;
+   }
+   if (rawValue instanceof Integer) {
+     sketch.update((Integer) rawValue);
+     return;
+   }
+   if (rawValue instanceof Long) {
+     sketch.update((Long) rawValue);
+     return;
+   }
+   if (rawValue instanceof Double) {
+     sketch.update((Double) rawValue);
+     return;
+   }
+   if (rawValue instanceof Float) {
+     sketch.update((Float) rawValue);
+     return;
+   }
+   if (rawValue instanceof Object[]) {
+     addObjectsToSketch((Object[]) rawValue, sketch);
+     return;
+   }
+   throw new IllegalStateException(
+       "Unsupported data type for CPC Sketch aggregation: " + rawValue.getClass().getSimpleName());
+ }
+
+ /**
+  * Feeds every element of a multi-value array into the sketch. Supported array types are String[], Integer[],
+  * Long[], Double[] and Float[].
+  *
+  * @throws IllegalStateException if the array is of any other type
+  */
+ private void addObjectsToSketch(Object[] rawValues, CpcSketch sketch) {
+   if (rawValues instanceof String[]) {
+     for (String value : (String[]) rawValues) {
+       sketch.update(value);
+     }
+     return;
+   }
+   if (rawValues instanceof Integer[]) {
+     for (Integer value : (Integer[]) rawValues) {
+       sketch.update(value);
+     }
+     return;
+   }
+   if (rawValues instanceof Long[]) {
+     for (Long value : (Long[]) rawValues) {
+       sketch.update(value);
+     }
+     return;
+   }
+   if (rawValues instanceof Double[]) {
+     for (Double value : (Double[]) rawValues) {
+       sketch.update(value);
+     }
+     return;
+   }
+   if (rawValues instanceof Float[]) {
+     for (Float value : (Float[]) rawValues) {
+       sketch.update(value);
+     }
+     return;
+   }
+   throw new IllegalStateException(
+       "Unsupported data type for CPC Sketch aggregation: " + rawValues.getClass().getSimpleName());
+ }
+
+ /**
+  * Pairs a dictionary with the bitmap of dictionary ids collected against it, so the ids can later be resolved back
+  * to values.
+  */
+ private static final class DictIdsWrapper {
+   final Dictionary _dictionary;
+   final RoaringBitmap _dictIdBitmap = new RoaringBitmap();
+
+   private DictIdsWrapper(Dictionary dictionary) {
+     _dictionary = dictionary;
+   }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
new file mode 100644
index 0000000000..ab153c8835
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.SerializedCPCSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code DistinctCountRawCPCSketchAggregationFunction} shares the same usage as the
+ * {@link DistinctCountCPCSketchAggregationFunction}, but instead of the rounded estimate it returns the sketch
+ * itself, wrapped in a {@link SerializedCPCSketch} and reported as a {@code STRING} column (a base64 encoded
+ * string).
+ */
+public class DistinctCountRawCPCSketchAggregationFunction extends
DistinctCountCPCSketchAggregationFunction {
+
+ /**
+ * Passes the aggregation arguments straight through to the parent CPC sketch aggregation function.
+ */
+ public DistinctCountRawCPCSketchAggregationFunction(List<ExpressionContext>
arguments) {
+ super(arguments);
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTRAWCPCSKETCH;
+ }
+
+ // The raw variant reports a STRING final column rather than the parent's LONG.
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ // Wraps the merged sketch instead of extracting its estimate.
+ @Override
+ public SerializedCPCSketch extractFinalResult(CpcSketch sketch) {
+ return new SerializedCPCSketch(sketch);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
similarity index 53%
copy from
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
index 62a350a367..82e9a74161 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
@@ -18,26 +18,34 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
-import org.apache.datasketches.theta.Sketch;
-import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.CommonConstants;
-public class DistinctCountThetaSketchAggregator implements ValueAggregator {
+public class DistinctCountCPCSketchAggregator implements ValueAggregator {
- private final Union _union;
-
- public DistinctCountThetaSketchAggregator() {
- // TODO: Handle configurable nominal entries
- _union =
Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion();
+ public DistinctCountCPCSketchAggregator() {
}
@Override
public Object aggregate(Object value1, Object value2) {
- Sketch first = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[])
value1);
- Sketch second = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[])
value2);
- Sketch result = _union.union(first, second);
- return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(result);
+ CpcSketch first =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1);
+ CpcSketch second =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2);
+ CpcSketch result;
+ if (first == null && second == null) {
+ result = new CpcSketch(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+ } else if (second == null) {
+ result = first;
+ } else if (first == null) {
+ result = second;
+ } else {
+ CpcUnion union = new
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+ union.update(first);
+ union.update(second);
+ result = union.getResult();
+ }
+ return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(result);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
index 62a350a367..b11f7d7b00 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
@@ -35,9 +35,9 @@ public class DistinctCountThetaSketchAggregator implements
ValueAggregator {
@Override
public Object aggregate(Object value1, Object value2) {
- Sketch first = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[])
value1);
- Sketch second = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[])
value2);
+ Sketch first =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1);
+ Sketch second =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2);
Sketch result = _union.union(first, second);
- return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(result);
+ return ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(result);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index cd5388870d..26182d2c74 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -55,6 +55,9 @@ public class ValueAggregatorFactory {
case SUMVALUESINTEGERSUMTUPLESKETCH:
case AVGVALUEINTEGERSUMTUPLESKETCH:
return new IntegerTupleSketchAggregator(IntegerSummary.Mode.Sum);
+ case DISTINCTCOUNTCPCSKETCH:
+ case DISTINCTCOUNTRAWCPCSKETCH:
+ return new DistinctCountCPCSketchAggregator();
default:
throw new IllegalStateException("Unsupported aggregation type: " +
aggregationType);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index ccd9ae3f6f..8116a29373 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.datasketches.cpc.CpcSketch;
import
org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
import org.apache.pinot.segment.local.customobject.AvgPair;
@@ -374,4 +375,21 @@ public class ObjectSerDeUtilsTest {
assertEquals(actual, expected, ERROR_MESSAGE);
}
}
+
+ /**
+  * Round-trips randomly populated CPC sketches through ObjectSerDeUtils and checks that the estimate is unchanged.
+  */
+ @Test
+ public void testCpcSketch() {
+   for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+     CpcSketch expected = new CpcSketch();
+     int numValues = RANDOM.nextInt(100) + 1;
+     for (int added = 0; added < numValues; added++) {
+       expected.update(RANDOM.nextLong());
+     }
+
+     byte[] serialized = ObjectSerDeUtils.serialize(expected);
+     CpcSketch roundTripped =
+         ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.CompressedProbabilisticCounting);
+
+     assertEquals(roundTripped.getEstimate(), expected.getEstimate(), ERROR_MESSAGE);
+   }
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
index b62d363c4f..039ba01018 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
@@ -27,7 +27,7 @@ import org.testng.annotations.Test;
public class SketchFunctionsTest {
private double thetaEstimate(byte[] bytes) {
- return
ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize(bytes).getEstimate();
+ return
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(bytes).getEstimate();
}
byte[] _bytes = {1, 2, 3};
@@ -78,4 +78,20 @@ public class SketchFunctionsTest {
Assert.assertThrows(IllegalArgumentException.class,
() -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1, 1024));
}
+
+ /**
+ * Deserializes the given bytes with the CPC sketch ser/de and returns the resulting sketch's estimate.
+ */
+ private double cpcEstimate(byte[] bytes) {
+ return
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytes).getEstimate();
+ }
+
+ /**
+  * Checks toCpcSketch with and without an explicit lgK: every supported input yields an estimate of 1, null yields
+  * an estimate of 0, and an unsupported object is rejected with IllegalArgumentException.
+  */
+ @Test
+ public void testCpcCreation() {
+   for (Object input : _inputs) {
+     double defaultLgKEstimate = cpcEstimate(SketchFunctions.toCpcSketch(input));
+     Assert.assertEquals(defaultLgKEstimate, 1.0);
+     double explicitLgKEstimate = cpcEstimate(SketchFunctions.toCpcSketch(input, 11));
+     Assert.assertEquals(explicitLgKEstimate, 1.0);
+   }
+
+   double nullDefaultLgKEstimate = cpcEstimate(SketchFunctions.toCpcSketch(null));
+   Assert.assertEquals(nullDefaultLgKEstimate, 0.0);
+   double nullExplicitLgKEstimate = cpcEstimate(SketchFunctions.toCpcSketch(null, 11));
+   Assert.assertEquals(nullExplicitLgKEstimate, 0.0);
+
+   Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toCpcSketch(new Object()));
+   Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toCpcSketch(new Object(), 11));
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountCPCSketchStarTreeV2Test.java
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountCPCSketchStarTreeV2Test.java
new file mode 100644
index 0000000000..3732d3553b
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountCPCSketchStarTreeV2Test.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Collections;
+import java.util.Random;
+import org.apache.datasketches.cpc.CpcSketch;
+import
org.apache.pinot.segment.local.aggregator.DistinctCountCPCSketchValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Star-tree test for the CPC sketch value aggregator, driven by random INT raw values below 100.
+ */
+public class DistinctCountCPCSketchStarTreeV2Test extends BaseStarTreeV2Test<Object, CpcSketch> {
+
+  /** Aggregator under test, created without arguments so the default lgK applies. */
+  @Override
+  ValueAggregator<Object, CpcSketch> getValueAggregator() {
+    return new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+  }
+
+  @Override
+  DataType getRawValueType() {
+    return DataType.INT;
+  }
+
+  @Override
+  Object getRandomRawValue(Random random) {
+    return random.nextInt(100);
+  }
+
+  /** Compares the two sketches by their estimates truncated to long. */
+  @Override
+  void assertAggregatedValue(CpcSketch starTreeResult, CpcSketch nonStarTreeResult) {
+    long actual = (long) starTreeResult.getEstimate();
+    long expected = (long) nonStarTreeResult.getEstimate();
+    assertEquals(actual, expected);
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
index 35fc8127bc..4d04539b4a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
@@ -299,7 +299,7 @@ public class DistinctCountThetaSketchQueriesTest extends
BaseQueriesTest {
String query = "SELECT DISTINCT_COUNT_RAW_THETA_SKETCH(intSVColumn) FROM
testTable";
BrokerResponseNative brokerResponse = getBrokerResponse(query);
String serializedSketch = (String)
brokerResponse.getResultTable().getRows().get(0)[0];
- Sketch sketch =
ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize(Base64.getDecoder().decode(serializedSketch));
+ Sketch sketch =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(Base64.getDecoder().decode(serializedSketch));
assertEquals(Math.round(sketch.getEstimate()), NUM_RECORDS);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
new file mode 100644
index 0000000000..94008c404e
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test for the CPC sketch query functions. The test table has an INT dimension {@code id} and a BYTES
+ * metric {@code metCpcSketchBytes} holding serialized {@link CpcSketch} values (see {@code createSchema} and
+ * {@code createAvroFile}).
+ */
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class CpcSketchTest extends CustomDataQueryClusterIntegrationTest {
+
+ private static final String DEFAULT_TABLE_NAME = "CpcSketchTest";
+ private static final String ID = "id";
+ private static final String MET_CPC_SKETCH_BYTES = "metCpcSketchBytes";
+
+ // Also used by createAvroFile() as the number of records written.
+ @Override
+ protected long getCountStarResult() {
+ return 1000;
+ }
+
+ /**
+ * Runs DISTINCT_COUNT_CPC_SKETCH and DISTINCT_COUNT_RAW_CPC_SKETCH over the whole table and checks that the
+ * count is positive and equals the rounded estimate of the base64-decoded raw sketch.
+ */
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("SELECT DISTINCT_COUNT_CPC_SKETCH(%s),
DISTINCT_COUNT_RAW_CPC_SKETCH(%s) FROM %s",
+ MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES, getTableName());
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+ byte[] rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+ CpcSketch deserializedSketch =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+
+ assertTrue(distinctCount > 0);
+ assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+ }
+
+ /**
+ * Checks that GET_CPC_SKETCH_ESTIMATE over the raw sketch agrees with DISTINCT_COUNT_CPC_SKETCH, first for each
+ * single id in 0..9 (WHERE and FILTER forms), then for every pair of ids (WHERE with OR, FILTER with OR, and
+ * CPC_SKETCH_UNION of two filtered raw sketches).
+ */
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testCpcUnionQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ for (int i = 0; i < 10; i++) {
+ String query = "SELECT " +
"DISTINCT_COUNT_CPC_SKETCH(metCpcSketchBytes), "
+ +
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes)) " +
"FROM " + getTableName()
+ + " WHERE id=" + i;
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asLong(),
distinctCount);
+ query = "SELECT " +
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) "
+ + "FILTER (WHERE id = " + i + ")) " + "FROM " + getTableName();
+ jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
distinctCount);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ // Query Type 1:
+ String query = "SELECT DISTINCT_COUNT_CPC_SKETCH(metCpcSketchBytes), "
+ +
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes)) " +
"FROM " + getTableName()
+ + " WHERE id=" + i + " OR id=" + j;
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asLong(),
distinctCount);
+
+ // Query Type 2:
+ query = "SELECT " +
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) "
+ + "FILTER (WHERE id = " + i + " OR id = " + j + ")) " + "FROM " +
getTableName();
+ jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
distinctCount);
+
+ // Query Type 3:
+ query = "SELECT " + "GET_CPC_SKETCH_ESTIMATE(CPC_SKETCH_UNION( "
+ + "DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) FILTER (WHERE
id = " + i + "),"
+ + "DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) FILTER (WHERE
id = " + j + ")))" + " FROM "
+ + getTableName();
+
+ jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(),
distinctCount);
+ }
+ }
+ }
+
+ /**
+ * Aggregates the sketch column over a UNION ALL of four sub-selects (ids 4, 5, 6 and 7) and checks that the
+ * count is positive and equals the rounded estimate of the returned raw sketch.
+ */
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testUnionWithSketchQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+ String query = String.format(
+ "SELECT " + "DISTINCT_COUNT_CPC_SKETCH(%s), " +
"DISTINCT_COUNT_RAW_CPC_SKETCH(%s) " + "FROM " + "("
+ + "SELECT %s FROM %s WHERE %s = 4 " + "UNION ALL " + "SELECT %s
FROM %s WHERE %s = 5 " + "UNION ALL "
+ + "SELECT %s FROM %s WHERE %s = 6 " + "UNION ALL " + "SELECT %s
FROM %s WHERE %s = 7 " + ")",
+ MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES,
getTableName(), ID, MET_CPC_SKETCH_BYTES,
+ getTableName(), ID, MET_CPC_SKETCH_BYTES, getTableName(), ID,
MET_CPC_SKETCH_BYTES, getTableName(), ID);
+ JsonNode jsonNode = postQuery(query);
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+ byte[] rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+ CpcSketch deserializedSketch =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+
+ assertTrue(distinctCount > 0);
+ assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+ }
+
+ /**
+ * Self-joins the table on id (side a: id below 8, side b: id above 3) and, for each side, checks that the count
+ * is positive and equals the rounded estimate of that side's raw sketch.
+ */
+ @Test(dataProvider = "useV2QueryEngine")
+ public void testJoinWithSketchQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format(
+ "SELECT " + "DISTINCT_COUNT_CPC_SKETCH(a.%s), " +
"DISTINCT_COUNT_RAW_CPC_SKETCH(a.%s), "
+ + "DISTINCT_COUNT_CPC_SKETCH(b.%s), " +
"DISTINCT_COUNT_RAW_CPC_SKETCH(b.%s) " + "FROM "
+ + "(SELECT * FROM %s WHERE %s < 8 ) a " + "JOIN " + "(SELECT *
FROM %s WHERE %s > 3 ) b "
+ + "ON a.%s = b.%s", MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES,
MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES,
+ getTableName(), ID, getTableName(), ID, ID, ID);
+ JsonNode jsonNode = postQuery(query);
+ // Columns 0 and 1: count and raw sketch for side a.
+ long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+ byte[] rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+ CpcSketch deserializedSketch =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+ assertTrue(distinctCount > 0);
+ assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+
+ // Columns 2 and 3: count and raw sketch for side b.
+ distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(2).asLong();
+ rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(3).asText());
+ deserializedSketch =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+ assertTrue(distinctCount > 0);
+ assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+ }
+
+ @Override
+ public String getTableName() {
+ return DEFAULT_TABLE_NAME;
+ }
+
+ /**
+ * Builds the Pinot schema: a single-value INT dimension {@code id} and a BYTES metric for the serialized sketch.
+ */
+ @Override
+ public Schema createSchema() {
+ return new
Schema.SchemaBuilder().setSchemaName(getTableName()).addSingleValueDimension(ID,
FieldSpec.DataType.INT)
+ .addMetric(MET_CPC_SKETCH_BYTES, FieldSpec.DataType.BYTES).build();
+ }
+
+ /**
+ * Writes getCountStarResult() Avro records to data.avro in the temp dir, each with an id from
+ * RandomUtils.nextInt(10) and one serialized single-value CPC sketch.
+ */
+ @Override
+ public File createAvroFile()
+ throws Exception {
+ // create avro schema
+ org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+ avroSchema.setFields(ImmutableList.of(
+ new org.apache.avro.Schema.Field(ID,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null,
+ null), new org.apache.avro.Schema.Field(MET_CPC_SKETCH_BYTES,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES),
null, null)));
+
+ // create avro file
+ File avroFile = new File(_tempDir, "data.avro");
+ try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
+ for (int i = 0; i < getCountStarResult(); i++) {
+ // create avro record
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(ID, RandomUtils.nextInt(10));
+ record.put(MET_CPC_SKETCH_BYTES, ByteBuffer.wrap(getRandomRawValue()));
+ // add avro record to file
+ fileWriter.append(record);
+ }
+ }
+ return avroFile;
+ }
+
+ /**
+ * Serializes a CPC sketch created with constructor argument 4 and updated with one value from
+ * RANDOM.nextInt(100).
+ */
+ private byte[] getRandomRawValue() {
+ CpcSketch sketch = new CpcSketch(4);
+ sketch.update(RANDOM.nextInt(100));
+ return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(sketch);
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
new file mode 100644
index 0000000000..7ac3090188
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import java.util.List;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Value aggregator that folds raw values and serialized CPC sketches into a single {@link CpcSketch}.
+ *
+ * <p>Accepted raw values are: a serialized sketch ({@code byte[]}), several serialized sketches ({@code byte[][]}),
+ * a String or boxed Integer/Long/Double/Float, or an array of those boxed/String types. Aggregated values are
+ * stored as BYTES. The aggregator also tracks the largest serialized size it has produced so far.
+ */
+public class DistinctCountCPCSketchValueAggregator implements ValueAggregator<Object, CpcSketch> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  private final int _lgK;
+
+  // Largest serialized sketch size seen so far; stays 0 until a value has been aggregated.
+  private int _maxByteSize;
+
+  /**
+   * @param arguments aggregation arguments; when a second argument is present its integer literal is used as lgK,
+   *     otherwise the Helix default lgK applies
+   */
+  public DistinctCountCPCSketchValueAggregator(List<ExpressionContext> arguments) {
+    // Zero or one argument means no lgK was supplied, so fall back to the Helix default
+    if (arguments.size() <= 1) {
+      _lgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
+    } else {
+      _lgK = arguments.get(1).getLiteral().getIntValue();
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTCPCSKETCH;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  /**
+   * Builds the first aggregated sketch from a raw value: a serialized sketch is deserialized, multiple serialized
+   * sketches are unioned, and any other supported value is added to a fresh empty sketch.
+   */
+  @Override
+  public CpcSketch getInitialAggregatedValue(Object rawValue) {
+    CpcSketch initialValue;
+    if (rawValue instanceof byte[]) { // Serialized Sketch
+      byte[] bytes = (byte[]) rawValue;
+      initialValue = deserializeAggregatedValue(bytes);
+      // The input bytes are already the serialized form, so their length is used directly
+      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+    } else if (rawValue instanceof byte[][]) { // Multiple Serialized Sketches
+      initialValue = unionSerializedSketches(null, (byte[][]) rawValue);
+      updateMaxByteSize(initialValue);
+    } else {
+      initialValue = empty();
+      addObjectToSketch(rawValue, initialValue);
+      updateMaxByteSize(initialValue);
+    }
+    return initialValue;
+  }
+
+  /**
+   * Merges a raw value into an existing aggregated sketch. Serialized sketches ({@code byte[]} and
+   * {@code byte[][]}) are unioned with {@code value} into a new sketch; any other supported value is added to
+   * {@code value} in place.
+   */
+  @Override
+  public CpcSketch applyRawValue(CpcSketch value, Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      byte[] bytes = (byte[]) rawValue;
+      CpcSketch sketch = union(value, deserializeAggregatedValue(bytes));
+      updateMaxByteSize(sketch);
+      return sketch;
+    } else if (rawValue instanceof byte[][]) {
+      // Mirror getInitialAggregatedValue: without this branch a byte[][] would reach addObjectsToSketch and be
+      // rejected as an unsupported array type.
+      CpcSketch sketch = unionSerializedSketches(value, (byte[][]) rawValue);
+      updateMaxByteSize(sketch);
+      return sketch;
+    } else {
+      addObjectToSketch(rawValue, value);
+      updateMaxByteSize(value);
+      return value;
+    }
+  }
+
+  @Override
+  public CpcSketch applyAggregatedValue(CpcSketch value, CpcSketch aggregatedValue) {
+    CpcSketch result = union(value, aggregatedValue);
+    updateMaxByteSize(result);
+    return result;
+  }
+
+  /** Copies a sketch by serializing and deserializing it. */
+  @Override
+  public CpcSketch cloneAggregatedValue(CpcSketch value) {
+    return deserializeAggregatedValue(serializeAggregatedValue(value));
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    // NOTE: For aggregated metrics, initial aggregated value might have not been generated. Returns the byte size
+    //       based on lgK.
+    return _maxByteSize > 0 ? _maxByteSize : CpcSketch.getMaxSerializedBytes(_lgK);
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(CpcSketch value) {
+    return CustomSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(value);
+  }
+
+  @Override
+  public CpcSketch deserializeAggregatedValue(byte[] bytes) {
+    return CustomSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytes);
+  }
+
+  /**
+   * Unions two sketches, tolerating nulls: two nulls give an empty sketch, a single null gives the other operand
+   * unchanged.
+   */
+  private CpcSketch union(CpcSketch left, CpcSketch right) {
+    if (left == null && right == null) {
+      return empty();
+    } else if (left == null) {
+      return right;
+    } else if (right == null) {
+      return left;
+    }
+
+    CpcUnion union = new CpcUnion(_lgK);
+    union.update(left);
+    union.update(right);
+    return union.getResult();
+  }
+
+  /**
+   * Unions an optional base sketch with every serialized sketch in the array and returns the result.
+   *
+   * @param base sketch to start from, or null to start from nothing
+   * @param serializedSketches serialized sketches to merge in
+   */
+  private CpcSketch unionSerializedSketches(CpcSketch base, byte[][] serializedSketches) {
+    CpcUnion union = new CpcUnion(_lgK);
+    if (base != null) {
+      union.update(base);
+    }
+    for (byte[] bytes : serializedSketches) {
+      union.update(deserializeAggregatedValue(bytes));
+    }
+    return union.getResult();
+  }
+
+  /**
+   * Adds a single raw value to the sketch; an Object[] is delegated to {@link #addObjectsToSketch}.
+   *
+   * @throws IllegalStateException if the value type is not supported
+   */
+  private void addObjectToSketch(Object rawValue, CpcSketch sketch) {
+    if (rawValue instanceof String) {
+      sketch.update((String) rawValue);
+    } else if (rawValue instanceof Integer) {
+      sketch.update((Integer) rawValue);
+    } else if (rawValue instanceof Long) {
+      sketch.update((Long) rawValue);
+    } else if (rawValue instanceof Double) {
+      sketch.update((Double) rawValue);
+    } else if (rawValue instanceof Float) {
+      sketch.update((Float) rawValue);
+    } else if (rawValue instanceof Object[]) {
+      addObjectsToSketch((Object[]) rawValue, sketch);
+    } else {
+      throw new IllegalStateException(
+          "Unsupported data type for CPC Sketch aggregation: " + rawValue.getClass().getSimpleName());
+    }
+  }
+
+  /**
+   * Adds every element of a multi-value array (String[], Integer[], Long[], Double[] or Float[]) to the sketch.
+   *
+   * @throws IllegalStateException if the array type is not supported
+   */
+  private void addObjectsToSketch(Object[] rawValues, CpcSketch sketch) {
+    if (rawValues instanceof String[]) {
+      for (String s : (String[]) rawValues) {
+        sketch.update(s);
+      }
+    } else if (rawValues instanceof Integer[]) {
+      for (Integer i : (Integer[]) rawValues) {
+        sketch.update(i);
+      }
+    } else if (rawValues instanceof Long[]) {
+      for (Long l : (Long[]) rawValues) {
+        sketch.update(l);
+      }
+    } else if (rawValues instanceof Double[]) {
+      for (Double d : (Double[]) rawValues) {
+        sketch.update(d);
+      }
+    } else if (rawValues instanceof Float[]) {
+      for (Float f : (Float[]) rawValues) {
+        sketch.update(f);
+      }
+    } else {
+      throw new IllegalStateException(
+          "Unsupported data type for CPC Sketch aggregation: " + rawValues.getClass().getSimpleName());
+    }
+  }
+
+  /** Raises the tracked maximum to the sketch's current serialized size; a null sketch is ignored. */
+  private void updateMaxByteSize(CpcSketch sketch) {
+    if (sketch != null) {
+      _maxByteSize = Math.max(_maxByteSize, sketch.toByteArray().length);
+    }
+  }
+
+  /** Creates an empty sketch with this aggregator's lgK. */
+  private CpcSketch empty() {
+    return new CpcSketch(_lgK);
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
index 9b2a428c3b..f36f9a00e9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
@@ -168,11 +168,11 @@ public class DistinctCountThetaSketchValueAggregator
implements ValueAggregator<
@Override
public byte[] serializeAggregatedValue(Sketch value) {
- return CustomSerDeUtils.DATA_SKETCH_SER_DE.serialize(value);
+ return CustomSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(value);
}
@Override
public Sketch deserializeAggregatedValue(byte[] bytes) {
- return CustomSerDeUtils.DATA_SKETCH_SER_DE.deserialize(bytes);
+ return CustomSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(bytes);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 16dc0328f7..18c5ac96f6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -78,6 +78,9 @@ public class ValueAggregatorFactory {
case AVGVALUEINTEGERSUMTUPLESKETCH:
case SUMVALUESINTEGERSUMTUPLESKETCH:
return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+ case DISTINCTCOUNTCPCSKETCH:
+ case DISTINCTCOUNTRAWCPCSKETCH:
+ return new DistinctCountCPCSketchValueAggregator(arguments);
default:
throw new IllegalStateException("Unsupported aggregation type: " +
aggregationType);
}
@@ -127,6 +130,9 @@ public class ValueAggregatorFactory {
case AVGVALUEINTEGERSUMTUPLESKETCH:
case SUMVALUESINTEGERSUMTUPLESKETCH:
return IntegerTupleSketchValueAggregator.AGGREGATED_VALUE_TYPE;
+ case DISTINCTCOUNTCPCSKETCH:
+ case DISTINCTCOUNTRAWCPCSKETCH:
+ return DistinctCountCPCSketchValueAggregator.AGGREGATED_VALUE_TYPE;
default:
throw new IllegalStateException("Unsupported aggregation type: " +
aggregationType);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedCPCSketch.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedCPCSketch.java
new file mode 100644
index 0000000000..52dbf4f414
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedCPCSketch.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import org.apache.datasketches.cpc.CpcSketch;
+
+
+/**
+ * Serialized and comparable version of CPC Sketch.
+ * Ordering is defined by the cardinality estimate and not the size
+ * of the underlying sketch.
+ */
+public class SerializedCPCSketch implements Comparable<SerializedCPCSketch> {
+ private final CpcSketch _sketch;
+
+ public SerializedCPCSketch(CpcSketch sketch) {
+ _sketch = sketch;
+ }
+
+ @Override
+ public int compareTo(SerializedCPCSketch other) {
+ return Double.compare(_sketch.getEstimate(), other._sketch.getEstimate());
+ }
+
+ @Override
+ public String toString() {
+ return Base64.getEncoder().encodeToString(_sketch.toByteArray());
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index f5a6275a3b..b1515c5b41 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -25,6 +25,7 @@ import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
@@ -237,7 +238,7 @@ public class CustomSerDeUtils {
}
};
- public static final ObjectSerDe<Sketch> DATA_SKETCH_SER_DE = new
ObjectSerDe<Sketch>() {
+ public static final ObjectSerDe<Sketch> DATA_SKETCH_THETA_SER_DE = new
ObjectSerDe<Sketch>() {
@Override
public byte[] serialize(Sketch value) {
@@ -281,6 +282,26 @@ public class CustomSerDeUtils {
}
};
+ public static final ObjectSerDe<CpcSketch> DATA_SKETCH_CPC_SER_DE = new
ObjectSerDe<CpcSketch>() {
+
+ @Override
+ public byte[] serialize(CpcSketch value) {
+ return value.toByteArray();
+ }
+
+ @Override
+ public CpcSketch deserialize(byte[] bytes) {
+ return CpcSketch.heapify(Memory.wrap(bytes));
+ }
+
+ @Override
+ public CpcSketch deserialize(ByteBuffer byteBuffer) {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return CpcSketch.heapify(Memory.wrap(bytes));
+ }
+ };
+
public static final ObjectSerDe<RoaringBitmap> ROARING_BITMAP_SER_DE = new
ObjectSerDe<RoaringBitmap>() {
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 71e47c7c62..5065d1b186 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -213,8 +213,7 @@ public final class TableConfigUtils {
String rawTableName =
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
String schemaName = tableConfig.getValidationConfig().getSchemaName();
if (schemaName != null && !schemaName.equals(rawTableName)) {
- throw new IllegalStateException(
- "Schema name: " + schemaName + " does not match table name: " +
rawTableName);
+ throw new IllegalStateException("Schema name: " + schemaName + " does
not match table name: " + rawTableName);
}
}
@@ -576,7 +575,8 @@ public final class TableConfigUtils {
public final static EnumSet<AggregationFunctionType>
AVAILABLE_CORE_VALUE_AGGREGATORS =
EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL,
DISTINCTCOUNTTHETASKETCH,
DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH,
DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
- SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH,
DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS);
+ SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH,
DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS,
+ DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH);
@VisibleForTesting
static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
@@ -626,8 +626,8 @@ public final class TableConfigUtils {
throw new IllegalArgumentException("ValueAggregator not
enabled for type: " + aft.toString());
}
} catch (IllegalArgumentException e) {
- String err = String.format(
- "Column \"%s\" has invalid aggregate type: %s",
entry.getKey(), entry.getValue());
+ String err =
+ String.format("Column \"%s\" has invalid aggregate type:
%s", entry.getKey(), entry.getValue());
throw new IllegalStateException(err);
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
new file mode 100644
index 0000000000..b8dcb701f5
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+/**
+ * Unit tests for {@link DistinctCountCPCSketchValueAggregator}: creation of the initial aggregated value
+ * from raw values and serialized sketches, merging of raw and aggregated values, and tracking of the
+ * maximum serialized size.
+ */
+public class DistinctCountCPCSketchValueAggregatorTest {
+
+  /** A single raw string yields a sketch with an estimate of one. */
+  @Test
+  public void initialShouldCreateSingleItemSketch() {
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    assertEquals(agg.getInitialAggregatedValue("hello world").getEstimate(), 1.0);
+  }
+
+  /** A serialized sketch passed as the raw value is parsed back into an equivalent sketch. */
+  @Test
+  public void initialShouldParseASketch() {
+    CpcSketch input = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input::update);
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    byte[] bytes = agg.serializeAggregatedValue(input);
+    assertEquals(agg.getInitialAggregatedValue(bytes).getEstimate(), input.getEstimate());
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), input.toByteArray().length);
+  }
+
+  /** Several serialized sketches passed as one multi-value entry are merged together. */
+  @Test
+  public void initialShouldParseMultiValueSketches() {
+    CpcSketch input1 = new CpcSketch();
+    input1.update("hello");
+    CpcSketch input2 = new CpcSketch();
+    input2.update("world");
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    byte[][] bytes = {agg.serializeAggregatedValue(input1), agg.serializeAggregatedValue(input2)};
+    assertEquals(Math.round(agg.getInitialAggregatedValue(bytes).getEstimate()), 2);
+  }
+
+  /** Applying an aggregated value matches a direct {@link CpcUnion} of the two sketches. */
+  @Test
+  public void applyAggregatedValueShouldUnion() {
+    CpcSketch input1 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input1::update);
+    CpcSketch input2 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input2::update);
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    CpcSketch result = agg.applyAggregatedValue(input1, input2);
+
+    CpcUnion union = new CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+    union.update(input1);
+    union.update(input2);
+    CpcSketch merged = union.getResult();
+
+    assertEquals(result.getEstimate(), merged.getEstimate());
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), merged.toByteArray().length);
+  }
+
+  /** Applying a serialized sketch as a raw value matches a direct {@link CpcUnion} of the two sketches. */
+  @Test
+  public void applyRawValueShouldUnion() {
+    CpcSketch input1 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input1::update);
+    CpcSketch input2 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input2::update);
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    byte[] result2bytes = agg.serializeAggregatedValue(input2);
+    CpcSketch result = agg.applyRawValue(input1, result2bytes);
+
+    CpcUnion union = new CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+    union.update(input1);
+    union.update(input2);
+    CpcSketch merged = union.getResult();
+
+    assertEquals(result.getEstimate(), merged.getEstimate());
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), merged.toByteArray().length);
+  }
+
+  /** Applying a plain raw value adds it to the existing sketch. */
+  @Test
+  public void applyRawValueShouldAdd() {
+    CpcSketch input1 = new CpcSketch();
+    // Seed with the same string the reference sketch below uses, so both sketches hold identical items
+    input1.update("hello");
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    CpcSketch result = agg.applyRawValue(input1, "world");
+    assertEquals(Math.round(result.getEstimate()), 2);
+
+    CpcSketch pristine = new CpcSketch();
+    pristine.update("hello");
+    pristine.update("world");
+    assertEquals(agg.getMaxAggregatedValueByteSize(), pristine.toByteArray().length);
+  }
+
+  /** Applying a multi-value raw entry adds each of its elements to the existing sketch. */
+  @Test
+  public void applyRawValueShouldSupportMultiValue() {
+    CpcSketch input1 = new CpcSketch();
+    input1.update("hello");
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    String[] strings = {"hello", "world", "this", "is", "some", "strings"};
+    CpcSketch result = agg.applyRawValue(input1, strings);
+
+    assertEquals(Math.round(result.getEstimate()), 6);
+
+    CpcSketch pristine = new CpcSketch();
+    for (String value : strings) {
+      pristine.update(value);
+    }
+    assertEquals(agg.getMaxAggregatedValueByteSize(), pristine.toByteArray().length);
+  }
+
+  /** Single raw values of the numeric types are accepted; an arbitrary object is rejected. */
+  @Test
+  public void getInitialValueShouldSupportDifferentTypes() {
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    assertEquals(agg.getInitialAggregatedValue(12345).getEstimate(), 1.0);
+    assertEquals(agg.getInitialAggregatedValue(12345L).getEstimate(), 1.0);
+    assertEquals(agg.getInitialAggregatedValue(12.345f).getEstimate(), 1.0);
+    assertEquals(agg.getInitialAggregatedValue(12.345d).getEstimate(), 1.0);
+    assertThrows(() -> agg.getInitialAggregatedValue(new Object()));
+  }
+
+  /** Multi-value entries of the numeric types are accepted; an array of arbitrary objects is rejected. */
+  @Test
+  public void getInitialValueShouldSupportMultiValueTypes() {
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    Integer[] ints = {12345};
+    assertEquals(agg.getInitialAggregatedValue(ints).getEstimate(), 1.0);
+    Long[] longs = {12345L};
+    assertEquals(agg.getInitialAggregatedValue(longs).getEstimate(), 1.0);
+    Float[] floats = {12.345f};
+    assertEquals(agg.getInitialAggregatedValue(floats).getEstimate(), 1.0);
+    Double[] doubles = {12.345d};
+    assertEquals(agg.getInitialAggregatedValue(doubles).getEstimate(), 1.0);
+    Object[] objects = {new Object()};
+    assertThrows(() -> agg.getInitialAggregatedValue(objects));
+    // An empty array of serialized sketches produces an empty sketch
+    byte[][] zeroSketches = {};
+    assertEquals(agg.getInitialAggregatedValue(zeroSketches).getEstimate(), 0.0);
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index b9aaf8ee86..5a7c127e54 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -69,17 +69,14 @@ public enum AggregationFunctionType {
FIRSTWITHTIME("firstWithTime", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.or(
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER)),
- OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))
- ),
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))),
ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
- LASTWITHTIME("lastWithTime", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
- OperandTypes.or(
- OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER)),
- OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))
- ),
+ LASTWITHTIME("lastWithTime", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.or(
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER)),
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))),
+ ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ MINMAXRANGE("minMaxRange", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.NUMERIC, OperandTypes.NUMERIC,
ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
- MINMAXRANGE("minMaxRange", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.NUMERIC,
- OperandTypes.NUMERIC, ReturnTypes.ARG0,
ReturnTypes.explicit(SqlTypeName.OTHER)),
/**
* for all distinct count family functions:
* (1) distinct_count only supports single argument;
@@ -107,20 +104,19 @@ public enum AggregationFunctionType {
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
// DEPRECATED in v2
- @Deprecated
- FASTHLL("fastHLL"),
- DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", null,
- SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ @Deprecated FASTHLL("fastHLL"),
+ DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", null,
SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
- DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch", null,
- SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch", null,
SqlKind.OTHER_FUNCTION,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
- DISTINCTSUM("distinctSum", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.NUMERIC,
- OperandTypes.NUMERIC, ReturnTypes.AGG_SUM,
ReturnTypes.explicit(SqlTypeName.OTHER)),
- DISTINCTAVG("distinctAvg", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.NUMERIC,
- OperandTypes.NUMERIC, ReturnTypes.explicit(SqlTypeName.DOUBLE),
ReturnTypes.explicit(SqlTypeName.OTHER)),
+ DISTINCTSUM("distinctSum", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.NUMERIC, OperandTypes.NUMERIC,
+ ReturnTypes.AGG_SUM, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ DISTINCTAVG("distinctAvg", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.NUMERIC, OperandTypes.NUMERIC,
+ ReturnTypes.explicit(SqlTypeName.DOUBLE),
ReturnTypes.explicit(SqlTypeName.OTHER)),
PERCENTILE("percentile", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC,
SqlTypeFamily.NUMERIC)), ReturnTypes.ARG0,
@@ -138,8 +134,7 @@ public enum AggregationFunctionType {
OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC,
SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000,
ReturnTypes.explicit(SqlTypeName.OTHER)),
// DEPRECATED in v2
- @Deprecated
- PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
+ @Deprecated PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
PERCENTILEKLL("percentileKLL", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC,
SqlTypeFamily.NUMERIC)), ReturnTypes.ARG0,
ReturnTypes.explicit(SqlTypeName.OTHER)),
@@ -157,8 +152,7 @@ public enum AggregationFunctionType {
ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
// DEPRECATED in v2
- @Deprecated
- IDSET("idSet"),
+ @Deprecated IDSET("idSet"),
// TODO: support histogram requires solving ARRAY constructor and
multi-function signature without optional ordinal
HISTOGRAM("histogram"),
@@ -179,10 +173,10 @@ public enum AggregationFunctionType {
OperandTypes.NUMERIC, ReturnTypes.DOUBLE,
ReturnTypes.explicit(SqlTypeName.OTHER)),
STDDEVSAMP("stdDevSamp", Collections.emptyList(), SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.NUMERIC, ReturnTypes.DOUBLE,
ReturnTypes.explicit(SqlTypeName.OTHER)),
- SKEWNESS("skewness", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
- OperandTypes.NUMERIC, ReturnTypes.DOUBLE,
ReturnTypes.explicit(SqlTypeName.OTHER)),
- KURTOSIS("kurtosis", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
- OperandTypes.NUMERIC, ReturnTypes.DOUBLE,
ReturnTypes.explicit(SqlTypeName.OTHER)),
+ SKEWNESS("skewness", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.NUMERIC,
+ ReturnTypes.DOUBLE, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ KURTOSIS("kurtosis", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.NUMERIC,
+ ReturnTypes.DOUBLE, ReturnTypes.explicit(SqlTypeName.OTHER)),
FOURTHMOMENT("fourthMoment"),
// DataSketches Tuple Sketch support
@@ -204,9 +198,20 @@ public enum AggregationFunctionType {
SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.BINARY, ReturnTypes.BIGINT,
ReturnTypes.explicit(SqlTypeName.OTHER)),
+ // Datasketches Frequent Items support
FREQUENTSTRINGSSKETCH("frequentStringsSketch"),
FREQUENTLONGSSKETCH("frequentLongsSketch"),
+ // Datasketches CPC Sketch support
+ DISTINCTCOUNTCPCSKETCH("distinctCountCPCSketch",
ImmutableList.of("DISTINCT_COUNT_CPC_SKETCH"),
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
+ ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ DISTINCTCOUNTRAWCPCSKETCH("distinctCountRawCPCSketch",
ImmutableList.of("DISTINCT_COUNT_RAW_CPC_SKETCH"),
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY,
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
+ ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
+
// Geo aggregation functions
STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY),
ReturnTypes.explicit(SqlTypeName.OTHER)),
@@ -271,35 +276,35 @@ public enum AggregationFunctionType {
ReturnTypes.explicit(SqlTypeName.OTHER)),
// boolean aggregate functions
- BOOLAND("boolAnd", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
- OperandTypes.BOOLEAN, ReturnTypes.BOOLEAN,
ReturnTypes.explicit(SqlTypeName.INTEGER)),
- BOOLOR("boolOr", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
- OperandTypes.BOOLEAN, ReturnTypes.BOOLEAN,
ReturnTypes.explicit(SqlTypeName.INTEGER)),
+ BOOLAND("boolAnd", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BOOLEAN,
+ ReturnTypes.BOOLEAN, ReturnTypes.explicit(SqlTypeName.INTEGER)),
+ BOOLOR("boolOr", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BOOLEAN,
+ ReturnTypes.BOOLEAN, ReturnTypes.explicit(SqlTypeName.INTEGER)),
// ExprMin and ExprMax
// TODO: revisit support for ExprMin/Max count in V2, particularly plug
query rewriter in the right place
EXPRMIN("exprMin", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
- OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal ->
ordinal > 1),
- ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal ->
ordinal > 1), ReturnTypes.ARG0,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
EXPRMAX("exprMax", null, SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
- OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal ->
ordinal > 1),
- ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
+ OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal ->
ordinal > 1), ReturnTypes.ARG0,
+ ReturnTypes.explicit(SqlTypeName.OTHER)),
-
PARENTEXPRMIN(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX
+ EXPRMIN.getName(),
- null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+
PARENTEXPRMIN(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX
+ EXPRMIN.getName(), null,
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER,
SqlTypeFamily.ANY), ordinal -> ordinal > 2),
ReturnTypes.explicit(SqlTypeName.OTHER),
ReturnTypes.explicit(SqlTypeName.OTHER)),
-
PARENTEXPRMAX(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX
+ EXPRMAX.getName(),
- null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+
PARENTEXPRMAX(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX
+ EXPRMAX.getName(), null,
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER,
SqlTypeFamily.ANY), ordinal -> ordinal > 2),
ReturnTypes.explicit(SqlTypeName.OTHER),
ReturnTypes.explicit(SqlTypeName.OTHER)),
- CHILDEXPRMIN(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ EXPRMIN.getName(),
- null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ CHILDEXPRMIN(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ EXPRMIN.getName(), null,
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER,
SqlTypeFamily.ANY), ordinal -> ordinal > 3),
ReturnTypes.ARG1, ReturnTypes.explicit(SqlTypeName.OTHER)),
- CHILDEXPRMAX(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ EXPRMAX.getName(),
- null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ CHILDEXPRMAX(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ EXPRMAX.getName(), null,
+ SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER,
SqlTypeFamily.ANY), ordinal -> ordinal > 3),
ReturnTypes.ARG1, ReturnTypes.explicit(SqlTypeName.OTHER)),
@@ -307,8 +312,9 @@ public enum AggregationFunctionType {
// TODO: revisit support for funnel count in V2
FUNNELCOUNT("funnelCount");
- private static final Set<String> NAMES =
Arrays.stream(values()).flatMap(func -> Stream.of(func.name(),
- func.getName(),
func.getName().toLowerCase())).collect(Collectors.toSet());
+ private static final Set<String> NAMES =
+ Arrays.stream(values()).flatMap(func -> Stream.of(func.name(),
func.getName(), func.getName().toLowerCase()))
+ .collect(Collectors.toSet());
// --------------------------------------------------------------------------
// Function signature used by Calcite.
@@ -373,8 +379,7 @@ public enum AggregationFunctionType {
_returnTypeInference = finalReturnType;
_operandTypeChecker = operandTypeChecker;
- _intermediateReturnTypeInference = intermediateReturnType == null ?
_returnTypeInference
- : intermediateReturnType;
+ _intermediateReturnTypeInference = intermediateReturnType == null ?
_returnTypeInference : intermediateReturnType;
}
public String getName() {
@@ -427,9 +432,7 @@ public enum AggregationFunctionType {
public static String getUnderscoreSplitAggregationFunctionName(String
functionName) {
// Skip functions that have numbers for now and return their name as is
- return functionName.matches(".*\\d.*")
- ? functionName
- : functionName.replaceAll("(.)(\\p{Upper}+|\\d+)", "$1_$2");
+ return functionName.matches(".*\\d.*") ? functionName :
functionName.replaceAll("(.)(\\p{Upper}+|\\d+)", "$1_$2");
}
/**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 303ad6c716..b5064fad3c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -104,6 +104,8 @@ public class CommonConstants {
public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
+ public static final int DEFAULT_CPC_SKETCH_LGK = 12;
+
// Whether to rewrite DistinctCount to DistinctCountBitmap
public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY =
"enable.distinct.count.bitmap.override";
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
index 361eb0b61c..ae42127d7a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
@@ -40,8 +40,8 @@ import picocli.CommandLine;
@CommandLine.Command
public class SegmentDumpTool extends AbstractBaseCommand implements Command {
- @CommandLine.Option(names = {"-path"}, required = true,
- description = "Path of the folder containing the segment" + " file")
+ @CommandLine.Option(names = {"-path"}, required = true, description = "Path
of the folder containing the segment"
+ + " file")
private String _segmentDir = null;
@CommandLine.Option(names = {"-columns"}, arity = "1..*", description =
"Columns to dump")
@@ -50,8 +50,8 @@ public class SegmentDumpTool extends AbstractBaseCommand
implements Command {
@CommandLine.Option(names = {"-dumpStarTree"})
private boolean _dumpStarTree = false;
- @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, usageHelp = true,
- description = "Print this message.")
+ @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, usageHelp = true, description =
+ "Print this message.")
private boolean _help = false;
private void dump()
@@ -94,6 +94,15 @@ public class SegmentDumpTool extends AbstractBaseCommand
implements Command {
}
}
+ // Prints one row value to stdout; a byte[] is summarized by its length, anything else is printed as-is
+ private void printRowValue(Object value) {
+   if (!(value instanceof byte[])) {
+     System.out.print(value);
+     return;
+   }
+   byte[] bytes = (byte[]) value;
+   System.out.printf("%s bytes", bytes.length);
+ }
+
private void dumpRows(PinotSegmentRecordReader reader, GenericRow reuse,
Set<String> mvColumns) {
int docId = 0;
@@ -103,7 +112,7 @@ public class SegmentDumpTool extends AbstractBaseCommand
implements Command {
for (String columnName : _columnNames) {
if (!mvColumns.contains(columnName)) {
- System.out.print(row.getValue(columnName));
+ printRowValue(row.getValue(columnName));
System.out.print("\t");
} else {
Object[] values = (Object[]) row.getValue(columnName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]