This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3a09d3ce0 [core] add hll aggregator support (#3988)
3a09d3ce0 is described below
commit 3a09d3ce07f3a0cf589db2f3a3b145a6652edcdc
Author: ranxianglei <[email protected]>
AuthorDate: Mon Aug 19 16:11:47 2024 +0800
[core] add hll aggregator support (#3988)
---
.../primary-key-table/merge-engine/aggregation.md | 97 ++++++++++++++++++++--
.../org/apache/paimon/utils/HllSketchUtil.java | 36 ++++++++
.../compact/aggregate/FieldAggregator.java | 7 ++
.../compact/aggregate/FieldHllSketchAgg.java | 59 +++++++++++++
.../compact/aggregate/FieldAggregatorTest.java | 24 ++++++
5 files changed, 216 insertions(+), 7 deletions(-)
diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md
b/docs/content/primary-key-table/merge-engine/aggregation.md
index 27b29ed5b..7c784be35 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -201,7 +201,90 @@ Current supported aggregate functions and data types are:
### merge_map
The merge_map function merge input maps. It only supports MAP type.
-### theta_sketch
+### Types of cardinality sketches
+
+ Paimon uses the [Apache DataSketches](https://datasketches.apache.org/)
library of stochastic streaming algorithms to implement sketch modules. The
DataSketches library includes various types of sketches, each one designed to
solve a different sort of problem. Paimon supports HyperLogLog (HLL) and Theta
cardinality sketches.
+
+#### HyperLogLog
+
+ The HyperLogLog (HLL) sketch aggregator is a very compact sketch algorithm
for approximate distinct counting. You can also use the HLL aggregator to
calculate a union of HLL sketches.
+
+#### Theta
+
+ The Theta sketch is a sketch algorithm for approximate distinct counting with
set operations. Theta sketches let you count the overlap between sets, so that
you can compute the union, intersection, or set difference between sketch
objects.
+
+#### Choosing a sketch type
+
+ HLL and Theta sketches both support approximate distinct counting; however,
the HLL sketch produces more accurate results and consumes less storage space.
Theta sketches are more flexible but require significantly more memory.
+
+When choosing an approximation algorithm for your use case, consider the
following:
+
+- If your use case entails distinct counting and merging sketch objects, use the
HLL sketch.
+- If you need to evaluate union, intersection, or difference set operations, use
the Theta sketch.
+- You cannot merge HLL sketches with Theta sketches.
+
+#### hll_sketch
+
+The hll_sketch function aggregates multiple serialized Sketch objects into a
single Sketch.
+It supports VARBINARY data type.
+
+An example:
+
+{{< tabs "hll_sketch-example" >}}
+
+{{< tab "Flink" >}}
+
+ ```sql
+ -- source table
+ CREATE TABLE VISITS (
+ id INT PRIMARY KEY NOT ENFORCED,
+ user_id STRING
+ );
+
+ -- agg table
+ CREATE TABLE UV_AGG (
+ id INT PRIMARY KEY NOT ENFORCED,
+ uv VARBINARY
+ ) WITH (
+ 'merge-engine' = 'aggregation',
+ 'fields.uv.aggregate-function' = 'hll_sketch'
+ );
+
+ -- Register the following class as a Flink function with the name
"HLL_SKETCH"
+ -- which is used to transform input to sketch bytes array:
+ --
+ -- public static class HllSketchFunction extends ScalarFunction {
+ -- public byte[] eval(String user_id) {
+ -- HllSketch hllSketch = new HllSketch();
+ -- hllSketch.update(user_id);
+ -- return hllSketch.toCompactByteArray();
+ -- }
+ -- }
+ --
+ INSERT INTO UV_AGG SELECT id, HLL_SKETCH(user_id) FROM VISITS;
+
+ -- Register the following class as a Flink function with the name
"HLL_SKETCH_COUNT"
+ -- which is used to get cardinality from sketch bytes array:
+ --
+ -- public static class HllSketchCountFunction extends ScalarFunction {
+ -- public Double eval(byte[] sketchBytes) {
+ -- if (sketchBytes == null) {
+ -- return 0d;
+ -- }
+ -- return HllSketch.heapify(sketchBytes).getEstimate();
+ -- }
+ -- }
+ --
+ -- Then we can get user cardinality based on the aggregated field.
+ SELECT id, HLL_SKETCH_COUNT(uv) as uv FROM UV_AGG;
+ ```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+
+#### theta_sketch
The theta_sketch function aggregates multiple serialized Sketch objects into
a single Sketch.
It supports VARBINARY data type.
@@ -227,10 +310,10 @@ Current supported aggregate functions and data types are:
'fields.f0.aggregate-function' = 'theta_sketch'
);
- -- Register the following class as a Flink function with the name "SKETCH"
+ -- Register the following class as a Flink function with the name
"THETA_SKETCH"
-- which is used to transform input to sketch bytes array:
--
- -- public static class SketchFunction extends ScalarFunction {
+ -- public static class ThetaSketchFunction extends ScalarFunction {
-- public byte[] eval(String user_id) {
-- UpdateSketch updateSketch = UpdateSketch.builder().build();
-- updateSketch.update(user_id);
@@ -238,12 +321,12 @@ Current supported aggregate functions and data types are:
-- }
-- }
--
- INSERT INTO UV_AGG SELECT id, SKETCH(user_id) FROM VISITS;
+ INSERT INTO UV_AGG SELECT id, THETA_SKETCH(user_id) FROM VISITS;
- -- Register the following class as a Flink function with the name
"SKETCH_COUNT"
+ -- Register the following class as a Flink function with the name
"THETA_SKETCH_COUNT"
-- which is used to get cardinality from sketch bytes array:
--
- -- public static class SketchCountFunction extends ScalarFunction {
+ -- public static class ThetaSketchCountFunction extends ScalarFunction {
-- public Double eval(byte[] sketchBytes) {
-- if (sketchBytes == null) {
-- return 0d;
@@ -253,7 +336,7 @@ Current supported aggregate functions and data types are:
-- }
--
-- Then we can get user cardinality based on the aggregated field.
- SELECT id, SKETCH_COUNT(UV) as uv FROM UV_AGG;
+ SELECT id, THETA_SKETCH_COUNT(UV) as uv FROM UV_AGG;
```
{{< /tab >}}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/HllSketchUtil.java
b/paimon-common/src/main/java/org/apache/paimon/utils/HllSketchUtil.java
new file mode 100644
index 000000000..34c5464f7
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/HllSketchUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.apache.datasketches.hll.HllSketch;
+
+/** Test helper for building serialized {@link HllSketch} byte arrays from int values. */
+public class HllSketchUtil {
+
+ @VisibleForTesting
+ public static byte[] sketchOf(int... values) {
+ HllSketch hllSketch = new HllSketch(); // no-arg constructor: library default configuration
+ for (int value : values) {
+ hllSketch.update(value);
+ }
+ return hllSketch.toCompactByteArray(); // compact serialized form of the sketch
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index b3fb7b0eb..8fcdb14a6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -121,6 +121,13 @@ public abstract class FieldAggregator implements
Serializable {
fieldType);
fieldAggregator = new
FieldThetaSketchAgg((VarBinaryType) fieldType);
break;
+ case FieldHllSketchAgg.NAME:
+ checkArgument(
+ fieldType instanceof VarBinaryType,
+ "Data type for hll sketch column must be
'VarBinaryType' but was '%s'.",
+ fieldType);
+ fieldAggregator = new
FieldHllSketchAgg((VarBinaryType) fieldType);
+ break;
case FieldRoaringBitmap32Agg.NAME:
checkArgument(
fieldType instanceof VarBinaryType,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
new file mode 100644
index 000000000..939017536
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.types.VarBinaryType;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.datasketches.hll.Union;
+
+/** Field aggregator that merges serialized HLL sketches stored in a VARBINARY field. */
+public class FieldHllSketchAgg extends FieldAggregator {
+
+ public static final String NAME = "hll_sketch";
+
+ private static final long serialVersionUID = 1L;
+
+ public FieldHllSketchAgg(VarBinaryType dataType) {
+ super(dataType);
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public Object agg(Object accumulator, Object inputField) {
+ if (accumulator == null && inputField == null) { // nothing on either side to merge
+ return null;
+ }
+
+ if (accumulator == null || inputField == null) { // exactly one side is present
+ return accumulator == null ? inputField : accumulator; // pass it through unchanged
+ }
+
+ HllSketch heapify = HllSketch.heapify((byte[]) accumulator); // deserialize the accumulator
+ Union union = Union.heapify((byte[]) inputField); // union built from the input bytes
+ union.update(heapify); // fold the accumulator sketch into the union
+ HllSketch result = union.getResult(TgtHllType.HLL_4); // merged sketch in HLL_4 form
+ return result.toCompactByteArray(); // re-serialize as compact bytes
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index b0ae2c5c0..2d9a03b74 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -41,6 +41,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.HllSketchUtil;
import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.utils.RoaringBitmap64;
@@ -740,6 +741,29 @@ public class FieldAggregatorTest {
assertThat(result4).isEqualTo(acc2);
}
+ @Test
+ public void testFieldHllSketchAgg() {
+ FieldHllSketchAgg agg = new FieldHllSketchAgg(DataTypes.VARBINARY(20));
+
+ byte[] inputVal = HllSketchUtil.sketchOf(1);
+ byte[] acc1 = HllSketchUtil.sketchOf(2, 3);
+ byte[] acc2 = HllSketchUtil.sketchOf(1, 2, 3); // expected bytes after merging inputVal into acc1
+
+ assertThat(agg.agg(null, null)).isNull(); // both sides null -> null
+
+ byte[] result1 = (byte[]) agg.agg(null, inputVal); // null accumulator -> input returned
+ assertThat(inputVal).isEqualTo(result1);
+
+ byte[] result2 = (byte[]) agg.agg(acc1, null); // null input -> accumulator returned
+ assertThat(result2).isEqualTo(acc1);
+
+ byte[] result3 = (byte[]) agg.agg(acc1, inputVal); // {2, 3} merged with {1}
+ assertThat(result3).isEqualTo(acc2);
+
+ byte[] result4 = (byte[]) agg.agg(acc2, inputVal); // merging an already-present value
+ assertThat(result4).isEqualTo(acc2);
+ }
+
@Test
public void testFieldRoaringBitmap32Agg() {
FieldRoaringBitmap32Agg agg = new
FieldRoaringBitmap32Agg(DataTypes.VARBINARY(20));