This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf017f1c63 Add FrequentStringsSketch and FrequentLongsSketch
aggregation functions (#11098)
cf017f1c63 is described below
commit cf017f1c63de8703a71dcaaab4506ffee0672950
Author: Caner Balci <[email protected]>
AuthorDate: Tue Sep 19 16:49:59 2023 -0700
Add FrequentStringsSketch and FrequentLongsSketch aggregation functions
(#11098)
Squashed commit:
Fix checkstyle validations
Split FrequentItems sketch into FrequentStrings and FrequentLongs sketches
Fix datasketch utility import
Add documentation and do some cleanups
Whitespace and typo fixes
Add missing license header
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 53 ++-
.../function/AggregationFunctionFactory.java | 4 +
.../FrequentLongsSketchAggregationFunction.java | 268 ++++++++++++++
.../FrequentStringsSketchAggregationFunction.java | 252 ++++++++++++++
.../queries/FrequentItemsSketchQueriesTest.java | 387 +++++++++++++++++++++
.../SerializedFrequentLongsSketch.java | 44 +++
.../SerializedFrequentStringsSketch.java | 44 +++
.../pinot/segment/spi/AggregationFunctionType.java | 3 +
8 files changed, 1054 insertions(+), 1 deletion(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index a18a23df35..69c00ea5b8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -57,6 +57,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.datasketches.frequencies.LongsSketch;
import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
@@ -134,7 +137,9 @@ public class ObjectSerDeUtils {
PinotFourthMoment(34),
ArgMinMaxObject(35),
KllDataSketch(36),
- IntegerTupleSketch(37);
+ IntegerTupleSketch(37),
+ FrequentStringsSketch(38),
+ FrequentLongsSketch(39);
private final int _value;
@@ -226,6 +231,10 @@ public class ObjectSerDeUtils {
return ObjectType.IntegerTupleSketch;
} else if (value instanceof ExprMinMaxObject) {
return ObjectType.ArgMinMaxObject;
+ } else if (value instanceof ItemsSketch) {
+ return ObjectType.FrequentStringsSketch;
+ } else if (value instanceof LongsSketch) {
+ return ObjectType.FrequentLongsSketch;
} else {
throw new IllegalArgumentException("Unsupported type of value: " +
value.getClass().getSimpleName());
}
@@ -1285,6 +1294,46 @@ public class ObjectSerDeUtils {
}
};
+  /**
+   * Serializer/deserializer for frequent-items sketches over strings ({@code ItemsSketch<String>}).
+   * Items are encoded and decoded with {@link ArrayOfStringsSerDe}.
+   */
+  public static final ObjectSerDe<ItemsSketch<String>> FREQUENT_STRINGS_SKETCH_SER_DE =
+      new ObjectSerDe<>() {
+        /** Encodes the sketch, including its string items, into a byte array. */
+        @Override
+        public byte[] serialize(ItemsSketch<String> sketch) {
+          return sketch.toByteArray(new ArrayOfStringsSerDe());
+        }
+
+        /** Rebuilds a sketch from a byte array previously produced by {@link #serialize}. */
+        @Override
+        public ItemsSketch<String> deserialize(byte[] bytes) {
+          Memory sketchMemory = Memory.wrap(bytes);
+          return ItemsSketch.getInstance(sketchMemory, new ArrayOfStringsSerDe());
+        }
+
+        /**
+         * Rebuilds a sketch from the remaining bytes of the buffer. The bytes are copied into a fresh array
+         * first, which consumes the buffer up to its limit.
+         */
+        @Override
+        public ItemsSketch<String> deserialize(ByteBuffer byteBuffer) {
+          int numBytes = byteBuffer.remaining();
+          byte[] sketchBytes = new byte[numBytes];
+          byteBuffer.get(sketchBytes);
+          Memory sketchMemory = Memory.wrap(sketchBytes);
+          return ItemsSketch.getInstance(sketchMemory, new ArrayOfStringsSerDe());
+        }
+      };
+
+  /**
+   * Serializer/deserializer for frequent-items sketches over longs ({@code LongsSketch}).
+   */
+  public static final ObjectSerDe<LongsSketch> FREQUENT_LONGS_SKETCH_SER_DE =
+      new ObjectSerDe<>() {
+        /** Encodes the sketch into a byte array. */
+        @Override
+        public byte[] serialize(LongsSketch sketch) {
+          return sketch.toByteArray();
+        }
+
+        /** Rebuilds a sketch from a byte array previously produced by {@link #serialize}. */
+        @Override
+        public LongsSketch deserialize(byte[] bytes) {
+          Memory sketchMemory = Memory.wrap(bytes);
+          return LongsSketch.getInstance(sketchMemory);
+        }
+
+        /**
+         * Rebuilds a sketch from the remaining bytes of the buffer. The bytes are copied into a fresh array
+         * first, which consumes the buffer up to its limit.
+         */
+        @Override
+        public LongsSketch deserialize(ByteBuffer byteBuffer) {
+          int numBytes = byteBuffer.remaining();
+          byte[] sketchBytes = new byte[numBytes];
+          byteBuffer.get(sketchBytes);
+          Memory sketchMemory = Memory.wrap(sketchBytes);
+          return LongsSketch.getInstance(sketchMemory);
+        }
+      };
+
// NOTE: DO NOT change the order, it has to be the same order as the
ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
@@ -1326,6 +1375,8 @@ public class ObjectSerDeUtils {
ARG_MIN_MAX_OBJECT_SER_DE,
KLL_SKETCH_SER_DE,
DATA_SKETCH_INT_TUPLE_SER_DE,
+ FREQUENT_STRINGS_SKETCH_SER_DE,
+ FREQUENT_LONGS_SKETCH_SER_DE,
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 7f5695aa6c..2d69c093f2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -351,6 +351,10 @@ public class AggregationFunctionFactory {
"Aggregation function: " + functionType + " is only supported
in selection without alias.");
case FUNNELCOUNT:
return new FunnelCountAggregationFunctionFactory(arguments).get();
+ case FREQUENTSTRINGSSKETCH:
+ return new FrequentStringsSketchAggregationFunction(arguments);
+ case FREQUENTLONGSSKETCH:
+ return new FrequentLongsSketchAggregationFunction(arguments);
default:
throw new IllegalArgumentException("Unsupported aggregation
function type: " + functionType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java
new file mode 100644
index 0000000000..64761096df
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.frequencies.LongsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import
org.apache.pinot.segment.local.customobject.SerializedFrequentLongsSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * <p>
+ * {@code FrequentLongsSketchAggregationFunction} provides an approximate FrequentItems aggregation function based on
+ * <a href="https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html">Apache DataSketches library</a>.
+ * It is memory efficient compared to exact counting.
+ * </p>
+ * <p>
+ * The function takes an INT or LONG column as input and returns a Base64 encoded sketch object which can be
+ * deserialized and used to estimate the frequency of items in the dataset (how many times they appear).
+ * </p>
+ * <p><b>FREQUENT_LONGS_SKETCH(col, maxMapSize=256)</b></p>
+ * <p>E.g.:</p>
+ * <ul>
+ *   <li><b>FREQUENT_LONGS_SKETCH(col)</b></li>
+ *   <li><b>FREQUENT_LONGS_SKETCH(col, 1024)</b></li>
+ * </ul>
+ *
+ * <p>
+ * If the column type is BYTES, the aggregation function will assume it is a serialized FrequentItems data sketch
+ * of type {@code LongsSketch} and will attempt to deserialize it for merging with other sketch objects.
+ * </p>
+ *
+ * <p>
+ * Second argument, maxMapSize, refers to the physical length of the hashmap which stores counts. It
+ * influences the accuracy of the sketch and should be a power of 2.
+ * </p>
+ *
+ * <p>
+ * There is a variation of the function (<b>FREQUENT_STRINGS_SKETCH</b>) which accepts STRING type input columns.
+ * </p>
+ */
+public class FrequentLongsSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<LongsSketch, Comparable<?>> {
+  protected static final int DEFAULT_MAX_MAP_SIZE = 256;
+
+  protected int _maxMapSize;
+
+  /**
+   * @param arguments the input expression, optionally followed by a literal {@code maxMapSize}
+   * @throws IllegalArgumentException if more than 2 arguments are given
+   */
+  public FrequentLongsSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments == 1 || numArguments == 2,
+        "Expecting 1 or 2 arguments for FrequentLongsSketch function: FREQUENTLONGSSKETCH(column, maxMapSize)");
+    _maxMapSize = numArguments == 2 ? arguments.get(1).getLiteral().getIntValue() : DEFAULT_MAX_MAP_SIZE;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FREQUENTLONGSSKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Folds the first {@code length} values of the block into the sketch stored in the result holder.
+   * BYTES values are treated as serialized {@code LongsSketch} objects and merged; INT/LONG values are added
+   * as individual items.
+   *
+   * @throws UnsupportedOperationException if the value type is not BYTES, INT or LONG
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    LongsSketch sketch = getOrCreateSketch(aggregationResultHolder);
+
+    // Only the first 'length' entries are consumed, consistent with the group-by paths below.
+    // NOTE(review): the value arrays are presumably allowed to be longer than the block - confirm.
+    switch (valueType) {
+      case BYTES:
+        // Assuming the column contains serialized data sketch
+        byte[][] serializedSketches = valueSet.getBytesValuesSV();
+        for (int i = 0; i < length; i++) {
+          sketch.merge(deserializeSketch(serializedSketches[i]));
+        }
+        break;
+      case INT:
+      case LONG:
+        // Primitive loop avoids boxing every value into a Long
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          sketch.update(values[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  /**
+   * Folds the first {@code length} values into the per-group sketches; row {@code i} goes to the group
+   * {@code groupKeyArray[i]}.
+   *
+   * @throws UnsupportedOperationException if the value type is not BYTES, INT or LONG
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    switch (valueType) {
+      case BYTES:
+        // serialized sketch; only the first 'length' entries are deserialized
+        byte[][] serializedSketches = valueSet.getBytesValuesSV();
+        for (int i = 0; i < length; i++) {
+          LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+          sketch.merge(deserializeSketch(serializedSketches[i]));
+        }
+        break;
+      case INT:
+      case LONG:
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+          sketch.update(values[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  /**
+   * Folds the first {@code length} values into the per-group sketches; row {@code i} goes to every group in
+   * {@code groupKeysArray[i]}.
+   *
+   * @throws UnsupportedOperationException if the value type is not BYTES, INT or LONG
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    switch (valueType) {
+      case BYTES:
+        // serialized sketch; each row is deserialized once and merged into all of its groups
+        byte[][] serializedSketches = valueSet.getBytesValuesSV();
+        for (int i = 0; i < length; i++) {
+          LongsSketch rowSketch = deserializeSketch(serializedSketches[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+            sketch.merge(rowSketch);
+          }
+        }
+        break;
+      case INT:
+      case LONG:
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+            sketch.update(values[i]);
+          }
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  /**
+   * Extracts the sketch from the result holder or creates a new one if it does not exist.
+   */
+  protected LongsSketch getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+    LongsSketch sketch = aggregationResultHolder.getResult();
+    if (sketch == null) {
+      sketch = new LongsSketch(_maxMapSize);
+      aggregationResultHolder.setValue(sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Extracts the sketch from the group by result holder for key
+   * or creates a new one if it does not exist.
+   */
+  protected LongsSketch getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    LongsSketch sketch = groupByResultHolder.getResult(groupKey);
+    if (sketch == null) {
+      sketch = new LongsSketch(_maxMapSize);
+      groupByResultHolder.setValueForKey(groupKey, sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Deserializes every entry of the given array into a sketch.
+   */
+  protected LongsSketch[] deserializeSketches(byte[][] serializedSketches) {
+    LongsSketch[] sketches = new LongsSketch[serializedSketches.length];
+    for (int i = 0; i < serializedSketches.length; i++) {
+      sketches[i] = deserializeSketch(serializedSketches[i]);
+    }
+    return sketches;
+  }
+
+  /**
+   * Deserializes a single sketch from its serialized bytes.
+   */
+  private static LongsSketch deserializeSketch(byte[] serializedSketch) {
+    return LongsSketch.getInstance(Memory.wrap(serializedSketch));
+  }
+
+  @Override
+  public LongsSketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public LongsSketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Merges the two (possibly null) sketches into a new sketch; the inputs are not modified.
+   */
+  @Override
+  public LongsSketch merge(LongsSketch sketch1, LongsSketch sketch2) {
+    LongsSketch union = new LongsSketch(_maxMapSize);
+    if (sketch1 != null) {
+      union.merge(sketch1);
+    }
+    if (sketch2 != null) {
+      union.merge(sketch2);
+    }
+    return union;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.STRING;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.FREQUENTLONGSSKETCH.getName().toLowerCase()
+        + "(" + _expression + ")";
+  }
+
+  /**
+   * Wraps the sketch in a {@code SerializedFrequentLongsSketch} as the final result.
+   */
+  @Override
+  public Comparable<?> extractFinalResult(LongsSketch sketch) {
+    return new SerializedFrequentLongsSketch(sketch);
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java
new file mode 100644
index 0000000000..2424f223cf
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import
org.apache.pinot.segment.local.customobject.SerializedFrequentStringsSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * <p>
+ * {@code FrequentStringsSketchAggregationFunction} provides an approximate FrequentItems aggregation function based on
+ * <a href="https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html">Apache DataSketches library</a>.
+ * It is memory efficient compared to exact counting.
+ * </p>
+ * <p>
+ * The function takes a STRING column as input and returns a Base64 encoded sketch object which can be
+ * deserialized and used to estimate the frequency of items in the dataset (how many times they appear).
+ * </p>
+ * <p><b>FREQUENT_STRINGS_SKETCH(col, maxMapSize=256)</b></p>
+ * <p>E.g.:</p>
+ * <ul>
+ *   <li><b>FREQUENT_STRINGS_SKETCH(col)</b></li>
+ *   <li><b>FREQUENT_STRINGS_SKETCH(col, 1024)</b></li>
+ * </ul>
+ *
+ * <p>
+ * If the column type is BYTES, the aggregation function will assume it is a serialized FrequentItems data sketch
+ * of type {@code ItemsSketch<String>} and will attempt to deserialize it for merging with other sketch objects.
+ * </p>
+ *
+ * <p>
+ * Second argument, maxMapSize, refers to the physical length of the hashmap which stores counts. It
+ * influences the accuracy of the sketch and should be a power of 2.
+ * </p>
+ *
+ * <p>
+ * There is a variation of the function (<b>FREQUENT_LONGS_SKETCH</b>) which accepts INT and LONG type input columns.
+ * </p>
+ */
+public class FrequentStringsSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<ItemsSketch<String>, Comparable<?>> {
+  protected static final int DEFAULT_MAX_MAP_SIZE = 256;
+
+  protected int _maxMapSize;
+
+  /**
+   * @param arguments the input expression, optionally followed by a literal {@code maxMapSize}
+   * @throws IllegalArgumentException if more than 2 arguments are given
+   */
+  public FrequentStringsSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments == 1 || numArguments == 2,
+        "Expecting 1 or 2 arguments for FrequentStringsSketch function: FREQUENTSTRINGSSKETCH(column, maxMapSize)");
+    _maxMapSize = numArguments == 2 ? arguments.get(1).getLiteral().getIntValue() : DEFAULT_MAX_MAP_SIZE;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FREQUENTSTRINGSSKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Folds the first {@code length} values of the block into the sketch stored in the result holder.
+   * BYTES values are treated as serialized {@code ItemsSketch<String>} objects and merged; values of any other
+   * type are read as strings and added as individual items.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    ItemsSketch<String> sketch = getOrCreateSketch(aggregationResultHolder);
+
+    // Only the first 'length' entries are consumed, consistent with the group-by paths below.
+    // NOTE(review): the value arrays are presumably allowed to be longer than the block - confirm.
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // Assuming the column contains serialized data sketch
+      byte[][] serializedSketches = valueSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        sketch.merge(deserializeSketch(serializedSketches[i]));
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        sketch.update(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Folds the first {@code length} values into the per-group sketches; row {@code i} goes to the group
+   * {@code groupKeyArray[i]}.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // serialized sketch; only the first 'length' entries are deserialized
+      byte[][] serializedSketches = valueSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        sketch.merge(deserializeSketch(serializedSketches[i]));
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        sketch.update(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Folds the first {@code length} values into the per-group sketches; row {@code i} goes to every group in
+   * {@code groupKeysArray[i]}.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // serialized sketch; each row is deserialized once and merged into all of its groups
+      byte[][] serializedSketches = valueSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ItemsSketch<String> rowSketch = deserializeSketch(serializedSketches[i]);
+        for (int groupKey : groupKeysArray[i]) {
+          ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          sketch.merge(rowSketch);
+        }
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          sketch.update(values[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Extracts the sketch from the result holder or creates a new one if it does not exist.
+   */
+  protected ItemsSketch<String> getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+    ItemsSketch<String> sketch = aggregationResultHolder.getResult();
+    if (sketch == null) {
+      sketch = new ItemsSketch<>(_maxMapSize);
+      aggregationResultHolder.setValue(sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Extracts the sketch from the group by result holder for key
+   * or creates a new one if it does not exist.
+   */
+  protected ItemsSketch<String> getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ItemsSketch<String> sketch = groupByResultHolder.getResult(groupKey);
+    if (sketch == null) {
+      sketch = new ItemsSketch<>(_maxMapSize);
+      groupByResultHolder.setValueForKey(groupKey, sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Deserializes every entry of the given array into a sketch.
+   */
+  @SuppressWarnings("unchecked")
+  protected ItemsSketch<String>[] deserializeSketches(byte[][] serializedSketches) {
+    // Java cannot create generic arrays; the raw array is only ever filled with ItemsSketch<String> instances.
+    ItemsSketch<String>[] sketches = new ItemsSketch[serializedSketches.length];
+    for (int i = 0; i < serializedSketches.length; i++) {
+      sketches[i] = deserializeSketch(serializedSketches[i]);
+    }
+    return sketches;
+  }
+
+  /**
+   * Deserializes a single sketch from its serialized bytes.
+   */
+  private static ItemsSketch<String> deserializeSketch(byte[] serializedSketch) {
+    return ItemsSketch.getInstance(Memory.wrap(serializedSketch), new ArrayOfStringsSerDe());
+  }
+
+  @Override
+  public ItemsSketch<String> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public ItemsSketch<String> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Merges the two (possibly null) sketches into a new sketch; the inputs are not modified.
+   */
+  @Override
+  public ItemsSketch<String> merge(ItemsSketch<String> sketch1, ItemsSketch<String> sketch2) {
+    ItemsSketch<String> union = new ItemsSketch<>(_maxMapSize);
+    if (sketch1 != null) {
+      union.merge(sketch1);
+    }
+    if (sketch2 != null) {
+      union.merge(sketch2);
+    }
+    return union;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.STRING;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.FREQUENTSTRINGSSKETCH.getName().toLowerCase()
+        + "(" + _expression + ")";
+  }
+
+  /**
+   * Wraps the sketch in a {@code SerializedFrequentStringsSketch} as the final result.
+   */
+  @Override
+  public Comparable<?> extractFinalResult(ItemsSketch<String> sketch) {
+    return new SerializedFrequentStringsSketch(sketch);
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
new file mode 100644
index 0000000000..deee8f64a3
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ErrorType;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.datasketches.frequencies.LongsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Tests for FREQUENT_STRINGS_SKETCH and FREQUENT_LONGS_SKETCH aggregation functions.
+ *
+ * <ul>
+ *   <li>Generates a segment with LONG, STRING, SKETCH and a group-by column</li>
+ *   <li>Runs aggregation and group-by queries on the generated segment</li>
+ *   <li>Compares the results from sketches to exact calculations via count()</li>
+ * </ul>
+ */
+public class FrequentItemsSketchQueriesTest extends BaseQueriesTest {
+  protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FrequentItemsQueriesTest");
+  protected static final String TABLE_NAME = "testTable";
+  protected static final String SEGMENT_NAME = "testSegment";
+
+  protected static final int MAX_MAP_SIZE = 64;
+  protected static final String LONG_COLUMN = "longColumn";
+  protected static final String STRING_COLUMN = "stringColumn";
+  protected static final String STRING_SKETCH_COLUMN = "stringSketchColumn";
+  protected static final String LONG_SKETCH_COLUMN = "longSketchColumn";
+  protected static final String GROUP_BY_COLUMN = "groupByColumn";
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return ""; // No filtering required for this test.
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /**
+   * Builds the test segment from scratch and loads it; the same loaded segment is registered twice in the
+   * segment list.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment();
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  /**
+   * Generates a segment where every row carries a raw long value, a raw string value, a single-item sketch of
+   * each of those values serialized to bytes, and a group-by key.
+   */
+  protected void buildSegment()
+      throws Exception {
+
+    // Every value has a distinct frequency (overall and within each group), so ordering by count is unambiguous
+    String[] strValues = new String[]{"a", "a", "a", "b", "b", "a", "d", "d", "c", "d"};
+    Long[] longValues = new Long[]{1L, 2L, 1L, 1L, 1L, 2L, 5L, 4L, 4L, 4L};
+    String[] groups = new String[]{"g1", "g1", "g1", "g1", "g1", "g1", "g2", "g2", "g2", "g2"};
+
+    List<GenericRow> rows = new ArrayList<>(strValues.length);
+    for (int i = 0; i < strValues.length; i++) {
+      GenericRow row = new GenericRow();
+
+      row.putValue(LONG_COLUMN, longValues[i]);
+      row.putValue(STRING_COLUMN, strValues[i]);
+
+      LongsSketch longSketch = new LongsSketch(MAX_MAP_SIZE);
+      longSketch.update(longValues[i]);
+      row.putValue(LONG_SKETCH_COLUMN, longSketch.toByteArray());
+
+      ItemsSketch<String> strSketch = new ItemsSketch<>(MAX_MAP_SIZE);
+      strSketch.update(strValues[i]);
+      row.putValue(STRING_SKETCH_COLUMN, strSketch.toByteArray(new ArrayOfStringsSerDe()));
+
+      row.putValue(GROUP_BY_COLUMN, groups[i]);
+
+      rows.add(row);
+    }
+
+    Schema schema = new Schema();
+    schema.addField(new DimensionFieldSpec(LONG_COLUMN, FieldSpec.DataType.LONG, true));
+    schema.addField(new DimensionFieldSpec(STRING_COLUMN, FieldSpec.DataType.STRING, true));
+    schema.addField(new MetricFieldSpec(LONG_SKETCH_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new MetricFieldSpec(STRING_SKETCH_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+
+  @Test
+  public void testAggregationForStringValues() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT FREQUENTSTRINGSSKETCH(%1$s) FROM %2$s", STRING_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+
+    // Fetch the exact list by count/group-by and compare
+    String[] exactOrdered = getExactOrderedStrings();
+    assertStringsSketch((ItemsSketch<String>) aggregationResult.get(0), exactOrdered);
+  }
+
+  @Test
+  public void testAggregationForLongValues() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT FREQUENTLONGSSKETCH(%1$s) FROM %2$s", LONG_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+
+    // Fetch the exact list by count/group-by and compare
+    Long[] exactOrdered = getExactOrderedLongs();
+    assertLongsSketch((LongsSketch) aggregationResult.get(0), exactOrdered);
+  }
+
+  @Test
+  public void testAggregationForStringSketches() {
+    // Retrieve sketches calculated by: 1) merger of sketches, 2) from plain values
+    String query = String.format(
+        "SELECT FREQUENTSTRINGSSKETCH(%1$s), FREQUENTSTRINGSSKETCH(%2$s) FROM %3$s",
+        STRING_SKETCH_COLUMN, STRING_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 2);
+
+    // Assert the sketches are equivalent
+    ItemsSketch<String> sketch1 = (ItemsSketch<String>) aggregationResult.get(0);
+    ItemsSketch<String> sketch2 = (ItemsSketch<String>) aggregationResult.get(1);
+    assertEquals(
+        sketch1.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES),
+        sketch2.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES));
+  }
+
+  @Test
+  public void testAggregationForLongSketches() {
+    // Retrieve sketches calculated by: 1) merger of sketches, 2) from plain values
+    String query = String.format(
+        "SELECT FREQUENTLONGSSKETCH(%1$s), FREQUENTLONGSSKETCH(%2$s) FROM %3$s",
+        LONG_SKETCH_COLUMN, LONG_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 2);
+
+    // Assert the sketches are equivalent
+    LongsSketch sketch1 = (LongsSketch) aggregationResult.get(0);
+    LongsSketch sketch2 = (LongsSketch) aggregationResult.get(1);
+    assertEquals(
+        sketch1.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES),
+        sketch2.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES));
+  }
+
+  @Test
+  public void testGroupByStringSketches() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT %1$s, FREQUENTSTRINGSSKETCH(%2$s) FROM %3$s GROUP BY 1",
+        GROUP_BY_COLUMN, STRING_COLUMN, TABLE_NAME);
+    BrokerResponse brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertNotNull(rows);
+    assertEquals(rows.size(), 2); // should be 2 groups
+
+    // Fetch the exact list by count/group-by and compare
+    Map<String, List<String>> exactOrdered = getExactOrderedStringGroups();
+    for (Object[] row : rows) {
+      String group = (String) row[0];
+      ItemsSketch<String> sketch = decodeStringsSketch((String) row[1]);
+      List<String> exactOrder = exactOrdered.get(group);
+      assertStringsSketch(sketch, exactOrder);
+    }
+  }
+
+  @Test
+  public void testGroupByLongSketches() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT %1$s, FREQUENTLONGSSKETCH(%2$s) FROM %3$s GROUP BY 1",
+        GROUP_BY_COLUMN, LONG_COLUMN, TABLE_NAME);
+    BrokerResponse brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertNotNull(rows);
+    assertEquals(rows.size(), 2); // should be 2 groups
+
+    // Fetch the exact list by count/group-by and compare
+    Map<String, List<Long>> exactOrdered = getExactOrderedLongGroups();
+    for (Object[] row : rows) {
+      String group = (String) row[0];
+      LongsSketch sketch = decodeLongsSketch((String) row[1]);
+      List<Long> exactOrder = exactOrdered.get(group);
+      assertLongsSketch(sketch, exactOrder);
+    }
+  }
+
+  /** Returns the distinct string values ordered by descending exact count. */
+  private String[] getExactOrderedStrings() {
+    Object[] objects = getExactOrderForColumn(STRING_COLUMN);
+    return Arrays.copyOf(objects, objects.length, String[].class);
+  }
+
+  /** Returns the distinct long values ordered by descending exact count. */
+  private Long[] getExactOrderedLongs() {
+    Object[] objects = getExactOrderForColumn(LONG_COLUMN);
+    return Arrays.copyOf(objects, objects.length, Long[].class);
+  }
+
+  /** Runs an exact COUNT/GROUP BY on the column and returns its values ordered by descending count. */
+  private Object[] getExactOrderForColumn(String col) {
+    String query = String.format(
+        "SELECT %1$s, COUNT(1) FROM %2$s GROUP BY 1 ORDER BY 2 DESC", col, TABLE_NAME);
+    BrokerResponse resp = getBrokerResponse(query);
+    ResultTable results = resp.getResultTable();
+    List<Object[]> rows = results.getRows();
+    return rows.stream().map(row -> row[0]).toArray();
+  }
+
+  /** Returns, per group, the string values of that group ordered by descending exact count. */
+  private Map<String, List<String>> getExactOrderedStringGroups() {
+    String query = String.format(
+        "SELECT %1$s, %2$s, COUNT(1) FROM %3$s GROUP BY 1,2 ORDER BY 3 DESC",
+        GROUP_BY_COLUMN, STRING_COLUMN, TABLE_NAME);
+    BrokerResponse resp = getBrokerResponse(query);
+    ResultTable results = resp.getResultTable();
+    List<Object[]> rows = results.getRows();
+    Map<String, List<String>> order = new HashMap<>();
+    for (Object[] row : rows) {
+      // Rows arrive globally sorted by count, so appending keeps each group's list sorted as well
+      order.computeIfAbsent((String) row[0], group -> new ArrayList<>()).add((String) row[1]);
+    }
+    return order;
+  }
+
+  /** Returns, per group, the long values of that group ordered by descending exact count. */
+  private Map<String, List<Long>> getExactOrderedLongGroups() {
+    String query = String.format(
+        "SELECT %1$s, %2$s, COUNT(1) FROM %3$s GROUP BY 1,2 ORDER BY 3 DESC",
+        GROUP_BY_COLUMN, LONG_COLUMN, TABLE_NAME);
+    BrokerResponse resp = getBrokerResponse(query);
+    ResultTable results = resp.getResultTable();
+    List<Object[]> rows = results.getRows();
+    Map<String, List<Long>> order = new HashMap<>();
+    for (Object[] row : rows) {
+      // Rows arrive globally sorted by count, so appending keeps each group's list sorted as well
+      order.computeIfAbsent((String) row[0], group -> new ArrayList<>()).add((Long) row[1]);
+    }
+    return order;
+  }
+
+  private void assertStringsSketch(ItemsSketch<String> sketch, List<String> exact) {
+    assertStringsSketch(sketch, exact.toArray(new String[0]));
+  }
+
+  /** Asserts that the sketch reports exactly the expected items, in the expected order. */
+  private void assertStringsSketch(ItemsSketch<String> sketch, String[] exact) {
+    ItemsSketch.Row<String>[] items = sketch.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES);
+    // TestNG's assertEquals takes (actual, expected)
+    assertEquals(items.length, exact.length);
+    for (int i = 0; i < exact.length; i++) {
+      assertEquals(items[i].getItem(), exact[i]);
+    }
+  }
+
+  private void assertLongsSketch(LongsSketch sketch, List<Long> exact) {
+    assertLongsSketch(sketch, exact.toArray(new Long[0]));
+  }
+
+  /** Asserts that the sketch reports exactly the expected items, in the expected order. */
+  private void assertLongsSketch(LongsSketch sketch, Long[] exact) {
+    LongsSketch.Row[] items = sketch.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES);
+    // TestNG's assertEquals takes (actual, expected)
+    assertEquals(items.length, exact.length);
+    for (int i = 0; i < exact.length; i++) {
+      assertEquals((Long) items[i].getItem(), exact[i]);
+    }
+  }
+
+  /** Decodes a Base64 string from a query result back into a strings sketch. */
+  private ItemsSketch<String> decodeStringsSketch(String encodedSketch) {
+    byte[] byteArr = Base64.getDecoder().decode(encodedSketch);
+    return ItemsSketch.getInstance(Memory.wrap(byteArr), new ArrayOfStringsSerDe());
+  }
+
+  /** Decodes a Base64 string from a query result back into a longs sketch. */
+  private LongsSketch decodeLongsSketch(String encodedSketch) {
+    byte[] byteArr = Base64.getDecoder().decode(encodedSketch);
+    return LongsSketch.getInstance(Memory.wrap(byteArr));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _indexSegment.destroy();
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java
new file mode 100644
index 0000000000..53124e473b
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import javax.validation.constraints.NotNull;
+import org.apache.datasketches.frequencies.LongsSketch;
+
+
+/**
+ * Wrapper around a {@link LongsSketch} whose {@link #toString()} is the Base64 encoding of the sketch's
+ * serialized bytes.
+ *
+ * <p>NOTE(review): the class is comparable to a raw {@link LongsSketch} rather than to another
+ * {@code SerializedFrequentLongsSketch}; kept as-is for interface compatibility.
+ */
+public class SerializedFrequentLongsSketch implements Comparable<LongsSketch> {
+  private final LongsSketch _sketch;
+
+  public SerializedFrequentLongsSketch(LongsSketch sketch) {
+    _sketch = sketch;
+  }
+
+  /**
+   * Orders by the number of active items in the sketches.
+   *
+   * @param other the sketch to compare the wrapped sketch against
+   * @return a negative, zero or positive value when the wrapped sketch has fewer, the same number of, or more
+   *     active items than {@code other}
+   */
+  @Override
+  public int compareTo(@NotNull LongsSketch other) {
+    // There is no well-defined ordering for these sketches
+    // numActiveItems is just a placeholder, which can be changed later
+    // Integer.compare instead of subtraction: same sign, no reliance on the operands' range
+    return Integer.compare(_sketch.getNumActiveItems(), other.getNumActiveItems());
+  }
+
+  /**
+   * Returns the Base64 encoding of the serialized sketch.
+   */
+  @Override
+  public String toString() {
+    return Base64.getEncoder().encodeToString(_sketch.toByteArray());
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java
new file mode 100644
index 0000000000..40f89bc83d
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import javax.validation.constraints.NotNull;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ItemsSketch;
+
+/**
+ * Wrapper around an {@code ItemsSketch<String>} whose {@link #toString()} is the Base64 encoding of the
+ * sketch's bytes as serialized with {@link ArrayOfStringsSerDe}.
+ *
+ * <p>NOTE(review): the class is comparable to a raw {@link ItemsSketch} rather than to another
+ * {@code SerializedFrequentStringsSketch}; kept as-is for interface compatibility.
+ */
+public class SerializedFrequentStringsSketch implements Comparable<ItemsSketch<String>> {
+  private final ItemsSketch<String> _sketch;
+
+  public SerializedFrequentStringsSketch(ItemsSketch<String> sketch) {
+    _sketch = sketch;
+  }
+
+  /**
+   * Orders by the number of active items in the sketches.
+   *
+   * @param other the sketch to compare the wrapped sketch against
+   * @return a negative, zero or positive value when the wrapped sketch has fewer, the same number of, or more
+   *     active items than {@code other}
+   */
+  @Override
+  public int compareTo(@NotNull ItemsSketch<String> other) {
+    // There is no well-defined ordering for these sketches
+    // numActiveItems is just a placeholder, which can be changed later
+    // Integer.compare instead of subtraction: same sign, no reliance on the operands' range
+    return Integer.compare(_sketch.getNumActiveItems(), other.getNumActiveItems());
+  }
+
+  /**
+   * Returns the Base64 encoding of the serialized sketch.
+   */
+  @Override
+  public String toString() {
+    return Base64.getEncoder().encodeToString(_sketch.toByteArray(new ArrayOfStringsSerDe()));
+  }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 4ebc95926c..4ac3b32af9 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -190,6 +190,9 @@ public enum AggregationFunctionType {
SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.BINARY, ReturnTypes.BIGINT,
ReturnTypes.explicit(SqlTypeName.OTHER)),
+ FREQUENTSTRINGSSKETCH("frequentStringsSketch"),
+ FREQUENTLONGSSKETCH("frequentLongsSketch"),
+
// Geo aggregation functions
STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY),
ReturnTypes.explicit(SqlTypeName.OTHER)),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]