This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1881c2cef [kv] Forbid aggregate merge engine with WAL changelog image (#2617)
1881c2cef is described below
commit 1881c2cefcc376d21f5ba5f34bdbf1bfa2544bd1
Author: Yang Wang <[email protected]>
AuthorDate: Wed Feb 11 00:13:39 2026 +0800
[kv] Forbid aggregate merge engine with WAL changelog image (#2617)
---
.../fluss/client/admin/FlussAdminITCase.java | 106 +++++++++++++++++++++
.../server/utils/TableDescriptorValidation.java | 11 +++
2 files changed, 117 insertions(+)
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index f2c23f2ae..063fad65b 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -1710,4 +1710,110 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
// 11. Cleanup
admin.dropTable(tablePath, false).get();
}
+
+ /**
+ * Test that aggregate merge engine tables cannot use WAL changelog image mode.
+ *
+ * <p>Validates: Requirements 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 2.1, 2.2
+ */
+ @Test
+ void testCreateTableWithAggregateEngineAndWalChangelog() throws Exception {
+ // Schema for aggregate merge engine tables
+ Schema aggregateSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("count", DataTypes.BIGINT(), AggFunctions.SUM())
+ .primaryKey("id")
+ .build();
+
+ // Test 1: Aggregate + WAL should be rejected
+ TablePath tablePath1 = TablePath.of("fluss", "test_aggregate_wal_changelog_rejected");
+ Map<String, String> properties1 = new HashMap<>();
+ properties1.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
+ properties1.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+ TableDescriptor tableDescriptor1 =
+ TableDescriptor.builder()
+ .schema(aggregateSchema)
+ .comment("aggregate merge engine table with WAL changelog")
+ .properties(properties1)
+ .build();
+ assertThatThrownBy(() -> admin.createTable(tablePath1, tableDescriptor1, false).get())
+ .cause()
+ .isInstanceOf(InvalidConfigException.class)
+ .hasMessageContaining(
+ "Table with 'AGGREGATION' merge engine does not support 'WAL' changelog image mode. "
+ + "Aggregation merge engine tables require FULL changelog image mode for correct UNDO recovery. "
+ + "Please set 'table.changelog.image' to 'FULL' or remove the setting.");
+
+ // Test 2: Aggregate + FULL should be allowed
+ TablePath tablePath2 = TablePath.of("fluss", "test_aggregate_full_changelog_allowed");
+ Map<String, String> properties2 = new HashMap<>();
+ properties2.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
+ properties2.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "FULL");
+ TableDescriptor tableDescriptor2 =
+ TableDescriptor.builder()
+ .schema(aggregateSchema)
+ .comment("aggregate merge engine table with FULL changelog")
+ .properties(properties2)
+ .build();
+ admin.createTable(tablePath2, tableDescriptor2, false).get();
+ assertThat(admin.tableExists(tablePath2).get()).isTrue();
+
+ // Test 3: Aggregate + default (no explicit changelog image) should be allowed
+ TablePath tablePath3 = TablePath.of("fluss", "test_aggregate_default_changelog_allowed");
+ Map<String, String> properties3 = new HashMap<>();
+ properties3.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
+ // No explicit changelog image setting - defaults to FULL
+ TableDescriptor tableDescriptor3 =
+ TableDescriptor.builder()
+ .schema(aggregateSchema)
+ .comment("aggregate merge engine table with default changelog")
+ .properties(properties3)
+ .build();
+ admin.createTable(tablePath3, tableDescriptor3, false).get();
+ assertThat(admin.tableExists(tablePath3).get()).isTrue();
+
+ // Test 4: WAL + no merge engine should be allowed
+ TablePath tablePath4 = TablePath.of("fluss", "test_wal_no_merge_engine_allowed");
+ Map<String, String> properties4 = new HashMap<>();
+ properties4.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+ // No merge engine setting
+ TableDescriptor tableDescriptor4 =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("primary key table with WAL changelog and no merge engine")
+ .properties(properties4)
+ .build();
+ admin.createTable(tablePath4, tableDescriptor4, false).get();
+ assertThat(admin.tableExists(tablePath4).get()).isTrue();
+
+ // Test 5: FIRST_ROW + WAL should be allowed
+ TablePath tablePath5 = TablePath.of("fluss", "test_first_row_wal_changelog_allowed");
+ Map<String, String> properties5 = new HashMap<>();
+ properties5.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
+ properties5.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+ TableDescriptor tableDescriptor5 =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("first_row merge engine table with WAL changelog")
+ .properties(properties5)
+ .build();
+ admin.createTable(tablePath5, tableDescriptor5, false).get();
+ assertThat(admin.tableExists(tablePath5).get()).isTrue();
+
+ // Test 6: VERSIONED + WAL should be allowed
+ TablePath tablePath6 = TablePath.of("fluss", "test_versioned_wal_changelog_allowed");
+ Map<String, String> properties6 = new HashMap<>();
+ properties6.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
+ properties6.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "age");
+ properties6.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+ TableDescriptor tableDescriptor6 =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("versioned merge engine table with WAL changelog")
+ .properties(properties6)
+ .build();
+ admin.createTable(tablePath6, tableDescriptor6, false).get();
+ assertThat(admin.tableExists(tablePath6).get()).isTrue();
+ }
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 96282c1b6..760ad50ae 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.ChangelogImage;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
@@ -302,6 +303,16 @@ public class TableDescriptorValidation {
versionColumn.get(), columnType));
}
} else if (mergeEngine == MergeEngineType.AGGREGATION) {
+ // Check aggregate merge engine with WAL changelog image
+ ChangelogImage changelogImage = tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
+ if (changelogImage == ChangelogImage.WAL) {
+ throw new InvalidConfigException(
+ String.format(
+ "Table with 'AGGREGATION' merge engine does not support 'WAL' changelog image mode. "
+ + "Aggregation merge engine tables require FULL changelog image mode "
+ + "for correct UNDO recovery. Please set '%s' to 'FULL' or remove the setting.",
+ ConfigOptions.TABLE_CHANGELOG_IMAGE.key()));
+ }
// Validate aggregation function parameters for aggregation merge engine
validateAggregationFunctionParameters(schema);
}