swaminathanmanish commented on code in PR #10427:
URL: https://github.com/apache/pinot/pull/10427#discussion_r1139067772


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -96,6 +96,9 @@ public static class Helix {
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
     public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
 
+
+    public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;

Review Comment:
   Any references that can help explain this value? 
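
   For context, one plausible reading of this value (an assumption, not confirmed by the PR author): if lgK is the base-2 log of the nominal number of entries, as the javadoc for toIntegerSumTupleSketch in this review describes it, then lgK = 16 corresponds to 65536 entries, the same as DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES just above. A minimal sketch of that arithmetic, where the class name `TupleSketchLgK` and the helper `nominalEntries` are hypothetical:

```java
public class TupleSketchLgK {
    // Mirrors the constant added in the diff above.
    static final int DEFAULT_TUPLE_SKETCH_LGK = 16;

    // Hypothetical helper: converts a base-2 log into the entry count it represents.
    static int nominalEntries(int lgK) {
        return 1 << lgK;
    }

    public static void main(String[] args) {
        // 2^16 = 65536, matching DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES.
        System.out.println(nominalEntries(DEFAULT_TUPLE_SKETCH_LGK));
    }
}
```

   If that reading is right, a one-line comment next to the constant pointing at the theta default would answer the question.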



##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -131,4 +133,49 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null) {
+      if (key instanceof Integer) {
+        is.update((Integer) key, value);
+      } else if (key instanceof Long) {
+        is.update((Long) key, value);
+      } else if (key instanceof Float) {
+        is.update((float) key, value);
+      } else if (key instanceof Double) {
+        is.update((double) key, value);
+      } else if (key instanceof BigDecimal) {
+        is.update(((BigDecimal) key).toString(), value);
+      } else if (key instanceof String) {
+        is.update((String) key, value);
+      } else if (key instanceof byte[]) {
+        is.update((byte[]) key, value);
+      }

Review Comment:
   If you want to validate and reject unsupported key types, consider throwing an 
IllegalArgumentException (or IllegalStateException) here.
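
   A minimal sketch of what that validation could look like, written without the DataSketches dependency so it stands alone. The class `KeyTypeCheck` and the method `keyKind` are hypothetical names for illustration, not part of the PR; a null key is still accepted, on the assumption that it should keep producing an empty sketch as the javadoc says:

```java
import java.math.BigDecimal;

public class KeyTypeCheck {
    // Hypothetical helper: classifies a key the way the if/else chain above does,
    // and rejects anything the chain would not handle.
    static String keyKind(Object key) {
        if (key == null) {
            return "empty"; // null key is allowed and yields an empty sketch
        } else if (key instanceof Integer || key instanceof Long) {
            return "integral";
        } else if (key instanceof Float || key instanceof Double) {
            return "floating";
        } else if (key instanceof BigDecimal || key instanceof String) {
            return "string";
        } else if (key instanceof byte[]) {
            return "bytes";
        }
        // Final branch: fail loudly instead of silently skipping the update.
        throw new IllegalArgumentException("Unsupported key type: " + key.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(keyKind(42L));
        System.out.println(keyKind(new BigDecimal("1.5")));
    }
}
```

   In the actual function the same final branch would go after the `byte[]` case, so an unsupported key fails the query rather than returning a sketch that quietly omits it.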



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {

Review Comment:
   Can the raw type (R) here be Sketch<IntegerSummary> instead of byte[]? I'm 
asking because the other sketch implementation, 
DistinctCountThetaSketchValueAggregator, uses Object as its raw type.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);

Review Comment:
   Is there a reason we pass IntegerSummary.Mode.Sum as a parameter? We 
already differentiate by aggregation implementation: 
IntegerTupleSketchAggregationFunction vs. 
AvgIntegerTupleSketchAggregationFunction vs. 
SumValuesIntegerTupleSketchAggregationFunction.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java:
##########
@@ -66,6 +67,11 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchValueAggregator();
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+        return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);

Review Comment:
   Thanks! That makes sense. The other functions are built on top of the sum mode.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  ExpressionContext _expressionContext;
+  IntegerSummarySetOperations _setOps;
+  int _entries;
+
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    _entries = 4096;

Review Comment:
   This assignment doesn't seem to be needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
