This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e12aab464d Extend Merge Rollup Capabilities for Datasketches (#14625)
e12aab464d is described below
commit e12aab464d1daad54e0c3017f1e704f34056a751
Author: David Cromberge <[email protected]>
AuthorDate: Sat Jan 4 22:58:17 2025 +0200
Extend Merge Rollup Capabilities for Datasketches (#14625)
---
.../DistinctCountCPCSketchAggregator.java | 24 +++---
.../DistinctCountThetaSketchAggregator.java | 19 +++--
.../aggregator/IntegerTupleSketchAggregator.java | 13 +--
.../DistinctCountCPCSketchAggregatorTest.java | 72 ++++++++++++++++
.../DistinctCountThetaSketchAggregatorTest.java | 98 ++++++++++++++++++++++
.../IntegerTupleSketchAggregatorTest.java | 85 +++++++++++++++++++
.../org/apache/pinot/segment/spi/Constants.java | 1 +
7 files changed, 288 insertions(+), 24 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
index 73985f564d..b708305de4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.cpc.CpcUnion;
import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -34,19 +35,18 @@ public class DistinctCountCPCSketchAggregator implements
ValueAggregator {
public Object aggregate(Object value1, Object value2, Map<String, String>
functionParameters) {
CpcSketch first =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1);
CpcSketch second =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2);
- CpcSketch result;
- if (first == null && second == null) {
- result = new CpcSketch(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
- } else if (second == null) {
- result = first;
- } else if (first == null) {
- result = second;
+ CpcUnion union;
+
+ String lgKParam = functionParameters.get(Constants.CPCSKETCH_LGK_KEY);
+ if (lgKParam != null) {
+ union = new CpcUnion(Integer.parseInt(lgKParam));
} else {
- CpcUnion union = new
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
- union.update(first);
- union.update(second);
- result = union.getResult();
+ // If the functionParameters don't have an explicit lgK value set,
+ // fall back to the default CPC sketch lgK
+ union = new CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
}
- return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(result);
+ union.update(first);
+ union.update(second);
+ return
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(union.getResult());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
index f22e38ed3c..3d00e602f0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.segment.processing.aggregator;
import java.util.Map;
+import org.apache.datasketches.theta.SetOperationBuilder;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -33,20 +34,26 @@ public class DistinctCountThetaSketchAggregator implements
ValueAggregator {
@Override
public Object aggregate(Object value1, Object value2, Map<String, String>
functionParameters) {
- String nominalEntriesParam =
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
+ SetOperationBuilder unionBuilder = Union.builder();
- int sketchNominalEntries;
+ String samplingProbabilityParam =
functionParameters.get(Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY);
+ String nominalEntriesParam =
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
- // Check if nominal entries values match
+ // Check if nominal entries is set
if (nominalEntriesParam != null) {
- sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+ unionBuilder.setNominalEntries(Integer.parseInt(nominalEntriesParam));
} else {
// If the functionParameters don't have an explicit nominal entries
value set,
// use the default value for nominal entries
- sketchNominalEntries =
CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES;
+
unionBuilder.setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES);
+ }
+
+ // Check if sampling probability is set
+ if (samplingProbabilityParam != null) {
+ unionBuilder.setP(Float.parseFloat(samplingProbabilityParam));
}
- Union union =
Union.builder().setNominalEntries(sketchNominalEntries).buildUnion();
+ Union union = unionBuilder.buildUnion();
Sketch first =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1);
Sketch second =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2);
Sketch result = union.union(first, second);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
index b7df4c05fe..9c1588c74f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
@@ -39,21 +39,22 @@ public class IntegerTupleSketchAggregator implements
ValueAggregator {
public Object aggregate(Object value1, Object value2, Map<String, String>
functionParameters) {
String nominalEntriesParam =
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
- int sketchNominalEntries;
+ Union<IntegerSummary> integerUnion;
+ IntegerSummarySetOperations setOperations = new
IntegerSummarySetOperations(_mode, _mode);
- // Check if nominal entries values match
+ // Check if nominal entries is set
if (nominalEntriesParam != null) {
- sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+ integerUnion = new Union<>(Integer.parseInt(nominalEntriesParam),
setOperations);
} else {
// If the functionParameters don't have an explicit nominal entries
value set,
// use the default value for nominal entries
- sketchNominalEntries = (int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+ int sketchNominalEntries = (int) Math.pow(2,
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+ integerUnion = new Union<>(sketchNominalEntries, setOperations);
}
Sketch<IntegerSummary> first =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1);
Sketch<IntegerSummary> second =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2);
- Sketch<IntegerSummary> result =
- new Union<>(sketchNominalEntries, new
IntegerSummarySetOperations(_mode, _mode)).union(first, second);
+ Sketch<IntegerSummary> result = integerUnion.union(first, second);
return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java
new file mode 100644
index 0000000000..aff8725e0c
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+public class DistinctCountCPCSketchAggregatorTest {
+
+ private DistinctCountCPCSketchAggregator _cpcSketchAggregator;
+
+ @BeforeMethod
+ public void setUp() {
+ _cpcSketchAggregator = new DistinctCountCPCSketchAggregator();
+ }
+
+ @Test
+ public void testAggregateWithDefaultLgK() {
+ CpcSketch firstSketch = new CpcSketch(10);
+ CpcSketch secondSketch = new CpcSketch(20);
+ byte[] value1 =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(firstSketch);
+ byte[] value2 =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(secondSketch);
+
+ Map<String, String> functionParameters = new HashMap<>();
+ byte[] result = (byte[]) _cpcSketchAggregator.aggregate(value1, value2,
functionParameters);
+
+ CpcSketch resultSketch =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(result);
+ assertNotNull(resultSketch);
+ assertEquals(resultSketch.getLgK(), 12);
+ }
+
+ @Test
+ public void testAggregateWithFunctionParameters() {
+ CpcSketch firstSketch = new CpcSketch(10);
+ CpcSketch secondSketch = new CpcSketch(20);
+ byte[] value1 =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(firstSketch);
+ byte[] value2 =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(secondSketch);
+
+ Map<String, String> functionParameters = new HashMap<>();
+ functionParameters.put(Constants.CPCSKETCH_LGK_KEY, "15");
+
+ byte[] result = (byte[]) _cpcSketchAggregator.aggregate(value1, value2,
functionParameters);
+
+ CpcSketch resultSketch =
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(result);
+ assertNotNull(resultSketch);
+ assertEquals(resultSketch.getLgK(), 15);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java
new file mode 100644
index 0000000000..0c416762e2
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+public class DistinctCountThetaSketchAggregatorTest {
+
+ private DistinctCountThetaSketchAggregator _thetaSketchAggregator;
+
+ @BeforeMethod
+ public void setUp() {
+ _thetaSketchAggregator = new DistinctCountThetaSketchAggregator();
+ }
+
+ @Test
+ public void testAggregateWithDefaultBehaviour() {
+ Sketch firstSketch = createThetaSketch(64);
+ Sketch secondSketch = createThetaSketch(32);
+ byte[] value1 =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
+ byte[] value2 =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);
+ Map<String, String> functionParameters = new HashMap<>();
+
+ byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2,
functionParameters);
+
+ Sketch resultSketch =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
+ assertNotNull(resultSketch);
+ assertEquals(resultSketch.getRetainedEntries(), 64);
+ }
+
+ @Test
+ public void testAggregateWithNominalEntries() {
+ Sketch firstSketch = createThetaSketch(64);
+ Sketch secondSketch = createThetaSketch(32);
+ byte[] value1 =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
+ byte[] value2 =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);
+
+ Map<String, String> functionParameters = new HashMap<>();
+ functionParameters.put(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES, "32");
+
+ byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2,
functionParameters);
+
+ Sketch resultSketch =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
+ assertNotNull(resultSketch);
+ assertEquals(resultSketch.getRetainedEntries(), 32);
+ }
+
+ @Test
+ public void testAggregateWithSamplingProbability() {
+ Sketch firstSketch = createThetaSketch(64);
+ Sketch secondSketch = createThetaSketch(32);
+ byte[] value1 =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
+ byte[] value2 =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);
+
+ Map<String, String> functionParameters = new HashMap<>();
+ functionParameters.put(Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY,
"0.1");
+
+ byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2,
functionParameters);
+
+ Sketch resultSketch =
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
+ assertNotNull(resultSketch);
+ assertTrue(resultSketch.getRetainedEntries() < 64);
+ }
+
+ private Sketch createThetaSketch(int nominalEntries) {
+ UpdateSketch updateSketch =
UpdateSketch.builder().setNominalEntries(nominalEntries).build();
+ for (int i = 0; i < nominalEntries; i++) {
+ updateSketch.update(i);
+ }
+ return updateSketch.compact();
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java
new file mode 100644
index 0000000000..2dbf857fca
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+public class IntegerTupleSketchAggregatorTest {
+
+ private IntegerTupleSketchAggregator _tupleSketchAggregator;
+
+ @BeforeMethod
+ public void setUp() {
+ _tupleSketchAggregator = new
IntegerTupleSketchAggregator(IntegerSummary.Mode.Max);
+ }
+
+ @Test
+ public void testAggregateWithDefaultBehaviour() {
+ Sketch<IntegerSummary> firstSketch = createTupleSketch(64);
+ Sketch<IntegerSummary> secondSketch = createTupleSketch(32);
+ byte[] value1 =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(firstSketch);
+ byte[] value2 =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(secondSketch);
+ Map<String, String> functionParameters = new HashMap<>();
+
+ byte[] result = (byte[]) _tupleSketchAggregator.aggregate(value1, value2,
functionParameters);
+
+ Sketch<IntegerSummary> resultSketch =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(result);
+ assertNotNull(resultSketch);
+ assertEquals(resultSketch.getRetainedEntries(), 64);
+ }
+
+ @Test
+ public void testAggregateWithNominalEntries() {
+ Sketch<IntegerSummary> firstSketch = createTupleSketch(64);
+ Sketch<IntegerSummary> secondSketch = createTupleSketch(32);
+ byte[] value1 =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(firstSketch);
+ byte[] value2 =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(secondSketch);
+
+ Map<String, String> functionParameters = new HashMap<>();
+ functionParameters.put(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES, "32");
+
+ byte[] result = (byte[]) _tupleSketchAggregator.aggregate(value1, value2,
functionParameters);
+
+ Sketch<IntegerSummary> resultSketch =
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(result);
+ assertNotNull(resultSketch);
+ assertEquals(resultSketch.getRetainedEntries(), 32);
+ }
+
+ private CompactSketch<IntegerSummary> createTupleSketch(int nominalEntries) {
+ int lgK = (int) (Math.log(nominalEntries) / Math.log(2));
+ IntegerSketch integerSketch = new IntegerSketch(lgK,
IntegerSummary.Mode.Max);
+ for (int i = 0; i < nominalEntries; i++) {
+ integerSketch.update(i, 1);
+ }
+ return integerSketch.compact();
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java
index f17548f039..d1e717ca6c 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java
@@ -30,6 +30,7 @@ public class Constants {
public static final String HLLPLUS_SP_KEY = "sp";
public static final String CPCSKETCH_LGK_KEY = "lgK";
public static final String THETA_TUPLE_SKETCH_NOMINAL_ENTRIES =
"nominalEntries";
+ public static final String THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY =
"samplingProbability";
public static final String PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY =
"compressionFactor";
public static final String SUMPRECISION_PRECISION_KEY = "precision";
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]