swaminathanmanish commented on code in PR #12042:
URL: https://github.com/apache/pinot/pull/12042#discussion_r1408231135


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java:
##########
@@ -243,9 +243,14 @@ public TDigest deserialize(ByteBuffer byteBuffer) {
 
     @Override
     public byte[] serialize(Sketch value) {
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
-      return value.compact(false, null).toByteArray();
+      // The serializer should respect existing ordering to enable "early stop"
+      // optimisations on unions.
+      boolean shouldCompact = !value.isCompact();
+      boolean shouldOrder = value.isOrdered();
+      if (shouldCompact) {

Review Comment:
   I assume shouldCompact defaults to true?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java:
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import javax.annotation.Nonnull;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+
+
+/**
+ * Intermediate state used by {@code DistinctCountThetaSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade-off more memory usage for more pre-aggregation.
+ * This permits use of the Union "early-stop" optimisation where ordered sketches require no further
+ * processing beyond the minimum Theta value.
+ * The union operation initialises an empty "gadget" bookkeeping sketch that is updated with hashed entries
+ * that fall below the minimum Theta value for all input sketches ("Broder Rule").  When the initial
+ * Theta value is set to the minimum immediately, further gains can be realised.
+ */
+public class ThetaSketchAccumulator {
+  private ArrayList<Sketch> _accumulator;
+  private boolean _ordered = false;
+  private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+  private Union _union;
+  private int _threshold;
+  private int _numInputs = 0;
+
+  public ThetaSketchAccumulator() {
+  }
+
+  // Note: The accumulator is serialized as a sketch.  This means that the majority of the processing
+  // happens on serialization. Therefore, when deserialized, the values may be null and will
+  // require re-initialisation. Since the primary use case is at query time for the Broker
+  // and Server, these properties are already in memory and are re-set.
+  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, boolean ordered, int threshold) {
+    _setOperationBuilder = setOperationBuilder;
+    _ordered = ordered;
+    _threshold = threshold;
+  }
+
+  public void setOrdered(boolean ordered) {
+    _ordered = ordered;
+  }
+
+  public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
+    _setOperationBuilder = setOperationBuilder;
+  }
+
+  public void setThreshold(int threshold) {
+    _threshold = threshold;
+  }
+
+  public boolean isEmpty() {
+    return _numInputs == 0;
+  }
+
+  @Nonnull
+  public Sketch getResult() {
+    return unionAll();
+  }
+
+  public void apply(Sketch sketch) {
+    internalAdd(sketch);
+  }
+
+  public void merge(ThetaSketchAccumulator thetaUnion) {
+    if (thetaUnion.isEmpty()) {
+      return;
+    }
+    Sketch sketch = thetaUnion.getResult();
+    internalAdd(sketch);
+  }
+
+  private void internalAdd(Sketch sketch) {
+    if (sketch.isEmpty()) {
+      return;
+    }
+    if (_accumulator == null) {
+      _accumulator = new ArrayList<>(_threshold);
+    }
+    _accumulator.add(sketch);
+    _numInputs += 1;
+
+    if (_accumulator.size() >= _threshold) {
+      unionAll();
+    }
+  }
+
+  private Sketch unionAll() {
+    if (_union == null) {
+      _union = _setOperationBuilder.buildUnion();
+    }
+    // Return the default update "gadget" sketch as a compact sketch
+    if (isEmpty()) {
+      return _union.getResult(_ordered, null);
+    }
+    // Corner-case: the parameters are not strictly respected when there is a single sketch.
+    // This single sketch might have been the result of a previously accumulated union and
+    // would already have the parameters set.  The sketch is returned as-is without adjusting
+    // ordering and nominal entries which requires an additional union operation.
+    if (_numInputs == 1) {
+      return _accumulator.get(0);
+    }
+
+    // Performance optimisation: ensure that the minimum Theta is used for "early-stop".
+    _accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));

Review Comment:
   Is the optimization here to stop the sort based on the theta threshold? Just trying to understand how early stop takes effect here.
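
For readers following the early-stop question, here is a minimal toy model in plain Java. It does not use the DataSketches API: `EarlyStopSketch`, its fields, and `union` are hypothetical names, and a "sketch" is assumed to be just a sorted array of hash values plus a theta cut-off. In this model the sort does not stop early itself. Sorting the inputs by theta only makes the running union theta reach its minimum on the first input, so that each later ordered input can stop scanning at the first hash at or above that minimum.

```java
import java.util.Arrays;
import java.util.TreeSet;

// Toy model only, NOT the DataSketches API: a "sketch" is a sorted array of
// hash values plus a theta cut-off; every meaningful hash is below theta.
class EarlyStopSketch {
  final long theta;
  final long[] sortedHashes;

  EarlyStopSketch(long theta, long... hashes) {
    this.theta = theta;
    this.sortedHashes = hashes.clone();
    Arrays.sort(this.sortedHashes);
  }

  // Unions the inputs and returns {retained entries, hashes inspected}.
  static long[] union(boolean sortByTheta, EarlyStopSketch... inputs) {
    EarlyStopSketch[] order = inputs.clone();
    if (sortByTheta) {
      // Smallest theta first, so the cut-off is tightest from the start.
      Arrays.sort(order, (x, y) -> Long.compare(x.theta, y.theta));
    }
    long unionTheta = Long.MAX_VALUE;
    TreeSet<Long> retained = new TreeSet<>();
    long inspected = 0;
    for (EarlyStopSketch s : order) {
      unionTheta = Math.min(unionTheta, s.theta);
      for (long h : s.sortedHashes) {
        inspected++;
        if (h >= unionTheta) {
          break; // ordered input: every later hash is also >= unionTheta
        }
        retained.add(h);
      }
    }
    // Drop entries that were accepted before theta shrank to its final value.
    long kept = retained.headSet(unionTheta).size();
    return new long[] {kept, inspected};
  }

  public static void main(String[] args) {
    EarlyStopSketch a = new EarlyStopSketch(100, 10, 20, 30);
    EarlyStopSketch b = new EarlyStopSketch(1000, 5, 20, 150, 400, 900);
    long[] sorted = union(true, b, a);
    long[] unsorted = union(false, b, a);
    System.out.println("sorted:   retained=" + sorted[0] + " inspected=" + sorted[1]);
    System.out.println("unsorted: retained=" + unsorted[0] + " inspected=" + unsorted[1]);
  }
}
```

With these two inputs, both orders retain the same 4 entries, but the theta-sorted order inspects 6 of the 8 hashes while the unsorted order inspects all 8.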



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java:
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import javax.annotation.Nonnull;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+
+
+/**
+ * Intermediate state used by {@code DistinctCountThetaSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade-off more memory usage for more pre-aggregation.
+ * This permits use of the Union "early-stop" optimisation where ordered sketches require no further
+ * processing beyond the minimum Theta value.
+ * The union operation initialises an empty "gadget" bookkeeping sketch that is updated with hashed entries
+ * that fall below the minimum Theta value for all input sketches ("Broder Rule").  When the initial
+ * Theta value is set to the minimum immediately, further gains can be realised.
+ */
+public class ThetaSketchAccumulator {
+  private ArrayList<Sketch> _accumulator;
+  private boolean _ordered = false;
+  private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+  private Union _union;
+  private int _threshold;
+  private int _numInputs = 0;
+
+  public ThetaSketchAccumulator() {
+  }
+
+  // Note: The accumulator is serialized as a sketch.  This means that the majority of the processing
+  // happens on serialization. Therefore, when deserialized, the values may be null and will
+  // require re-initialisation. Since the primary use case is at query time for the Broker
+  // and Server, these properties are already in memory and are re-set.
+  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, boolean ordered, int threshold) {
+    _setOperationBuilder = setOperationBuilder;
+    _ordered = ordered;
+    _threshold = threshold;
+  }
+
+  public void setOrdered(boolean ordered) {
+    _ordered = ordered;
+  }
+
+  public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
+    _setOperationBuilder = setOperationBuilder;
+  }
+
+  public void setThreshold(int threshold) {
+    _threshold = threshold;
+  }
+
+  public boolean isEmpty() {
+    return _numInputs == 0;
+  }
+
+  @Nonnull
+  public Sketch getResult() {
+    return unionAll();
+  }
+
+  public void apply(Sketch sketch) {
+    internalAdd(sketch);
+  }
+
+  public void merge(ThetaSketchAccumulator thetaUnion) {
+    if (thetaUnion.isEmpty()) {
+      return;
+    }
+    Sketch sketch = thetaUnion.getResult();
+    internalAdd(sketch);
+  }
+
+  private void internalAdd(Sketch sketch) {
+    if (sketch.isEmpty()) {
+      return;
+    }
+    if (_accumulator == null) {
+      _accumulator = new ArrayList<>(_threshold);
+    }
+    _accumulator.add(sketch);
+    _numInputs += 1;
+
+    if (_accumulator.size() >= _threshold) {

Review Comment:
   The main optimization here is to batch the sketch unions instead of doing many unions, to amortize the sort-merge cost, right? And that's where we tune this threshold parameter.
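
As a rough illustration of the batching idea raised here, the following sketch buffers inputs until a threshold is reached and then merges the whole batch in one pass, with a final flush when the result is requested. It is a toy under stated assumptions, not the PR's implementation: `BatchingAccumulator` is a hypothetical class, plain `Set<Long>` values stand in for sketches, and clearing the buffer after each merge is an assumption that the quoted hunk does not show.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: plain sets stand in for sketches, set union for sketch union.
class BatchingAccumulator {
  private final int threshold;
  private final List<Set<Long>> buffer = new ArrayList<>();
  private final Set<Long> merged = new HashSet<>();
  int mergePasses = 0; // how many times a batch was merged

  BatchingAccumulator(int threshold) {
    this.threshold = threshold;
  }

  void apply(Set<Long> input) {
    if (input.isEmpty()) {
      return; // empty inputs contribute nothing
    }
    buffer.add(input);
    if (buffer.size() >= threshold) {
      flush();
    }
  }

  // Merges everything buffered so far in a single pass.
  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    for (Set<Long> s : buffer) {
      merged.addAll(s);
    }
    buffer.clear(); // assumption: the buffer is emptied after each merge
    mergePasses++;
  }

  // Final catch-all: merges the last, possibly partial, batch.
  Set<Long> getResult() {
    flush();
    return merged;
  }

  public static void main(String[] args) {
    for (int threshold : new int[] {1, 3}) {
      BatchingAccumulator acc = new BatchingAccumulator(threshold);
      for (long i = 0; i < 7; i++) {
        acc.apply(Set.of(i, i + 1));
      }
      int size = acc.getResult().size();
      System.out.println("threshold=" + threshold + " passes=" + acc.mergePasses + " size=" + size);
    }
  }
}
```

A larger threshold holds more inputs in memory at once but performs fewer merge passes for the same final result, which is the memory-for-CPU trade-off the threshold parameter exposes.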



##########
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java:
##########
@@ -261,6 +261,11 @@ public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB)
     return diff.getResult(false, null, false);
   }
 
+  @ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})

Review Comment:
   Are these new functions added to take in the new parameters?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java:
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import javax.annotation.Nonnull;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+
+
+/**
+ * Intermediate state used by {@code DistinctCountThetaSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade-off more memory usage for more pre-aggregation.
+ * This permits use of the Union "early-stop" optimisation where ordered sketches require no further
+ * processing beyond the minimum Theta value.
+ * The union operation initialises an empty "gadget" bookkeeping sketch that is updated with hashed entries
+ * that fall below the minimum Theta value for all input sketches ("Broder Rule").  When the initial
+ * Theta value is set to the minimum immediately, further gains can be realised.
+ */
+public class ThetaSketchAccumulator {
+  private ArrayList<Sketch> _accumulator;
+  private boolean _ordered = false;
+  private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+  private Union _union;
+  private int _threshold;
+  private int _numInputs = 0;
+
+  public ThetaSketchAccumulator() {
+  }
+
+  // Note: The accumulator is serialized as a sketch.  This means that the majority of the processing
+  // happens on serialization. Therefore, when deserialized, the values may be null and will
+  // require re-initialisation. Since the primary use case is at query time for the Broker
+  // and Server, these properties are already in memory and are re-set.
+  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, boolean ordered, int threshold) {
+    _setOperationBuilder = setOperationBuilder;
+    _ordered = ordered;
+    _threshold = threshold;
+  }
+
+  public void setOrdered(boolean ordered) {
+    _ordered = ordered;
+  }
+
+  public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
+    _setOperationBuilder = setOperationBuilder;
+  }
+
+  public void setThreshold(int threshold) {
+    _threshold = threshold;
+  }
+
+  public boolean isEmpty() {
+    return _numInputs == 0;
+  }
+
+  @Nonnull
+  public Sketch getResult() {
+    return unionAll();
+  }
+
+  public void apply(Sketch sketch) {
+    internalAdd(sketch);
+  }
+
+  public void merge(ThetaSketchAccumulator thetaUnion) {
+    if (thetaUnion.isEmpty()) {
+      return;
+    }
+    Sketch sketch = thetaUnion.getResult();
+    internalAdd(sketch);
+  }
+
+  private void internalAdd(Sketch sketch) {
+    if (sketch.isEmpty()) {
+      return;
+    }
+    if (_accumulator == null) {
+      _accumulator = new ArrayList<>(_threshold);
+    }
+    _accumulator.add(sketch);
+    _numInputs += 1;
+
+    if (_accumulator.size() >= _threshold) {

Review Comment:
   Is there a final catch-all to do the union (last batch), even if the threshold is not met? I guess it happens via:
   
     public Sketch getResult() {
       return unionAll();
     }



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java:
##########
@@ -102,9 +111,22 @@ public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> argum
       Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
           "Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal (parameters)");
       Parameters parameters = new Parameters(paramsExpression.getLiteral().getStringValue());
+      // Allows the user to trade-off memory usage for merge CPU; higher values use more memory
+      _accumulatorThreshold = parameters.getAccumulatorThreshold();
+      // Ordering controls whether intermediate compact sketches are ordered in set operations
+      _intermediateOrdering = parameters.getIntermediateOrdering();
+      // Nominal entries controls sketch accuracy and size
       int nominalEntries = parameters.getNominalEntries();
       _updateSketchBuilder.setNominalEntries(nominalEntries);
       _setOperationBuilder.setNominalEntries(nominalEntries);
+      // Sampling probability sets the initial value of Theta, defaults to 1.0
+      float p = parameters.getSamplingProbability();
+      _setOperationBuilder.setP(p);
+      _updateSketchBuilder.setP(p);
+      // Resize factor controls the size multiple that affects how fast the internal cache grows
+      ResizeFactor rf = parameters.getResizeFactor();

Review Comment:
   Curious how this is tuned and how it impacts performance?
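
One way to reason about the resize factor is to count how many grow-and-rehash steps a cache needs to reach a given capacity. The arithmetic below is only a toy: `ResizeSteps`, the starting capacity of 16, and the target of 8192 are hypothetical and are not taken from the library's actual sizing policy. It assumes each resize multiplies the capacity by the factor, capped at the target.

```java
// Toy arithmetic only: counts growth steps for a cache that multiplies its
// capacity by a fixed factor on each resize. Sizes are illustrative.
class ResizeSteps {
  static int steps(int startCapacity, int targetCapacity, int factor) {
    if (factor < 2) {
      throw new IllegalArgumentException("factor must be >= 2");
    }
    int steps = 0;
    long capacity = startCapacity;
    while (capacity < targetCapacity) {
      // Each step stands for one reallocation plus a rehash of all entries.
      capacity = Math.min(capacity * factor, (long) targetCapacity);
      steps++;
    }
    return steps;
  }

  public static void main(String[] args) {
    for (int factor : new int[] {2, 4, 8}) {
      System.out.println("factor=" + factor + " steps=" + steps(16, 8192, factor));
    }
  }
}
```

Under these assumptions a larger factor reaches the target in fewer resize steps, at the cost of allocating larger blocks earlier, so small sketches may hold more memory than they need.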



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java:
##########
@@ -47,11 +49,18 @@ public ColumnDataType getFinalResultColumnType() {
   }
 
   @Override
-  public String extractFinalResult(List<Sketch> sketches) {
-    Sketch sketch = evaluatePostAggregationExpression(sketches);
+  public String extractFinalResult(List<ThetaSketchAccumulator> accumulators) {
+    int numAccumulators = accumulators.size();
+    List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
 
-    // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-    //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
-    return Base64.getEncoder().encodeToString(sketch.compact(false, null).toByteArray());
+    for (ThetaSketchAccumulator accumulator : accumulators) {
+      accumulator.setOrdered(_intermediateOrdering);
+      accumulator.setThreshold(_accumulatorThreshold);
+      accumulator.setSetOperationBuilder(_setOperationBuilder);
+      mergedSketches.add(accumulator.getResult());

Review Comment:
   What would happen in the default case? I assume it would just do a union for every sketch pair, instead of the intermediate batching (default threshold = 2).



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java:
##########
@@ -102,9 +111,22 @@ public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> argum
       Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
           "Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal (parameters)");
       Parameters parameters = new Parameters(paramsExpression.getLiteral().getStringValue());
+      // Allows the user to trade-off memory usage for merge CPU; higher values use more memory
+      _accumulatorThreshold = parameters.getAccumulatorThreshold();
+      // Ordering controls whether intermediate compact sketches are ordered in set operations
+      _intermediateOrdering = parameters.getIntermediateOrdering();
+      // Nominal entries controls sketch accuracy and size
       int nominalEntries = parameters.getNominalEntries();
       _updateSketchBuilder.setNominalEntries(nominalEntries);
       _setOperationBuilder.setNominalEntries(nominalEntries);
+      // Sampling probability sets the initial value of Theta, defaults to 1.0

Review Comment:
   Is this sampling to skip entries in a sketch while doing the sort/merge?
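
The code comment in the hunk says the sampling probability sets the initial value of Theta. A toy model of that idea, not the DataSketches implementation, is sketched below: `SamplingSketch` is a hypothetical class, hashes are assumed to be uniform over the non-negative `long` range, and the estimate simply scales the retained count by `1/p`. In this model the filtering happens when a value is offered to the sketch, because any hash at or above the initial theta is rejected up front.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: the sampling probability p fixes an initial theta, and any
// hash at or above that theta is rejected before it is stored.
class SamplingSketch {
  final double p;
  final long thetaLong;
  final Set<Long> retained = new HashSet<>();

  SamplingSketch(double p) {
    this.p = p;
    this.thetaLong = (long) (p * Long.MAX_VALUE);
  }

  // hash is assumed to be uniformly distributed in [0, Long.MAX_VALUE).
  void update(long hash) {
    if (hash < thetaLong) {
      retained.add(hash);
    }
  }

  // Scales the retained count back up by the sampling probability.
  double estimate() {
    return retained.size() / p;
  }

  public static void main(String[] args) {
    long step = Long.MAX_VALUE / 1000; // evenly spaced stand-ins for hashes
    for (double p : new double[] {1.0, 0.5}) {
      SamplingSketch sketch = new SamplingSketch(p);
      for (long i = 0; i < 1000; i++) {
        sketch.update(i * step);
      }
      System.out.println("p=" + p + " retained=" + sketch.retained.size()
          + " estimate=" + sketch.estimate());
    }
  }
}
```

With `p = 0.5` roughly half of the evenly spaced hashes are retained while the estimate stays close to the true count of 1000, which illustrates the memory saving that a lower initial theta buys.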



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
