This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit c8dc2405a15cec52f65dc3fb5cd520140be36d33
Author: Yang Wang <[email protected]>
AuthorDate: Wed Feb 11 00:13:39 2026 +0800

    [kv] Forbid aggregate merge engine with WAL changelog image (#2617)
---
 .../fluss/client/admin/FlussAdminITCase.java       | 106 +++++++++++++++++++++
 .../server/utils/TableDescriptorValidation.java    |  11 +++
 2 files changed, 117 insertions(+)

diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index f2c23f2ae..063fad65b 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -1710,4 +1710,110 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
         // 11. Cleanup
         admin.dropTable(tablePath, false).get();
     }
+
+    /**
+     * Test that aggregate merge engine tables cannot use WAL changelog image mode.
+     *
+     * <p>Validates: Requirements 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 2.1, 2.2
+     */
+    @Test
+    void testCreateTableWithAggregateEngineAndWalChangelog() throws Exception {
+        // Schema for aggregate merge engine tables
+        Schema aggregateSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("count", DataTypes.BIGINT(), AggFunctions.SUM())
+                        .primaryKey("id")
+                        .build();
+
+        // Test 1: Aggregate + WAL should be rejected
+        TablePath tablePath1 = TablePath.of("fluss", "test_aggregate_wal_changelog_rejected");
+        Map<String, String> properties1 = new HashMap<>();
+        properties1.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
+        properties1.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+        TableDescriptor tableDescriptor1 =
+                TableDescriptor.builder()
+                        .schema(aggregateSchema)
+                        .comment("aggregate merge engine table with WAL changelog")
+                        .properties(properties1)
+                        .build();
+        assertThatThrownBy(() -> admin.createTable(tablePath1, tableDescriptor1, false).get())
+                .cause()
+                .isInstanceOf(InvalidConfigException.class)
+                .hasMessageContaining(
+                        "Table with 'AGGREGATION' merge engine does not support 'WAL' changelog image mode. "
+                                + "Aggregation merge engine tables require FULL changelog image mode for correct UNDO recovery. "
+                                + "Please set 'table.changelog.image' to 'FULL' or remove the setting.");
+
+        // Test 2: Aggregate + FULL should be allowed
+        TablePath tablePath2 = TablePath.of("fluss", "test_aggregate_full_changelog_allowed");
+        Map<String, String> properties2 = new HashMap<>();
+        properties2.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
+        properties2.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "FULL");
+        TableDescriptor tableDescriptor2 =
+                TableDescriptor.builder()
+                        .schema(aggregateSchema)
+                        .comment("aggregate merge engine table with FULL changelog")
+                        .properties(properties2)
+                        .build();
+        admin.createTable(tablePath2, tableDescriptor2, false).get();
+        assertThat(admin.tableExists(tablePath2).get()).isTrue();
+
+        // Test 3: Aggregate + default (no explicit changelog image) should be allowed
+        TablePath tablePath3 = TablePath.of("fluss", "test_aggregate_default_changelog_allowed");
+        Map<String, String> properties3 = new HashMap<>();
+        properties3.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
+        // No explicit changelog image setting - defaults to FULL
+        TableDescriptor tableDescriptor3 =
+                TableDescriptor.builder()
+                        .schema(aggregateSchema)
+                        .comment("aggregate merge engine table with default changelog")
+                        .properties(properties3)
+                        .build();
+        admin.createTable(tablePath3, tableDescriptor3, false).get();
+        assertThat(admin.tableExists(tablePath3).get()).isTrue();
+
+        // Test 4: WAL + no merge engine should be allowed
+        TablePath tablePath4 = TablePath.of("fluss", "test_wal_no_merge_engine_allowed");
+        Map<String, String> properties4 = new HashMap<>();
+        properties4.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+        // No merge engine setting
+        TableDescriptor tableDescriptor4 =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("primary key table with WAL changelog and no merge engine")
+                        .properties(properties4)
+                        .build();
+        admin.createTable(tablePath4, tableDescriptor4, false).get();
+        assertThat(admin.tableExists(tablePath4).get()).isTrue();
+
+        // Test 5: FIRST_ROW + WAL should be allowed
+        TablePath tablePath5 = TablePath.of("fluss", "test_first_row_wal_changelog_allowed");
+        Map<String, String> properties5 = new HashMap<>();
+        properties5.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
+        properties5.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+        TableDescriptor tableDescriptor5 =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("first_row merge engine table with WAL changelog")
+                        .properties(properties5)
+                        .build();
+        admin.createTable(tablePath5, tableDescriptor5, false).get();
+        assertThat(admin.tableExists(tablePath5).get()).isTrue();
+
+        // Test 6: VERSIONED + WAL should be allowed
+        TablePath tablePath6 = TablePath.of("fluss", "test_versioned_wal_changelog_allowed");
+        Map<String, String> properties6 = new HashMap<>();
+        properties6.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
+        properties6.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "age");
+        properties6.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+        TableDescriptor tableDescriptor6 =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("versioned merge engine table with WAL changelog")
+                        .properties(properties6)
+                        .build();
+        admin.createTable(tablePath6, tableDescriptor6, false).get();
+        assertThat(admin.tableExists(tablePath6).get()).isTrue();
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 96282c1b6..760ad50ae 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.TooManyBucketsException;
 import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
@@ -302,6 +303,16 @@ public class TableDescriptorValidation {
                                     versionColumn.get(), columnType));
                 }
             } else if (mergeEngine == MergeEngineType.AGGREGATION) {
+                // Check aggregate merge engine with WAL changelog image
+                ChangelogImage changelogImage = tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
+                if (changelogImage == ChangelogImage.WAL) {
+                    throw new InvalidConfigException(
+                            String.format(
+                                    "Table with 'AGGREGATION' merge engine does not support 'WAL' changelog image mode. "
+                                            + "Aggregation merge engine tables require FULL changelog image mode "
+                                            + "for correct UNDO recovery. Please set '%s' to 'FULL' or remove the setting.",
+                                    ConfigOptions.TABLE_CHANGELOG_IMAGE.key()));
+                }
                 // Validate aggregation function parameters for aggregation merge engine
                 validateAggregationFunctionParameters(schema);
             }

Reply via email to