This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e12aab464d Extend Merge Rollup Capabilities for Datasketches (#14625)
e12aab464d is described below

commit e12aab464d1daad54e0c3017f1e704f34056a751
Author: David Cromberge <[email protected]>
AuthorDate: Sat Jan 4 22:58:17 2025 +0200

    Extend Merge Rollup Capabilities for Datasketches (#14625)
---
 .../DistinctCountCPCSketchAggregator.java          | 24 +++---
 .../DistinctCountThetaSketchAggregator.java        | 19 +++--
 .../aggregator/IntegerTupleSketchAggregator.java   | 13 +--
 .../DistinctCountCPCSketchAggregatorTest.java      | 72 ++++++++++++++++
 .../DistinctCountThetaSketchAggregatorTest.java    | 98 ++++++++++++++++++++++
 .../IntegerTupleSketchAggregatorTest.java          | 85 +++++++++++++++++++
 .../org/apache/pinot/segment/spi/Constants.java    |  1 +
 7 files changed, 288 insertions(+), 24 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
index 73985f564d..b708305de4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.datasketches.cpc.CpcSketch;
 import org.apache.datasketches.cpc.CpcUnion;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
@@ -34,19 +35,18 @@ public class DistinctCountCPCSketchAggregator implements 
ValueAggregator {
   public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     CpcSketch first = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1);
     CpcSketch second = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2);
-    CpcSketch result;
-    if (first == null && second == null) {
-      result = new CpcSketch(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
-    } else if (second == null) {
-      result = first;
-    } else if (first == null) {
-      result = second;
+    CpcUnion union;
+
+    String lgKParam = functionParameters.get(Constants.CPCSKETCH_LGK_KEY);
+    if (lgKParam != null) {
+      union = new CpcUnion(Integer.parseInt(lgKParam));
     } else {
-      CpcUnion union = new 
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
-      union.update(first);
-      union.update(second);
-      result = union.getResult();
+      // If the functionParameters don't have an explicit lgK value set,
+      // use the default lgK value for CPC sketches
+      union = new CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
     }
-    return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(result);
+    union.update(first);
+    union.update(second);
+    return 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(union.getResult());
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
index f22e38ed3c..3d00e602f0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.processing.aggregator;
 
 import java.util.Map;
+import org.apache.datasketches.theta.SetOperationBuilder;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -33,20 +34,26 @@ public class DistinctCountThetaSketchAggregator implements 
ValueAggregator {
 
   @Override
   public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
-    String nominalEntriesParam = 
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
+    SetOperationBuilder unionBuilder = Union.builder();
 
-    int sketchNominalEntries;
+    String samplingProbabilityParam = 
functionParameters.get(Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY);
+    String nominalEntriesParam = 
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
 
-    // Check if nominal entries values match
+    // Check if nominal entries is set
     if (nominalEntriesParam != null) {
-      sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+      unionBuilder.setNominalEntries(Integer.parseInt(nominalEntriesParam));
     } else {
       // If the functionParameters don't have an explicit nominal entries 
value set,
       // use the default value for nominal entries
-      sketchNominalEntries = 
CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES;
+      
unionBuilder.setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES);
+    }
+
+    // Check if sampling probability is set
+    if (samplingProbabilityParam != null) {
+      unionBuilder.setP(Float.parseFloat(samplingProbabilityParam));
     }
 
-    Union union = 
Union.builder().setNominalEntries(sketchNominalEntries).buildUnion();
+    Union union = unionBuilder.buildUnion();
     Sketch first = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1);
     Sketch second = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2);
     Sketch result = union.union(first, second);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
index b7df4c05fe..9c1588c74f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
@@ -39,21 +39,22 @@ public class IntegerTupleSketchAggregator implements 
ValueAggregator {
   public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     String nominalEntriesParam = 
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
 
-    int sketchNominalEntries;
+    Union<IntegerSummary> integerUnion;
+    IntegerSummarySetOperations setOperations = new 
IntegerSummarySetOperations(_mode, _mode);
 
-    // Check if nominal entries values match
+    // Check if nominal entries is set
     if (nominalEntriesParam != null) {
-      sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+      integerUnion = new Union<>(Integer.parseInt(nominalEntriesParam), 
setOperations);
     } else {
       // If the functionParameters don't have an explicit nominal entries 
value set,
       // use the default value for nominal entries
-      sketchNominalEntries = (int) Math.pow(2, 
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+      int sketchNominalEntries = (int) Math.pow(2, 
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+      integerUnion = new Union<>(sketchNominalEntries, setOperations);
     }
 
     Sketch<IntegerSummary> first = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1);
     Sketch<IntegerSummary> second = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2);
-    Sketch<IntegerSummary> result =
-        new Union<>(sketchNominalEntries, new 
IntegerSummarySetOperations(_mode, _mode)).union(first, second);
+    Sketch<IntegerSummary> result = integerUnion.union(first, second);
     return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java
new file mode 100644
index 0000000000..aff8725e0c
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+public class DistinctCountCPCSketchAggregatorTest {
+
+  private DistinctCountCPCSketchAggregator _cpcSketchAggregator;
+
+  @BeforeMethod
+  public void setUp() {
+    _cpcSketchAggregator = new DistinctCountCPCSketchAggregator();
+  }
+
+  @Test
+  public void testAggregateWithDefaultLgK() {
+    CpcSketch firstSketch = new CpcSketch(10);
+    CpcSketch secondSketch = new CpcSketch(20);
+    byte[] value1 = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(firstSketch);
+    byte[] value2 = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(secondSketch);
+
+    Map<String, String> functionParameters = new HashMap<>();
+    byte[] result = (byte[]) _cpcSketchAggregator.aggregate(value1, value2, 
functionParameters);
+
+    CpcSketch resultSketch = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(result);
+    assertNotNull(resultSketch);
+    assertEquals(resultSketch.getLgK(), 12);
+  }
+
+  @Test
+  public void testAggregateWithFunctionParameters() {
+    CpcSketch firstSketch = new CpcSketch(10);
+    CpcSketch secondSketch = new CpcSketch(20);
+    byte[] value1 = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(firstSketch);
+    byte[] value2 = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(secondSketch);
+
+    Map<String, String> functionParameters = new HashMap<>();
+    functionParameters.put(Constants.CPCSKETCH_LGK_KEY, "15");
+
+    byte[] result = (byte[]) _cpcSketchAggregator.aggregate(value1, value2, 
functionParameters);
+
+    CpcSketch resultSketch = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(result);
+    assertNotNull(resultSketch);
+    assertEquals(resultSketch.getLgK(), 15);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java
new file mode 100644
index 0000000000..0c416762e2
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+public class DistinctCountThetaSketchAggregatorTest {
+
+  private DistinctCountThetaSketchAggregator _thetaSketchAggregator;
+
+  @BeforeMethod
+  public void setUp() {
+    _thetaSketchAggregator = new DistinctCountThetaSketchAggregator();
+  }
+
+  @Test
+  public void testAggregateWithDefaultBehaviour() {
+    Sketch firstSketch = createThetaSketch(64);
+    Sketch secondSketch = createThetaSketch(32);
+    byte[] value1 = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
+    byte[] value2 = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);
+    Map<String, String> functionParameters = new HashMap<>();
+
+    byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2, 
functionParameters);
+
+    Sketch resultSketch = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
+    assertNotNull(resultSketch);
+    assertEquals(resultSketch.getRetainedEntries(), 64);
+  }
+
+  @Test
+  public void testAggregateWithNominalEntries() {
+    Sketch firstSketch = createThetaSketch(64);
+    Sketch secondSketch = createThetaSketch(32);
+    byte[] value1 = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
+    byte[] value2 = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);
+
+    Map<String, String> functionParameters = new HashMap<>();
+    functionParameters.put(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES, "32");
+
+    byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2, 
functionParameters);
+
+    Sketch resultSketch = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
+    assertNotNull(resultSketch);
+    assertEquals(resultSketch.getRetainedEntries(), 32);
+  }
+
+  @Test
+  public void testAggregateWithSamplingProbability() {
+    Sketch firstSketch = createThetaSketch(64);
+    Sketch secondSketch = createThetaSketch(32);
+    byte[] value1 = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
+    byte[] value2 = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);
+
+    Map<String, String> functionParameters = new HashMap<>();
+    functionParameters.put(Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY, 
"0.1");
+
+    byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2, 
functionParameters);
+
+    Sketch resultSketch = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
+    assertNotNull(resultSketch);
+    assertTrue(resultSketch.getRetainedEntries() < 64);
+  }
+
+  private Sketch createThetaSketch(int nominalEntries) {
+    UpdateSketch updateSketch = 
UpdateSketch.builder().setNominalEntries(nominalEntries).build();
+    for (int i = 0; i < nominalEntries; i++) {
+      updateSketch.update(i);
+    }
+    return updateSketch.compact();
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java
new file mode 100644
index 0000000000..2dbf857fca
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+public class IntegerTupleSketchAggregatorTest {
+
+  private IntegerTupleSketchAggregator _tupleSketchAggregator;
+
+  @BeforeMethod
+  public void setUp() {
+    _tupleSketchAggregator = new 
IntegerTupleSketchAggregator(IntegerSummary.Mode.Max);
+  }
+
+  @Test
+  public void testAggregateWithDefaultBehaviour() {
+    Sketch<IntegerSummary> firstSketch = createTupleSketch(64);
+    Sketch<IntegerSummary> secondSketch = createTupleSketch(32);
+    byte[] value1 = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(firstSketch);
+    byte[] value2 = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(secondSketch);
+    Map<String, String> functionParameters = new HashMap<>();
+
+    byte[] result = (byte[]) _tupleSketchAggregator.aggregate(value1, value2, 
functionParameters);
+
+    Sketch<IntegerSummary> resultSketch = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(result);
+    assertNotNull(resultSketch);
+    assertEquals(resultSketch.getRetainedEntries(), 64);
+  }
+
+  @Test
+  public void testAggregateWithNominalEntries() {
+    Sketch<IntegerSummary> firstSketch = createTupleSketch(64);
+    Sketch<IntegerSummary> secondSketch = createTupleSketch(32);
+    byte[] value1 = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(firstSketch);
+    byte[] value2 = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(secondSketch);
+
+    Map<String, String> functionParameters = new HashMap<>();
+    functionParameters.put(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES, "32");
+
+    byte[] result = (byte[]) _tupleSketchAggregator.aggregate(value1, value2, 
functionParameters);
+
+    Sketch<IntegerSummary> resultSketch = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(result);
+    assertNotNull(resultSketch);
+    assertEquals(resultSketch.getRetainedEntries(), 32);
+  }
+
+  private CompactSketch<IntegerSummary> createTupleSketch(int nominalEntries) {
+    int lgK = (int) (Math.log(nominalEntries) / Math.log(2));
+    IntegerSketch integerSketch = new IntegerSketch(lgK, 
IntegerSummary.Mode.Max);
+    for (int i = 0; i < nominalEntries; i++) {
+      integerSketch.update(i, 1);
+    }
+    return integerSketch.compact();
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java
index f17548f039..d1e717ca6c 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java
@@ -30,6 +30,7 @@ public class Constants {
   public static final String HLLPLUS_SP_KEY = "sp";
   public static final String CPCSKETCH_LGK_KEY = "lgK";
   public static final String THETA_TUPLE_SKETCH_NOMINAL_ENTRIES = 
"nominalEntries";
+  public static final String THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY = 
"samplingProbability";
   public static final String PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY = 
"compressionFactor";
   public static final String SUMPRECISION_PRECISION_KEY = "precision";
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to