This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 864bbe677 [kv] Add roaring bitmap aggregate function for aggregation merge engine (#2390)
864bbe677 is described below
commit 864bbe677b9b7e6e21030558ebed5af5b8fc7542
Author: Yang Wang <[email protected]>
AuthorDate: Tue Jan 20 17:28:41 2026 +0800
[kv] Add roaring bitmap aggregate function for aggregation merge engine (#2390)
---
.../org/apache/fluss/metadata/AggFunctionType.java | 6 +-
.../org/apache/fluss/metadata/AggFunctions.java | 26 +++++++
fluss-server/pom.xml | 10 +++
.../factory/FieldRoaringBitmap32AggFactory.java | 49 +++++++++++++
.../factory/FieldRoaringBitmap64AggFactory.java | 49 +++++++++++++
.../functions/FieldRoaringBitmap32Agg.java | 63 ++++++++++++++++
.../functions/FieldRoaringBitmap64Agg.java | 63 ++++++++++++++++
.../fluss/server/utils/RoaringBitmapUtils.java | 74 +++++++++++++++++++
fluss-server/src/main/resources/META-INF/NOTICE | 1 +
...merger.aggregate.factory.FieldAggregatorFactory | 2 +
.../aggregate/AggregationContextTest.java | 9 ++-
.../FieldAggregatorParameterizedTest.java | 83 ++++++++++++++++++++++
pom.xml | 1 +
.../docs/table-design/merge-engines/aggregation.md | 74 +++++++++++++++++++
14 files changed, 506 insertions(+), 4 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
index 9cf148ad2..a8a5e56bc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
@@ -49,7 +49,11 @@ public enum AggFunctionType {
// Boolean aggregation
BOOL_AND,
- BOOL_OR;
+ BOOL_OR,
+
+ // Roaring bitmap aggregation
+ RBM32,
+ RBM64;
/** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */
public static final String PARAM_DELIMITER = "delimiter";
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
index 7cc7024d0..3fb647158 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
@@ -271,6 +271,32 @@ public final class AggFunctions {
return new AggFunction(AggFunctionType.BOOL_OR, null);
}
+ // ===================================================================================
+ // Roaring Bitmap Aggregation Functions
+ // ===================================================================================
+
+ /**
+ * Creates an RBM32 aggregation function that merges serialized 32-bit roaring bitmaps.
+ *
+ * <p>Supported data types: BYTES
+ *
+ * @return an RBM32 aggregation function
+ */
+ public static AggFunction RBM32() {
+ return new AggFunction(AggFunctionType.RBM32, null);
+ }
+
+ /**
+ * Creates an RBM64 aggregation function that merges serialized 64-bit roaring bitmaps.
+ *
+ * <p>Supported data types: BYTES
+ *
+ * @return an RBM64 aggregation function
+ */
+ public static AggFunction RBM64() {
+ return new AggFunction(AggFunctionType.RBM64, null);
+ }
+
// ===================================================================================
// Internal Factory Methods
// ===================================================================================
diff --git a/fluss-server/pom.xml b/fluss-server/pom.xml
index 85e18b2c4..fff4edbe0 100644
--- a/fluss-server/pom.xml
+++ b/fluss-server/pom.xml
@@ -72,6 +72,12 @@
<artifactId>fluss-shaded-zookeeper</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <version>${roaringbitmap.version}</version>
+ </dependency>
+
<!--
we will start zookeeper server when start a local cluster,
need include this module for starting zookeeper requires it -->
@@ -138,6 +144,10 @@
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.fluss.shaded.org.apache.commons</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+
<shadedPattern>org.apache.fluss.shaded.org.roaringbitmap</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap32AggFactory.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap32AggFactory.java
new file mode 100644
index 000000000..02e9a705c
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap32AggFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.factory;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap32Agg;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/** Factory for {@link FieldRoaringBitmap32Agg}. */
+public class FieldRoaringBitmap32AggFactory implements FieldAggregatorFactory {
+
+ @Override
+ public FieldRoaringBitmap32Agg create(DataType fieldType, AggFunction
aggFunction) {
+ checkArgument(
+ fieldType.getTypeRoot() == DataTypeRoot.BYTES,
+ "Data type for rbm32 column must be 'BytesType' but was '%s'.",
+ fieldType);
+ return new FieldRoaringBitmap32Agg(fieldType);
+ }
+
+ @Override
+ public String identifier() {
+ return AggFunctionType.RBM32.toString();
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap64AggFactory.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap64AggFactory.java
new file mode 100644
index 000000000..988d7da2c
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap64AggFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.factory;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap64Agg;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/** Factory for {@link FieldRoaringBitmap64Agg}. */
+public class FieldRoaringBitmap64AggFactory implements FieldAggregatorFactory {
+
+ @Override
+ public FieldRoaringBitmap64Agg create(DataType fieldType, AggFunction
aggFunction) {
+ checkArgument(
+ fieldType.getTypeRoot() == DataTypeRoot.BYTES,
+ "Data type for rbm64 column must be 'BytesType' but was '%s'.",
+ fieldType);
+ return new FieldRoaringBitmap64Agg(fieldType);
+ }
+
+ @Override
+ public String identifier() {
+ return AggFunctionType.RBM64.toString();
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap32Agg.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap32Agg.java
new file mode 100644
index 000000000..e978f3cc4
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap32Agg.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.functions;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.server.utils.RoaringBitmapUtils;
+import org.apache.fluss.types.DataType;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+
+/** Roaring bitmap aggregator for serialized 32-bit bitmaps. */
+public class FieldRoaringBitmap32Agg extends FieldAggregator {
+
+ private static final long serialVersionUID = 1L;
+ private final RoaringBitmap roaringBitmapAcc;
+ private final RoaringBitmap roaringBitmapInput;
+
+ public FieldRoaringBitmap32Agg(DataType dataType) {
+ super(dataType);
+ this.roaringBitmapAcc = new RoaringBitmap();
+ this.roaringBitmapInput = new RoaringBitmap();
+ }
+
+ @Override
+ public Object agg(Object accumulator, Object inputField) {
+ if (accumulator == null || inputField == null) {
+ return accumulator == null ? inputField : accumulator;
+ }
+
+ try {
+ RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapAcc,
(byte[]) accumulator);
+ RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapInput,
(byte[]) inputField);
+ roaringBitmapAcc.or(roaringBitmapInput);
+ return
RoaringBitmapUtils.serializeRoaringBitmap32(roaringBitmapAcc);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to se/deserialize roaring
bitmap.", e);
+ } finally {
+ roaringBitmapAcc.clear();
+ roaringBitmapInput.clear();
+ }
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap64Agg.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap64Agg.java
new file mode 100644
index 000000000..e1bbc270c
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap64Agg.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.functions;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.server.utils.RoaringBitmapUtils;
+import org.apache.fluss.types.DataType;
+
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+import java.io.IOException;
+
+/** Roaring bitmap aggregator for serialized 64-bit bitmaps. */
+public class FieldRoaringBitmap64Agg extends FieldAggregator {
+
+ private static final long serialVersionUID = 1L;
+ private final Roaring64Bitmap roaringBitmapAcc;
+ private final Roaring64Bitmap roaringBitmapInput;
+
+ public FieldRoaringBitmap64Agg(DataType dataType) {
+ super(dataType);
+ this.roaringBitmapAcc = new Roaring64Bitmap();
+ this.roaringBitmapInput = new Roaring64Bitmap();
+ }
+
+ @Override
+ public Object agg(Object accumulator, Object inputField) {
+ if (accumulator == null || inputField == null) {
+ return accumulator == null ? inputField : accumulator;
+ }
+
+ try {
+ RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapAcc,
(byte[]) accumulator);
+ RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapInput,
(byte[]) inputField);
+ roaringBitmapAcc.or(roaringBitmapInput);
+ return
RoaringBitmapUtils.serializeRoaringBitmap64(roaringBitmapAcc);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to se/deserialize roaring
bitmap.", e);
+ } finally {
+ roaringBitmapAcc.clear();
+ roaringBitmapInput.clear();
+ }
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/RoaringBitmapUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/RoaringBitmapUtils.java
new file mode 100644
index 000000000..37554ca56
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/RoaringBitmapUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.utils;
+
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** Utility methods for serializing and deserializing roaring bitmaps. */
+public final class RoaringBitmapUtils {
+
+ private RoaringBitmapUtils() {
+ // Utility class, no instantiation
+ }
+
+ /**
+ * Serializes a 32-bit RoaringBitmap to a byte array using ByteBuffer.
+ *
+ * <p>Uses ByteBuffer as recommended by the RoaringBitmap Javadoc: "This is the preferred method
+ * to serialize to a byte array (byte[])".
+ */
+ public static byte[] serializeRoaringBitmap32(RoaringBitmap bitmap) throws
IOException {
+ bitmap.runOptimize();
+ ByteBuffer buffer =
ByteBuffer.allocate(bitmap.serializedSizeInBytes());
+ bitmap.serialize(buffer);
+ return buffer.array();
+ }
+
+ public static void deserializeRoaringBitmap32(RoaringBitmap bitmap, byte[]
bytes)
+ throws IOException {
+ bitmap.deserialize(ByteBuffer.wrap(bytes));
+ }
+
+ /**
+ * Serializes a 64-bit Roaring64Bitmap to a byte array using DataOutputStream.
+ *
+ * <p>Note: Unlike RoaringBitmap (32-bit), Roaring64Bitmap does not provide a
+ * serialize(ByteBuffer) method. It only supports serialize(DataOutput), hence the different
+ * serialization strategy.
+ */
+ public static byte[] serializeRoaringBitmap64(Roaring64Bitmap bitmap)
throws IOException {
+ bitmap.runOptimize();
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataOutputStream dataOutput = new DataOutputStream(output)) {
+ bitmap.serialize(dataOutput);
+ return output.toByteArray();
+ }
+ }
+
+ public static void deserializeRoaringBitmap64(Roaring64Bitmap bitmap,
byte[] bytes)
+ throws IOException {
+ bitmap.deserialize(ByteBuffer.wrap(bytes));
+ }
+}
diff --git a/fluss-server/src/main/resources/META-INF/NOTICE
b/fluss-server/src/main/resources/META-INF/NOTICE
index 014b76f17..2604b80cb 100644
--- a/fluss-server/src/main/resources/META-INF/NOTICE
+++ b/fluss-server/src/main/resources/META-INF/NOTICE
@@ -13,6 +13,7 @@ This project bundles the following dependencies under the
Apache Software Licens
- commons-cli:commons-cli:1.5.0
- org.apache.commons:commons-lang3:3.18.0
- org.apache.commons:commons-math3:3.6.1
+- org.roaringbitmap:RoaringBitmap:1.3.0
- at.yawk.lz4:lz4-java:1.10.2
- org.xerial.snappy:snappy-java:1.1.10.4
diff --git
a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
index f3a27da5f..cb4475ff3 100644
---
a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
+++
b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
@@ -28,3 +28,5 @@
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolAndAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolOrAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldListaggAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldStringAggFactory
+org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap32AggFactory
+org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap64AggFactory
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
index ab05d18ca..78fbefe66 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
@@ -17,8 +17,6 @@
package org.apache.fluss.server.kv.rowmerger.aggregate;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.TableConfig;
import org.apache.fluss.metadata.AggFunctions;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
@@ -32,6 +30,8 @@ import
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldListaggAgg;
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldMaxAgg;
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldMinAgg;
import
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldProductAgg;
+import
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap32Agg;
+import
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap64Agg;
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldSumAgg;
import org.apache.fluss.types.DataTypes;
@@ -65,10 +65,11 @@ class AggregationContextTest {
.column("bool_or_col", DataTypes.BOOLEAN(),
AggFunctions.BOOL_OR())
.column("listagg_col", DataTypes.STRING(),
AggFunctions.LISTAGG())
.column("string_agg_col", DataTypes.STRING(),
AggFunctions.STRING_AGG())
+ .column("rbm32_col", DataTypes.BYTES(),
AggFunctions.RBM32())
+ .column("rbm64_col", DataTypes.BYTES(),
AggFunctions.RBM64())
.primaryKey("id")
.build();
- TableConfig tableConfig = new TableConfig(new Configuration());
AggregationContext context = AggregationContext.create(schema,
KvFormat.COMPACTED);
// Primary key field should use FieldLastValueAgg (not aggregated)
@@ -86,5 +87,7 @@ class AggregationContextTest {
assertThat(context.getAggregators()[11]).isInstanceOf(FieldListaggAgg.class);
assertThat(context.getAggregators()[12])
.isInstanceOf(FieldListaggAgg.class); // string_agg is alias
+
assertThat(context.getAggregators()[13]).isInstanceOf(FieldRoaringBitmap32Agg.class);
+
assertThat(context.getAggregators()[14]).isInstanceOf(FieldRoaringBitmap64Agg.class);
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
index 37a4a3d46..0968dffa0 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
@@ -29,6 +29,7 @@ import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.server.kv.rowmerger.AggregateRowMerger;
+import org.apache.fluss.server.utils.RoaringBitmapUtils;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DataTypes;
@@ -37,7 +38,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
@@ -703,6 +707,85 @@ class FieldAggregatorParameterizedTest {
assertThat(merged2.row.getString(1).toString()).isEqualTo("a,b");
}
+ // ===================================================================================
+ // Roaring Bitmap Aggregation Tests
+ // ===================================================================================
+
+ @Test
+ void testRbm32Aggregation() throws IOException {
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("value", DataTypes.BYTES(),
AggFunctions.RBM32())
+ .primaryKey("id")
+ .build();
+
+ TableConfig tableConfig = new TableConfig(new Configuration());
+ AggregateRowMerger merger = createMerger(schema, tableConfig);
+
+ RoaringBitmap bitmap1 = new RoaringBitmap();
+ bitmap1.add(1);
+ bitmap1.add(2);
+ RoaringBitmap bitmap2 = new RoaringBitmap();
+ bitmap2.add(2);
+ bitmap2.add(3);
+
+ BinaryRow row1 =
+ compactedRow(
+ schema.getRowType(),
+ new Object[] {1,
RoaringBitmapUtils.serializeRoaringBitmap32(bitmap1)});
+ BinaryRow row2 =
+ compactedRow(
+ schema.getRowType(),
+ new Object[] {1,
RoaringBitmapUtils.serializeRoaringBitmap32(bitmap2)});
+
+ BinaryValue merged = merger.merge(toBinaryValue(row1),
toBinaryValue(row2));
+
+ RoaringBitmap expected = bitmap1.clone();
+ expected.or(bitmap2);
+ byte[] expectedBytes =
RoaringBitmapUtils.serializeRoaringBitmap32(expected);
+
+ assertThat(merged.row.getBinary(1,
expectedBytes.length)).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testRbm64Aggregation() throws IOException {
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("value", DataTypes.BYTES(),
AggFunctions.RBM64())
+ .primaryKey("id")
+ .build();
+
+ TableConfig tableConfig = new TableConfig(new Configuration());
+ AggregateRowMerger merger = createMerger(schema, tableConfig);
+
+ Roaring64Bitmap bitmap1 = new Roaring64Bitmap();
+ bitmap1.add(10L);
+ bitmap1.add(20L);
+ Roaring64Bitmap bitmap2 = new Roaring64Bitmap();
+ bitmap2.add(20L);
+ bitmap2.add(30L);
+
+ BinaryRow row1 =
+ compactedRow(
+ schema.getRowType(),
+ new Object[] {1,
RoaringBitmapUtils.serializeRoaringBitmap64(bitmap1)});
+ BinaryRow row2 =
+ compactedRow(
+ schema.getRowType(),
+ new Object[] {1,
RoaringBitmapUtils.serializeRoaringBitmap64(bitmap2)});
+
+ BinaryValue merged = merger.merge(toBinaryValue(row1),
toBinaryValue(row2));
+
+ Roaring64Bitmap expected = new Roaring64Bitmap();
+ expected.or(bitmap1);
+ expected.or(bitmap2);
+ byte[] expectedBytes =
RoaringBitmapUtils.serializeRoaringBitmap64(expected);
+
+ assertThat(merged.row.getBinary(1,
expectedBytes.length)).isEqualTo(expectedBytes);
+ }
+
// ===================================================================================
// Helper Methods
// ===================================================================================
diff --git a/pom.xml b/pom.xml
index 99558b790..07b7f9003 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
<arrow.version>15.0.0</arrow.version>
<paimon.version>1.3.1</paimon.version>
<iceberg.version>1.10.0</iceberg.version>
+ <roaringbitmap.version>1.3.0</roaringbitmap.version>
<!-- spark & scala -->
<scala212.version>2.12.18</scala212.version>
diff --git a/website/docs/table-design/merge-engines/aggregation.md
b/website/docs/table-design/merge-engines/aggregation.md
index b328dc2b7..4523e830a 100644
--- a/website/docs/table-design/merge-engines/aggregation.md
+++ b/website/docs/table-design/merge-engines/aggregation.md
@@ -827,6 +827,80 @@ TableDescriptor.builder()
</TabItem>
</Tabs>
+### rbm32
+
+Aggregates serialized 32-bit RoaringBitmap values by union.
+
+- **Supported Data Types**: BYTES
+- **Behavior**: ORs incoming bitmaps with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+<Tabs>
+<TabItem value="flink-sql" label="Flink SQL" default>
+
+```sql
+CREATE TABLE user_visits (
+ user_id BIGINT,
+ visit_bitmap BYTES,
+ PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+ 'table.merge-engine' = 'aggregation',
+ 'fields.visit_bitmap.agg' = 'rbm32'
+);
+```
+
+</TabItem>
+<TabItem value="java-client" label="Java Client">
+
+```java
+Schema schema = Schema.newBuilder()
+ .column("user_id", DataTypes.BIGINT())
+ .column("visit_bitmap", DataTypes.BYTES(), AggFunctions.RBM32())
+ .primaryKey("user_id")
+ .build();
+```
+
+</TabItem>
+</Tabs>
+
+### rbm64
+
+Aggregates serialized 64-bit RoaringBitmap values by union.
+
+- **Supported Data Types**: BYTES
+- **Behavior**: ORs incoming bitmaps with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+<Tabs>
+<TabItem value="flink-sql" label="Flink SQL" default>
+
+```sql
+CREATE TABLE session_interactions (
+ session_id BIGINT,
+ interaction_bitmap BYTES,
+ PRIMARY KEY (session_id) NOT ENFORCED
+) WITH (
+ 'table.merge-engine' = 'aggregation',
+ 'fields.interaction_bitmap.agg' = 'rbm64'
+);
+```
+
+</TabItem>
+<TabItem value="java-client" label="Java Client">
+
+```java
+Schema schema = Schema.newBuilder()
+ .column("session_id", DataTypes.BIGINT())
+ .column("interaction_bitmap", DataTypes.BYTES(), AggFunctions.RBM64())
+ .primaryKey("session_id")
+ .build();
+```
+
+</TabItem>
+</Tabs>
+
### bool_and
Evaluates whether all boolean values in a set are true (logical AND).