davecromberge commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139439869
##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -213,6 +216,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.VarianceTuple;
} else if (value instanceof PinotFourthMoment) {
return ObjectType.PinotFourthMoment;
+ } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
+ return ObjectType.IntegerTupleSketch;
Review Comment:
Is this a safe assumption? Is it also necessary to inspect the summary type
to verify that it is an `IntegerSummary`?
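
To illustrate the concern: Java erases generic type parameters at runtime, so an `instanceof` check against the raw sketch type cannot by itself distinguish an integer summary from any other summary. Below is a minimal sketch of also checking the retained summaries. `FakeSketch`, `IntSummary` and `DoubleSummary` are hypothetical stand-ins, not DataSketches classes.

```java
// Hypothetical stand-ins; these are NOT the DataSketches classes.
interface Summary {
}

final class IntSummary implements Summary {
}

final class DoubleSummary implements Summary {
}

final class FakeSketch<S extends Summary> {
  private final java.util.List<S> _summaries;

  FakeSketch(java.util.List<S> summaries) {
    _summaries = summaries;
  }

  java.util.List<S> summaries() {
    return _summaries;
  }
}

final class SummaryTypeCheck {
  private SummaryTypeCheck() {
  }

  // True for any FakeSketch, whatever its summary type: generics are erased at runtime.
  static boolean isSketch(Object value) {
    return value instanceof FakeSketch;
  }

  // Stricter check that also inspects each retained summary.
  // An empty sketch passes vacuously because it has no summary to inspect.
  static boolean isIntegerSketch(Object value) {
    if (!(value instanceof FakeSketch)) {
      return false;
    }
    for (Object summary : ((FakeSketch<?>) value).summaries()) {
      if (!(summary instanceof IntSummary)) {
        return false;
      }
    }
    return true;
  }
}
```

The empty-sketch case remains ambiguous with this approach, so a wrapper type or a type tag in the serialized form may be a more robust option.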
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends
IntegerTupleSketchAggregationFunction {
+
+ public
SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext>
arguments, IntegerSummary.Mode mode) {
+ super(arguments, mode);
+ }
+
+ // TODO if extra aggregation modes are supported, make this switch
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+
+ @Override
+ public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
+ Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+ integerSummarySketches.forEach(union::union);
+ double retainedTotal = 0L;
+ SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+ while (summaries.next()) {
+ retainedTotal += summaries.getSummary().getValue();
+ }
+ double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
Review Comment:
You could divide by theta instead: theta should equal the number of retained
entries divided by the estimate, so `retainedTotal / theta` gives the same value.
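
A small arithmetic sketch of that relationship, assuming theta = retained entries / estimate: the expression `retainedTotal / retained * estimate` then reduces to `retainedTotal / theta`. The class and method names here are hypothetical and use plain numbers rather than a real sketch.

```java
// Hypothetical helper; plain arithmetic only, no DataSketches types involved.
final class ThetaScaling {
  private ThetaScaling() {
  }

  // Form used in the diff: mean retained value scaled by the distinct-count estimate.
  static double viaRetainedAndEstimate(double retainedTotal, int retained, double estimate) {
    return retainedTotal / retained * estimate;
  }

  // Equivalent form, assuming theta == retained / estimate.
  static double viaTheta(double retainedTotal, double theta) {
    return retainedTotal / theta;
  }
}
```

For example, with 4 retained entries, theta = 0.25 (so the estimate is 16) and a retained total of 10, both forms give 40. With zero retained entries the first form evaluates to NaN (0.0 / 0), while the theta form stays finite as long as theta is non-zero.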
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+ extends IntegerTupleSketchAggregationFunction {
+
+ public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext>
arguments, IntegerSummary.Mode mode) {
+ super(arguments, mode);
+ }
+
+ // TODO if extra aggregation modes are supported, make this switch
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+
+ @Override
+ public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
+ Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+ integerSummarySketches.forEach(union::union);
+ double retainedTotal = 0L;
+ SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+ while (summaries.next()) {
+ retainedTotal += summaries.getSummary().getValue();
+ }
+ double estimate = retainedTotal / union.getResult().getRetainedEntries();
+ return Double.valueOf(estimate).longValue();
Review Comment:
Does calling getResult() on the union multiple times recompute the result?
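
Whether the real `Union.getResult()` caches its result is worth confirming in the DataSketches source. Either way, holding the result in a local variable avoids depending on that behaviour. A minimal sketch using a hypothetical `CountingUnion` stand-in that rebuilds its result on every call:

```java
// Hypothetical stand-in for a union whose getResult() rebuilds its result on every call.
final class CountingUnion {
  private final int[] _values;
  private int _resultCalls;

  CountingUnion(int... values) {
    _values = values;
  }

  int[] getResult() {
    _resultCalls++;
    return _values.clone();
  }

  int resultCalls() {
    return _resultCalls;
  }
}

final class AverageOnce {
  private AverageOnce() {
  }

  static double average(CountingUnion union) {
    // Materialise the result once and reuse it for both the loop and the divisor.
    int[] result = union.getResult();
    if (result.length == 0) {
      return 0.0;
    }
    double total = 0;
    for (int value : result) {
      total += value;
    }
    return total / result.length;
  }
}
```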
##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -918,6 +923,28 @@ public Sketch deserialize(ByteBuffer byteBuffer) {
}
};
+ public static final
ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>
DATA_SKETCH_INT_TUPLE_SER_DE =
+ new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+ @Override
+ public byte[]
serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+ return value.compact().toByteArray();
+ }
+
+ @Override
+ public org.apache.datasketches.tuple.Sketch<IntegerSummary>
deserialize(byte[] bytes) {
+ return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+ new IntegerSummaryDeserializer());
+ }
+
+ @Override
+ public org.apache.datasketches.tuple.Sketch<IntegerSummary>
deserialize(ByteBuffer byteBuffer) {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+ new IntegerSummaryDeserializer());
Review Comment:
The DataSketches `Memory` can also wrap a `ByteBuffer` directly, which would avoid the intermediate copy.
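
To show what the copy costs, here is a sketch using only `java.nio` (no DataSketches calls) that contrasts copying the remaining bytes out, as the diff does, with taking a view over them. `BufferAccess` is a hypothetical helper name.

```java
// Hypothetical helper contrasting a copy with a zero-copy view; uses java.nio only.
final class BufferAccess {
  private BufferAccess() {
  }

  // Copies the remaining bytes out, as the diff does; this advances the buffer's position.
  static byte[] copyRemaining(java.nio.ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
  }

  // Returns a view that shares content with the source; the source position is untouched.
  static java.nio.ByteBuffer viewRemaining(java.nio.ByteBuffer buffer) {
    return buffer.slice();
  }
}
```

One difference to keep in mind: the copying form consumes the buffer, so any caller that relies on the position having advanced would need adjusting if the buffer were wrapped directly.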
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class SumValuesIntegerTupleSketchAggregationFunction extends
IntegerTupleSketchAggregationFunction {
+
+ public
SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext>
arguments, IntegerSummary.Mode mode) {
+ super(arguments, mode);
+ }
+
+ // TODO if extra aggregation modes are supported, make this switch
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+
+ @Override
+ public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
+ Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+ integerSummarySketches.forEach(union::union);
+ double retainedTotal = 0L;
+ SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+ while (summaries.next()) {
+ retainedTotal += summaries.getSummary().getValue();
+ }
+ double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
+ return Double.valueOf(estimate).longValue();
+ }
Review Comment:
Does the serde always deserialise bytes to a compact sketch? It might be
better to use the base `Sketch` abstraction, to cover cases where the sketches
were created outside the system and not compacted.
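
A rough sketch of that idea: accept the base type and compact on the way in, so callers are not forced to supply compact sketches. `BaseSketch`, `CompactForm` and `UpdatableForm` are hypothetical stand-ins; the only assumption carried over from the diff is that the base type exposes `compact()`.

```java
// Hypothetical stand-ins; the only assumption taken from the diff is a compact() method.
abstract class BaseSketch {
  abstract CompactForm compact();
}

final class CompactForm extends BaseSketch {
  final int _retained;

  CompactForm(int retained) {
    _retained = retained;
  }

  @Override
  CompactForm compact() {
    return this; // already compact
  }
}

final class UpdatableForm extends BaseSketch {
  private final int _retained;

  UpdatableForm(int retained) {
    _retained = retained;
  }

  @Override
  CompactForm compact() {
    return new CompactForm(_retained);
  }
}

final class SketchNormalizer {
  private SketchNormalizer() {
  }

  // Accepts any mix of sketch forms and returns them in compact form.
  static java.util.List<CompactForm> compactAll(java.util.List<? extends BaseSketch> sketches) {
    java.util.List<CompactForm> result = new java.util.ArrayList<>(sketches.size());
    for (BaseSketch sketch : sketches) {
      result.add(sketch.compact());
    }
    return result;
  }
}
```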
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction
getAggregationFunction(FunctionContext functio
return new FourthMomentAggregationFunction(firstArgument,
FourthMomentAggregationFunction.Type.KURTOSIS);
case FOURTHMOMENT:
return new FourthMomentAggregationFunction(firstArgument,
FourthMomentAggregationFunction.Type.MOMENT);
+ case DISTINCTCOUNTTUPLESKETCH:
+ // mode actually doesn't matter here because we only care about
keys, not values
+ return new
DistinctCountIntegerTupleSketchAggregationFunction(arguments,
IntegerSummary.Mode.Sum);
+ case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+ return new IntegerTupleSketchAggregationFunction(arguments,
IntegerSummary.Mode.Sum);
+ case SUMVALUESINTEGERSUMTUPLESKETCH:
+ return new
SumValuesIntegerTupleSketchAggregationFunction(arguments,
IntegerSummary.Mode.Sum);
+ case AVGVALUEINTEGERSUMTUPLESKETCH:
+ return new AvgIntegerTupleSketchAggregationFunction(arguments,
IntegerSummary.Mode.Sum);
Review Comment:
`DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH` is consistent with the current
naming scheme, but the `DISTINCTCOUNT` prefix is somewhat redundant since it
is already clear that we are using a tuple (or theta) sketch. This is a
subjective remark, and no change is needed because the name is consistent with
the theta naming.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class AvgIntegerTupleSketchAggregationFunction
+ extends IntegerTupleSketchAggregationFunction {
+
+ public AvgIntegerTupleSketchAggregationFunction(List<ExpressionContext>
arguments, IntegerSummary.Mode mode) {
+ super(arguments, mode);
+ }
+
+ // TODO if extra aggregation modes are supported, make this switch
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+
+ @Override
+ public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>>
integerSummarySketches) {
+ Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+ integerSummarySketches.forEach(union::union);
+ double retainedTotal = 0L;
+ SketchIterator<IntegerSummary> summaries = union.getResult().iterator();
+ while (summaries.next()) {
+ retainedTotal += summaries.getSummary().getValue();
+ }
+ double estimate = retainedTotal / union.getResult().getRetainedEntries();
Review Comment:
It could be a good idea to first check whether the union is empty; otherwise
you may encounter division by zero.
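
A minimal sketch of such a guard, written against plain numbers with the hypothetical helper `SafeAverage`. Note that in Java a `double` divided by an `int` zero yields NaN (or infinity) rather than throwing, and NaN narrows to `0L`, so the unguarded form fails silently rather than loudly. Returning `0L` for an empty union is an assumption; another default may suit the query semantics better.

```java
// Hypothetical helper; plain arithmetic only, no DataSketches types involved.
final class SafeAverage {
  private SafeAverage() {
  }

  // Unguarded form from the diff: 0.0 / 0 is NaN in Java, and NaN converts to 0L.
  static long unguarded(double retainedTotal, int retainedEntries) {
    double estimate = retainedTotal / retainedEntries;
    return Double.valueOf(estimate).longValue();
  }

  // Guarded form: makes the empty case explicit instead of relying on NaN conversion.
  static long guarded(double retainedTotal, int retainedEntries) {
    if (retainedEntries == 0) {
      return 0L; // assumed default for an empty union
    }
    return (long) (retainedTotal / retainedEntries);
  }
}
```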
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches
currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+ extends
BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>,
Comparable> {
+ ExpressionContext _expressionContext;
+ IntegerSummarySetOperations _setOps;
+ int _entries;
Review Comment:
Could these be `final`, given that they are only set in the constructor?
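
For illustration, a sketch of the suggested shape using a hypothetical `SketchSettings` class (not the PR's class): fields assigned exactly once in the constructor can be declared `private final`, and the compiler then rejects any later reassignment.

```java
// Hypothetical class showing constructor-only fields declared final.
final class SketchSettings {
  private final int _entries;
  private final String _mode;

  SketchSettings(int entries, String mode) {
    _entries = entries;
    _mode = mode;
  }

  int entries() {
    return _entries;
  }

  String mode() {
    return _mode;
  }
}
```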
##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int
log2m) {
}
return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
}
+
+ /**
+ * Create a Tuple Sketch containing the key and value supplied
+ *
+ * @param key an Object we want to insert as the key of the sketch, may be
null to return an empty sketch
+ * @param value an Integer we want to associate as the value to go along
with the key, may be null to return an
+ * empty sketch
+ * @return serialized tuple sketch as bytes
+ */
+ @ScalarFunction(nullableParameters = true)
+ public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable
Integer value) {
+ return toIntegerSumTupleSketch(key, value,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+ }
+
+ /**
+ * Create a Tuple Sketch containing the key and value supplied
+ *
+ * @param key an Object we want to insert as the key of the sketch, may be
null to return an empty sketch
+ * @param value an Integer we want to associate as the value to go along
with the key, may be null to return an
+ * empty sketch
+ * @param lgK integer representing the log of the maximum number of retained
entries in the sketch, between 4 and 26
+ * @return serialized tuple sketch as bytes
+ */
+ @ScalarFunction(nullableParameters = true)
+ public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer
value, int lgK) {
+ IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+ if (value != null) {
+ if (key instanceof Integer) {
Review Comment:
The tradeoff with this decision is that we may lose the "key" from a
distinct count for a null value, but I don't see a sensible way around this
besides defaulting to some sentinel, which may not be applicable to all use cases.