This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit c8dc2405a15cec52f65dc3fb5cd520140be36d33 Author: Yang Wang <[email protected]> AuthorDate: Wed Feb 11 00:13:39 2026 +0800 [kv] Forbid aggregate merge engine with WAL changelog image (#2617) --- .../fluss/client/admin/FlussAdminITCase.java | 106 +++++++++++++++++++++ .../server/utils/TableDescriptorValidation.java | 11 +++ 2 files changed, 117 insertions(+) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index f2c23f2ae..063fad65b 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -1710,4 +1710,110 @@ class FlussAdminITCase extends ClientToServerITCaseBase { // 11. Cleanup admin.dropTable(tablePath, false).get(); } + + /** + * Test that aggregate merge engine tables cannot use WAL changelog image mode. + * + * <p>Validates: Requirements 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 2.1, 2.2 + */ + @Test + void testCreateTableWithAggregateEngineAndWalChangelog() throws Exception { + // Schema for aggregate merge engine tables + Schema aggregateSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("count", DataTypes.BIGINT(), AggFunctions.SUM()) + .primaryKey("id") + .build(); + + // Test 1: Aggregate + WAL should be rejected + TablePath tablePath1 = TablePath.of("fluss", "test_aggregate_wal_changelog_rejected"); + Map<String, String> properties1 = new HashMap<>(); + properties1.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation"); + properties1.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL"); + TableDescriptor tableDescriptor1 = + TableDescriptor.builder() + .schema(aggregateSchema) + .comment("aggregate merge engine table with WAL changelog") + .properties(properties1) + .build(); + assertThatThrownBy(() -> admin.createTable(tablePath1, tableDescriptor1, false).get()) + .cause() + 
.isInstanceOf(InvalidConfigException.class) + .hasMessageContaining( + "Table with 'AGGREGATION' merge engine does not support 'WAL' changelog image mode. " + + "Aggregation merge engine tables require FULL changelog image mode for correct UNDO recovery. " + + "Please set 'table.changelog.image' to 'FULL' or remove the setting."); + + // Test 2: Aggregate + FULL should be allowed + TablePath tablePath2 = TablePath.of("fluss", "test_aggregate_full_changelog_allowed"); + Map<String, String> properties2 = new HashMap<>(); + properties2.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation"); + properties2.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "FULL"); + TableDescriptor tableDescriptor2 = + TableDescriptor.builder() + .schema(aggregateSchema) + .comment("aggregate merge engine table with FULL changelog") + .properties(properties2) + .build(); + admin.createTable(tablePath2, tableDescriptor2, false).get(); + assertThat(admin.tableExists(tablePath2).get()).isTrue(); + + // Test 3: Aggregate + default (no explicit changelog image) should be allowed + TablePath tablePath3 = TablePath.of("fluss", "test_aggregate_default_changelog_allowed"); + Map<String, String> properties3 = new HashMap<>(); + properties3.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation"); + // No explicit changelog image setting - defaults to FULL + TableDescriptor tableDescriptor3 = + TableDescriptor.builder() + .schema(aggregateSchema) + .comment("aggregate merge engine table with default changelog") + .properties(properties3) + .build(); + admin.createTable(tablePath3, tableDescriptor3, false).get(); + assertThat(admin.tableExists(tablePath3).get()).isTrue(); + + // Test 4: WAL + no merge engine should be allowed + TablePath tablePath4 = TablePath.of("fluss", "test_wal_no_merge_engine_allowed"); + Map<String, String> properties4 = new HashMap<>(); + properties4.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL"); + // No merge engine setting + TableDescriptor tableDescriptor4 = + 
TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("primary key table with WAL changelog and no merge engine") + .properties(properties4) + .build(); + admin.createTable(tablePath4, tableDescriptor4, false).get(); + assertThat(admin.tableExists(tablePath4).get()).isTrue(); + + // Test 5: FIRST_ROW + WAL should be allowed + TablePath tablePath5 = TablePath.of("fluss", "test_first_row_wal_changelog_allowed"); + Map<String, String> properties5 = new HashMap<>(); + properties5.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row"); + properties5.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL"); + TableDescriptor tableDescriptor5 = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("first_row merge engine table with WAL changelog") + .properties(properties5) + .build(); + admin.createTable(tablePath5, tableDescriptor5, false).get(); + assertThat(admin.tableExists(tablePath5).get()).isTrue(); + + // Test 6: VERSIONED + WAL should be allowed + TablePath tablePath6 = TablePath.of("fluss", "test_versioned_wal_changelog_allowed"); + Map<String, String> properties6 = new HashMap<>(); + properties6.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned"); + properties6.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "age"); + properties6.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL"); + TableDescriptor tableDescriptor6 = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("versioned merge engine table with WAL changelog") + .properties(properties6) + .build(); + admin.createTable(tablePath6, tableDescriptor6, false).get(); + assertThat(admin.tableExists(tablePath6).get()).isTrue(); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 96282c1b6..760ad50ae 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.TooManyBucketsException; import org.apache.fluss.metadata.AggFunction; +import org.apache.fluss.metadata.ChangelogImage; import org.apache.fluss.metadata.DeleteBehavior; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; @@ -302,6 +303,16 @@ public class TableDescriptorValidation { versionColumn.get(), columnType)); } } else if (mergeEngine == MergeEngineType.AGGREGATION) { + // Check aggregate merge engine with WAL changelog image + ChangelogImage changelogImage = tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE); + if (changelogImage == ChangelogImage.WAL) { + throw new InvalidConfigException( + String.format( + "Table with 'AGGREGATION' merge engine does not support 'WAL' changelog image mode. " + + "Aggregation merge engine tables require FULL changelog image mode " + + "for correct UNDO recovery. Please set '%s' to 'FULL' or remove the setting.", + ConfigOptions.TABLE_CHANGELOG_IMAGE.key())); + } // Validate aggregation function parameters for aggregation merge engine validateAggregationFunctionParameters(schema); }
