This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 864bbe677 [kv] Add roaring bitmap aggregate function for aggregation 
merge engine (#2390)
864bbe677 is described below

commit 864bbe677b9b7e6e21030558ebed5af5b8fc7542
Author: Yang Wang <[email protected]>
AuthorDate: Tue Jan 20 17:28:41 2026 +0800

    [kv] Add roaring bitmap aggregate function for aggregation merge engine 
(#2390)
---
 .../org/apache/fluss/metadata/AggFunctionType.java |  6 +-
 .../org/apache/fluss/metadata/AggFunctions.java    | 26 +++++++
 fluss-server/pom.xml                               | 10 +++
 .../factory/FieldRoaringBitmap32AggFactory.java    | 49 +++++++++++++
 .../factory/FieldRoaringBitmap64AggFactory.java    | 49 +++++++++++++
 .../functions/FieldRoaringBitmap32Agg.java         | 63 ++++++++++++++++
 .../functions/FieldRoaringBitmap64Agg.java         | 63 ++++++++++++++++
 .../fluss/server/utils/RoaringBitmapUtils.java     | 74 +++++++++++++++++++
 fluss-server/src/main/resources/META-INF/NOTICE    |  1 +
 ...merger.aggregate.factory.FieldAggregatorFactory |  2 +
 .../aggregate/AggregationContextTest.java          |  9 ++-
 .../FieldAggregatorParameterizedTest.java          | 83 ++++++++++++++++++++++
 pom.xml                                            |  1 +
 .../docs/table-design/merge-engines/aggregation.md | 74 +++++++++++++++++++
 14 files changed, 506 insertions(+), 4 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
index 9cf148ad2..a8a5e56bc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
@@ -49,7 +49,11 @@ public enum AggFunctionType {
 
     // Boolean aggregation
     BOOL_AND,
-    BOOL_OR;
+    BOOL_OR,
+
+    // Roaring bitmap aggregation
+    RBM32,
+    RBM64;
 
     /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. 
*/
     public static final String PARAM_DELIMITER = "delimiter";
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
index 7cc7024d0..3fb647158 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
@@ -271,6 +271,32 @@ public final class AggFunctions {
         return new AggFunction(AggFunctionType.BOOL_OR, null);
     }
 
+    // 
===================================================================================
+    // Roaring Bitmap Aggregation Functions
+    // 
===================================================================================
+
+    /**
+     * Creates an RBM32 aggregation function that merges serialized 32-bit roaring bitmaps.
+     *
+     * <p>The server-side aggregator for this type unions (bitwise OR) the incoming bitmap into
+     * the accumulated one; when either side is null, the other side is kept unchanged.
+     *
+     * <p>Supported data types: BYTES
+     *
+     * @return an RBM32 aggregation function, built from {@link AggFunctionType#RBM32} with a
+     *     {@code null} second constructor argument
+     */
+    public static AggFunction RBM32() {
+        return new AggFunction(AggFunctionType.RBM32, null);
+    }
+
+    /**
+     * Creates an RBM64 aggregation function that merges serialized 64-bit roaring bitmaps.
+     *
+     * <p>The server-side aggregator for this type unions (bitwise OR) the incoming bitmap into
+     * the accumulated one; when either side is null, the other side is kept unchanged.
+     *
+     * <p>Supported data types: BYTES
+     *
+     * @return an RBM64 aggregation function, built from {@link AggFunctionType#RBM64} with a
+     *     {@code null} second constructor argument
+     */
+    public static AggFunction RBM64() {
+        return new AggFunction(AggFunctionType.RBM64, null);
+    }
+
     // 
===================================================================================
     // Internal Factory Methods
     // 
===================================================================================
diff --git a/fluss-server/pom.xml b/fluss-server/pom.xml
index 85e18b2c4..fff4edbe0 100644
--- a/fluss-server/pom.xml
+++ b/fluss-server/pom.xml
@@ -72,6 +72,12 @@
             <artifactId>fluss-shaded-zookeeper</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.roaringbitmap</groupId>
+            <artifactId>RoaringBitmap</artifactId>
+            <version>${roaringbitmap.version}</version>
+        </dependency>
+
         <!--
         we will start zookeeper server when start a local cluster,
         need include this module for starting zookeeper requires it -->
@@ -138,6 +144,10 @@
                                     <pattern>org.apache.commons</pattern>
                                     
<shadedPattern>org.apache.fluss.shaded.org.apache.commons</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.roaringbitmap</pattern>
+                                    
<shadedPattern>org.apache.fluss.shaded.org.roaringbitmap</shadedPattern>
+                                </relocation>
                             </relocations>
                         </configuration>
                     </execution>
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap32AggFactory.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap32AggFactory.java
new file mode 100644
index 000000000..02e9a705c
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap32AggFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.factory;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import 
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap32Agg;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Factory for {@link FieldRoaringBitmap32Agg}.
+ *
+ * <p>{@link #create} accepts only a field whose type root is {@link DataTypeRoot#BYTES}; any
+ * other type fails the {@code checkArgument} precondition with a message naming the offending
+ * type. The {@code aggFunction} argument is not read by this factory.
+ *
+ * <p>{@link #identifier()} returns {@code AggFunctionType.RBM32.toString()}. NOTE(review): this
+ * is presumably the key used to look the factory up from a column's configured aggregate
+ * function - confirm against the factory loader.
+ */
+public class FieldRoaringBitmap32AggFactory implements FieldAggregatorFactory {
+
+    @Override
+    public FieldRoaringBitmap32Agg create(DataType fieldType, AggFunction 
aggFunction) {
+        checkArgument(
+                fieldType.getTypeRoot() == DataTypeRoot.BYTES,
+                "Data type for rbm32 column must be 'BytesType' but was '%s'.",
+                fieldType);
+        return new FieldRoaringBitmap32Agg(fieldType);
+    }
+
+    @Override
+    public String identifier() {
+        return AggFunctionType.RBM32.toString();
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap64AggFactory.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap64AggFactory.java
new file mode 100644
index 000000000..988d7da2c
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldRoaringBitmap64AggFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.factory;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import 
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap64Agg;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Factory for {@link FieldRoaringBitmap64Agg}.
+ *
+ * <p>{@link #create} accepts only a field whose type root is {@link DataTypeRoot#BYTES}; any
+ * other type fails the {@code checkArgument} precondition with a message naming the offending
+ * type. The {@code aggFunction} argument is not read by this factory.
+ *
+ * <p>{@link #identifier()} returns {@code AggFunctionType.RBM64.toString()}. NOTE(review): this
+ * is presumably the key used to look the factory up from a column's configured aggregate
+ * function - confirm against the factory loader.
+ */
+public class FieldRoaringBitmap64AggFactory implements FieldAggregatorFactory {
+
+    @Override
+    public FieldRoaringBitmap64Agg create(DataType fieldType, AggFunction 
aggFunction) {
+        checkArgument(
+                fieldType.getTypeRoot() == DataTypeRoot.BYTES,
+                "Data type for rbm64 column must be 'BytesType' but was '%s'.",
+                fieldType);
+        return new FieldRoaringBitmap64Agg(fieldType);
+    }
+
+    @Override
+    public String identifier() {
+        return AggFunctionType.RBM64.toString();
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap32Agg.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap32Agg.java
new file mode 100644
index 000000000..e978f3cc4
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap32Agg.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.functions;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.server.utils.RoaringBitmapUtils;
+import org.apache.fluss.types.DataType;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+
+/**
+ * Roaring bitmap aggregator for serialized 32-bit bitmaps.
+ *
+ * <p>{@link #agg} casts both arguments to {@code byte[]} and treats each as a serialized {@link
+ * RoaringBitmap}. If either argument is null, the other one is returned as-is (so the result is
+ * null when both are null) and nothing is deserialized. Otherwise both values are deserialized
+ * into two scratch bitmaps owned by this instance, the input is OR-ed into the accumulator, and
+ * the union is serialized into a new byte array. Both scratch bitmaps are cleared in a {@code
+ * finally} block, so no bitmap content carries over between calls, even when a call fails.
+ *
+ * <p>An {@link IOException} raised while (de)serializing is rethrown as a {@link
+ * RuntimeException} that keeps the original exception as its cause.
+ *
+ * <p>NOTE(review): the scratch bitmaps are shared instance fields that every {@link #agg} call
+ * mutates, so concurrent calls on one instance would interfere with each other - confirm that an
+ * aggregator instance is only ever used from a single thread.
+ */
+public class FieldRoaringBitmap32Agg extends FieldAggregator {
+
+    private static final long serialVersionUID = 1L;
+    private final RoaringBitmap roaringBitmapAcc;
+    private final RoaringBitmap roaringBitmapInput;
+
+    public FieldRoaringBitmap32Agg(DataType dataType) {
+        super(dataType);
+        this.roaringBitmapAcc = new RoaringBitmap();
+        this.roaringBitmapInput = new RoaringBitmap();
+    }
+
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+
+        try {
+            RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapAcc, 
(byte[]) accumulator);
+            RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapInput, 
(byte[]) inputField);
+            roaringBitmapAcc.or(roaringBitmapInput);
+            return 
RoaringBitmapUtils.serializeRoaringBitmap32(roaringBitmapAcc);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to se/deserialize roaring 
bitmap.", e);
+        } finally {
+            roaringBitmapAcc.clear();
+            roaringBitmapInput.clear();
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap64Agg.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap64Agg.java
new file mode 100644
index 000000000..e1bbc270c
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldRoaringBitmap64Agg.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.functions;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.server.utils.RoaringBitmapUtils;
+import org.apache.fluss.types.DataType;
+
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+import java.io.IOException;
+
+/**
+ * Roaring bitmap aggregator for serialized 64-bit bitmaps.
+ *
+ * <p>{@link #agg} casts both arguments to {@code byte[]} and treats each as a serialized {@link
+ * Roaring64Bitmap}. If either argument is null, the other one is returned as-is (so the result
+ * is null when both are null) and nothing is deserialized. Otherwise both values are
+ * deserialized into two scratch bitmaps owned by this instance, the input is OR-ed into the
+ * accumulator, and the union is serialized into a new byte array. Both scratch bitmaps are
+ * cleared in a {@code finally} block, so no bitmap content carries over between calls, even when
+ * a call fails.
+ *
+ * <p>An {@link IOException} raised while (de)serializing is rethrown as a {@link
+ * RuntimeException} that keeps the original exception as its cause.
+ *
+ * <p>NOTE(review): the scratch bitmaps are shared instance fields that every {@link #agg} call
+ * mutates, so concurrent calls on one instance would interfere with each other - confirm that an
+ * aggregator instance is only ever used from a single thread.
+ */
+public class FieldRoaringBitmap64Agg extends FieldAggregator {
+
+    private static final long serialVersionUID = 1L;
+    private final Roaring64Bitmap roaringBitmapAcc;
+    private final Roaring64Bitmap roaringBitmapInput;
+
+    public FieldRoaringBitmap64Agg(DataType dataType) {
+        super(dataType);
+        this.roaringBitmapAcc = new Roaring64Bitmap();
+        this.roaringBitmapInput = new Roaring64Bitmap();
+    }
+
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+
+        try {
+            RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapAcc, 
(byte[]) accumulator);
+            RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapInput, 
(byte[]) inputField);
+            roaringBitmapAcc.or(roaringBitmapInput);
+            return 
RoaringBitmapUtils.serializeRoaringBitmap64(roaringBitmapAcc);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to se/deserialize roaring 
bitmap.", e);
+        } finally {
+            roaringBitmapAcc.clear();
+            roaringBitmapInput.clear();
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/RoaringBitmapUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/RoaringBitmapUtils.java
new file mode 100644
index 000000000..37554ca56
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/RoaringBitmapUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.utils;
+
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Utility methods for serializing roaring bitmaps to byte arrays and deserializing them back.
+ *
+ * <p>The {@code serialize*} methods call {@code runOptimize()} on the bitmap they are given
+ * before writing it, so the argument itself is touched, not a copy. The {@code deserialize*}
+ * methods ({@link #deserializeRoaringBitmap32} and {@link #deserializeRoaringBitmap64}) wrap the
+ * given bytes in a {@link ByteBuffer} and load them into the caller-supplied bitmap instance
+ * rather than allocating a new bitmap; they propagate any {@link IOException} from the
+ * underlying {@code deserialize} call.
+ */
+public final class RoaringBitmapUtils {
+
+    private RoaringBitmapUtils() {
+        // Utility class, no instantiation
+    }
+
+    /**
+     * Serializes a 32-bit RoaringBitmap to a byte array using ByteBuffer.
+     *
+     * <p>Uses ByteBuffer as recommended by the RoaringBitmap Javadoc: "This is the preferred
+     * method to serialize to a byte array (byte[])".
+     *
+     * <p>The buffer is heap-allocated with exactly {@code bitmap.serializedSizeInBytes()} bytes
+     * (measured after {@code runOptimize()}), and its backing array is returned directly without
+     * copying.
+     *
+     * @param bitmap the bitmap to serialize; {@code runOptimize()} is invoked on it first
+     * @return the backing array of the buffer the bitmap was written into
+     * @throws IOException declared by the signature; NOTE(review): the body only works on an
+     *     in-memory buffer - confirm whether any call here can actually raise it
+     */
+    public static byte[] serializeRoaringBitmap32(RoaringBitmap bitmap) throws 
IOException {
+        bitmap.runOptimize();
+        ByteBuffer buffer = 
ByteBuffer.allocate(bitmap.serializedSizeInBytes());
+        bitmap.serialize(buffer);
+        return buffer.array();
+    }
+
+    public static void deserializeRoaringBitmap32(RoaringBitmap bitmap, byte[] 
bytes)
+            throws IOException {
+        bitmap.deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    /**
+     * Serializes a 64-bit Roaring64Bitmap to a byte array using DataOutputStream.
+     *
+     * <p>Note: Unlike RoaringBitmap (32-bit), Roaring64Bitmap does not provide a
+     * serialize(ByteBuffer) method. It only supports serialize(DataOutput), hence the different
+     * serialization strategy. NOTE(review): this statement comes from the original author -
+     * verify it against the RoaringBitmap version the build actually uses.
+     *
+     * @param bitmap the bitmap to serialize; {@code runOptimize()} is invoked on it first
+     * @return a copy of the bytes written to the in-memory output stream
+     * @throws IOException if writing the bitmap to the stream fails
+     */
+    public static byte[] serializeRoaringBitmap64(Roaring64Bitmap bitmap) 
throws IOException {
+        bitmap.runOptimize();
+        try (ByteArrayOutputStream output = new ByteArrayOutputStream();
+                DataOutputStream dataOutput = new DataOutputStream(output)) {
+            bitmap.serialize(dataOutput);
+            return output.toByteArray();
+        }
+    }
+
+    public static void deserializeRoaringBitmap64(Roaring64Bitmap bitmap, 
byte[] bytes)
+            throws IOException {
+        bitmap.deserialize(ByteBuffer.wrap(bytes));
+    }
+}
diff --git a/fluss-server/src/main/resources/META-INF/NOTICE 
b/fluss-server/src/main/resources/META-INF/NOTICE
index 014b76f17..2604b80cb 100644
--- a/fluss-server/src/main/resources/META-INF/NOTICE
+++ b/fluss-server/src/main/resources/META-INF/NOTICE
@@ -13,6 +13,7 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - commons-cli:commons-cli:1.5.0
 - org.apache.commons:commons-lang3:3.18.0
 - org.apache.commons:commons-math3:3.6.1
+- org.roaringbitmap:RoaringBitmap:1.3.0
 - at.yawk.lz4:lz4-java:1.10.2
 - org.xerial.snappy:snappy-java:1.1.10.4
 
diff --git 
a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
 
b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
index f3a27da5f..cb4475ff3 100644
--- 
a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
+++ 
b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
@@ -28,3 +28,5 @@ 
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolAndAggFactory
 org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolOrAggFactory
 org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldListaggAggFactory
 org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldStringAggFactory
+org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap32AggFactory
+org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap64AggFactory
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
index ab05d18ca..78fbefe66 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContextTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.fluss.server.kv.rowmerger.aggregate;
 
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.TableConfig;
 import org.apache.fluss.metadata.AggFunctions;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
@@ -32,6 +30,8 @@ import 
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldListaggAgg;
 import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldMaxAgg;
 import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldMinAgg;
 import 
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldProductAgg;
+import 
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap32Agg;
+import 
org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap64Agg;
 import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldSumAgg;
 import org.apache.fluss.types.DataTypes;
 
@@ -65,10 +65,11 @@ class AggregationContextTest {
                         .column("bool_or_col", DataTypes.BOOLEAN(), 
AggFunctions.BOOL_OR())
                         .column("listagg_col", DataTypes.STRING(), 
AggFunctions.LISTAGG())
                         .column("string_agg_col", DataTypes.STRING(), 
AggFunctions.STRING_AGG())
+                        .column("rbm32_col", DataTypes.BYTES(), 
AggFunctions.RBM32())
+                        .column("rbm64_col", DataTypes.BYTES(), 
AggFunctions.RBM64())
                         .primaryKey("id")
                         .build();
 
-        TableConfig tableConfig = new TableConfig(new Configuration());
         AggregationContext context = AggregationContext.create(schema, 
KvFormat.COMPACTED);
 
         // Primary key field should use FieldLastValueAgg (not aggregated)
@@ -86,5 +87,7 @@ class AggregationContextTest {
         
assertThat(context.getAggregators()[11]).isInstanceOf(FieldListaggAgg.class);
         assertThat(context.getAggregators()[12])
                 .isInstanceOf(FieldListaggAgg.class); // string_agg is alias
+        
assertThat(context.getAggregators()[13]).isInstanceOf(FieldRoaringBitmap32Agg.class);
+        
assertThat(context.getAggregators()[14]).isInstanceOf(FieldRoaringBitmap64Agg.class);
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
index 37a4a3d46..0968dffa0 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java
@@ -29,6 +29,7 @@ import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.server.kv.rowmerger.AggregateRowMerger;
+import org.apache.fluss.server.utils.RoaringBitmapUtils;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.DataTypes;
@@ -37,7 +38,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -703,6 +707,85 @@ class FieldAggregatorParameterizedTest {
         assertThat(merged2.row.getString(1).toString()).isEqualTo("a,b");
     }
 
+    // 
===================================================================================
+    // Roaring Bitmap Aggregation Tests
+    // 
===================================================================================
+
+    /**
+     * Merges two rows with the same key whose RBM32 column holds the serialized bitmaps {1, 2}
+     * and {2, 3}, and checks that the merged column equals the serialized union of both.
+     */
+    @Test
+    void testRbm32Aggregation() throws IOException {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("value", DataTypes.BYTES(), 
AggFunctions.RBM32())
+                        .primaryKey("id")
+                        .build();
+
+        TableConfig tableConfig = new TableConfig(new Configuration());
+        AggregateRowMerger merger = createMerger(schema, tableConfig);
+
+        RoaringBitmap bitmap1 = new RoaringBitmap();
+        bitmap1.add(1);
+        bitmap1.add(2);
+        RoaringBitmap bitmap2 = new RoaringBitmap();
+        bitmap2.add(2);
+        bitmap2.add(3);
+
+        BinaryRow row1 =
+                compactedRow(
+                        schema.getRowType(),
+                        new Object[] {1, 
RoaringBitmapUtils.serializeRoaringBitmap32(bitmap1)});
+        BinaryRow row2 =
+                compactedRow(
+                        schema.getRowType(),
+                        new Object[] {1, 
RoaringBitmapUtils.serializeRoaringBitmap32(bitmap2)});
+
+        BinaryValue merged = merger.merge(toBinaryValue(row1), 
toBinaryValue(row2));
+
+        RoaringBitmap expected = bitmap1.clone();
+        expected.or(bitmap2);
+        byte[] expectedBytes = 
RoaringBitmapUtils.serializeRoaringBitmap32(expected);
+
+        assertThat(merged.row.getBinary(1, 
expectedBytes.length)).isEqualTo(expectedBytes);
+    }
+
+    /**
+     * Merges two rows with the same key whose RBM64 column holds the serialized bitmaps {10, 20}
+     * and {20, 30}, and checks that the merged column equals the serialized union of both.
+     */
+    @Test
+    void testRbm64Aggregation() throws IOException {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("value", DataTypes.BYTES(), 
AggFunctions.RBM64())
+                        .primaryKey("id")
+                        .build();
+
+        TableConfig tableConfig = new TableConfig(new Configuration());
+        AggregateRowMerger merger = createMerger(schema, tableConfig);
+
+        Roaring64Bitmap bitmap1 = new Roaring64Bitmap();
+        bitmap1.add(10L);
+        bitmap1.add(20L);
+        Roaring64Bitmap bitmap2 = new Roaring64Bitmap();
+        bitmap2.add(20L);
+        bitmap2.add(30L);
+
+        BinaryRow row1 =
+                compactedRow(
+                        schema.getRowType(),
+                        new Object[] {1, 
RoaringBitmapUtils.serializeRoaringBitmap64(bitmap1)});
+        BinaryRow row2 =
+                compactedRow(
+                        schema.getRowType(),
+                        new Object[] {1, 
RoaringBitmapUtils.serializeRoaringBitmap64(bitmap2)});
+
+        BinaryValue merged = merger.merge(toBinaryValue(row1), 
toBinaryValue(row2));
+
+        Roaring64Bitmap expected = new Roaring64Bitmap();
+        expected.or(bitmap1);
+        expected.or(bitmap2);
+        byte[] expectedBytes = 
RoaringBitmapUtils.serializeRoaringBitmap64(expected);
+
+        assertThat(merged.row.getBinary(1, 
expectedBytes.length)).isEqualTo(expectedBytes);
+    }
+
     // 
===================================================================================
     // Helper Methods
     // 
===================================================================================
diff --git a/pom.xml b/pom.xml
index 99558b790..07b7f9003 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
         <arrow.version>15.0.0</arrow.version>
         <paimon.version>1.3.1</paimon.version>
         <iceberg.version>1.10.0</iceberg.version>
+        <roaringbitmap.version>1.3.0</roaringbitmap.version>
 
         <!-- spark & scala -->
         <scala212.version>2.12.18</scala212.version>
diff --git a/website/docs/table-design/merge-engines/aggregation.md 
b/website/docs/table-design/merge-engines/aggregation.md
index b328dc2b7..4523e830a 100644
--- a/website/docs/table-design/merge-engines/aggregation.md
+++ b/website/docs/table-design/merge-engines/aggregation.md
@@ -827,6 +827,80 @@ TableDescriptor.builder()
 </TabItem>
 </Tabs>
 
+### rbm32
+
+Aggregates serialized 32-bit roaring bitmap values (`org.roaringbitmap.RoaringBitmap`) by union.
+
+- **Supported Data Types**: BYTES
+- **Behavior**: ORs incoming bitmaps with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+<Tabs>
+<TabItem value="flink-sql" label="Flink SQL" default>
+
+```sql
+CREATE TABLE user_visits (
+    user_id BIGINT,
+    visit_bitmap BYTES,
+    PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+    'table.merge-engine' = 'aggregation',
+    'fields.visit_bitmap.agg' = 'rbm32'
+);
+```
+
+</TabItem>
+<TabItem value="java-client" label="Java Client">
+
+```java
+Schema schema = Schema.newBuilder()
+    .column("user_id", DataTypes.BIGINT())
+    .column("visit_bitmap", DataTypes.BYTES(), AggFunctions.RBM32())
+    .primaryKey("user_id")
+    .build();
+```
+
+</TabItem>
+</Tabs>
+
+### rbm64
+
+Aggregates serialized 64-bit roaring bitmap values (`org.roaringbitmap.longlong.Roaring64Bitmap`) by union.
+
+- **Supported Data Types**: BYTES
+- **Behavior**: ORs incoming bitmaps with the accumulator
+- **Null Handling**: Null values are ignored
+
+**Example:**
+<Tabs>
+<TabItem value="flink-sql" label="Flink SQL" default>
+
+```sql
+CREATE TABLE session_interactions (
+    session_id BIGINT,
+    interaction_bitmap BYTES,
+    PRIMARY KEY (session_id) NOT ENFORCED
+) WITH (
+    'table.merge-engine' = 'aggregation',
+    'fields.interaction_bitmap.agg' = 'rbm64'
+);
+```
+
+</TabItem>
+<TabItem value="java-client" label="Java Client">
+
+```java
+Schema schema = Schema.newBuilder()
+    .column("session_id", DataTypes.BIGINT())
+    .column("interaction_bitmap", DataTypes.BYTES(), AggFunctions.RBM64())
+    .primaryKey("session_id")
+    .build();
+```
+
+</TabItem>
+</Tabs>
+
 ### bool_and
 
 Evaluates whether all boolean values in a set are true (logical AND).

Reply via email to