Copilot commented on code in PR #2924:
URL: https://github.com/apache/fluss/pull/2924#discussion_r2986462721
##########
fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java:
##########
@@ -177,4 +182,142 @@ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception {
         TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get();
         assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
     }
+
+    @Test
+    void testCannotEnableTableWhenTableFormatDiffersFromClusterFormat() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "test_table_format_mismatch";
+        TablePath tablePath = TablePath.of(databaseName, tableName);
+
+        admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
+        admin.alterClusterConfigs(
+                        Collections.singletonList(
+                                new AlterConfig(
+                                        DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
Review Comment:
These tests mutate cluster dynamic configs, but the FlussClusterExtension is
`static` and shared across test methods. Here you only unset `datalake.format`;
if a previous test left `datalake.enabled=true`, this config change would be
invalid (enabled=true requires format) and the test becomes
order-dependent/flaky. Ensure each test (or a `@BeforeEach`) resets both
`datalake.enabled` and `datalake.format` to a known baseline before proceeding.
```suggestion
                        Arrays.asList(
                                new AlterConfig(
                                        DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET),
                                new AlterConfig(
                                        DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET)))
```
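The order dependence and the proposed baseline reset can be illustrated with a self-contained sketch (plain Java; names like `clusterConfig` and `resetBaseline` are hypothetical, not the Fluss API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the order-dependence: a mutable map stands in for the dynamic config
// of the shared, static cluster extension. Hypothetical names, not Fluss APIs.
public class SharedConfigBaseline {
    static final Map<String, String> clusterConfig = new HashMap<>();

    // Models the "datalake.enabled=true requires datalake.format" validation rule.
    static void validate(Map<String, String> config) {
        if ("true".equals(config.get("datalake.enabled"))
                && config.get("datalake.format") == null) {
            throw new IllegalStateException(
                    "datalake.enabled=true requires datalake.format");
        }
    }

    // What the reviewed test does: only unsets the format key.
    static void unsetFormatOnly() {
        clusterConfig.remove("datalake.format");
        validate(clusterConfig);
    }

    // Proposed @BeforeEach-style fix: reset both keys to a known baseline.
    static void resetBaseline() {
        clusterConfig.remove("datalake.format");
        clusterConfig.put("datalake.enabled", "false");
        validate(clusterConfig);
    }
}
```

If a prior test leaves `datalake.enabled=true` behind, `unsetFormatOnly()` fails validation, while `resetBaseline()` lands in a valid state regardless of test order.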
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -63,17 +66,37 @@ public void validate(Configuration newConfig) throws ConfigException {
                 newConfig.getOptional(DATALAKE_FORMAT).isPresent()
                         ? newConfig.get(DATALAKE_FORMAT)
                         : currentConfiguration.get(DATALAKE_FORMAT);
+
+        // TODO: validate(...) only sees the merged effective cluster config, so it cannot
+        // detect the case where a user enables datalake.enabled and unsets
+        // datalake.format in the same dynamic config change. This may leave the cluster
+        // with datalake.enabled set but no datalake.format. Fixing this likely requires
+        // extending the validate/reconfigure framework to expose the incremental change
+        // set, rather than only the merged result. We accept this for now because
+        // table-level enablement is still validated, and enabling datalake for a table
+        // will fail if datalake.format is not configured.
Review Comment:
`validate()` derives `newDatalakeFormat` by falling back to
`currentConfiguration.get(DATALAKE_FORMAT)` when the key is absent in
`newConfig`. Since `newConfig` is already the merged effective config, this
fallback can mis-handle updates that remove `datalake.format` (e.g., it may
still treat the old format as effective), and it can prevent the intended
"datalake.enabled=true requires datalake.format" validation from triggering
with a clear error message. Consider deriving the effective format solely from
`newConfig` (e.g., `newConfig.getOptional(DATALAKE_FORMAT).orElse(null)`) and
basing the required-format check on that value.
```suggestion
                newConfig.getOptional(DATALAKE_FORMAT).orElse(null);
        // validate(...) sees the merged effective cluster config, so if a user enables
        // datalake.enabled and unsets datalake.format in the same dynamic config change,
        // newDatalakeFormat will be null and the required-format check below will trigger.
```
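The difference between the two derivations can be shown with a minimal map-based sketch (hypothetical names, not the actual Fluss `Configuration` class):

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the two ways to derive the effective datalake format in
// validate(newConfig). Hypothetical names, not the Fluss classes.
public class EffectiveFormat {
    // Fallback variant from the diff: if the key is absent in the merged newConfig,
    // fall back to the current configuration. This masks a removal of the key.
    static String withFallback(Map<String, String> newConfig, Map<String, String> current) {
        return Optional.ofNullable(newConfig.get("datalake.format"))
                .orElse(current.get("datalake.format"));
    }

    // Suggested variant: newConfig is already the merged effective config,
    // so derive the format from it alone; a removed key is visibly absent.
    static String fromMergedOnly(Map<String, String> newConfig) {
        return newConfig.get("datalake.format");
    }
}
```

With `datalake.format` removed in the change, the fallback variant still reports the old format as effective, while the merged-only variant yields `null`, letting the required-format check fire with a clear error.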
##########
fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java:
##########
@@ -177,4 +182,142 @@ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception {
         TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get();
         assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
     }
+
+    @Test
+    void testCannotEnableTableWhenTableFormatDiffersFromClusterFormat() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "test_table_format_mismatch";
+        TablePath tablePath = TablePath.of(databaseName, tableName);
+
+        admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
+        admin.alterClusterConfigs(
+                        Collections.singletonList(
+                                new AlterConfig(
+                                        DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
+                .get();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(3, "c1")
+                        .property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.ICEBERG)
+                        .build();
+        admin.createTable(tablePath, tableDescriptor, false).get();
+
+        admin.alterClusterConfigs(
+                        Arrays.asList(
+                                new AlterConfig(
+                                        DATALAKE_FORMAT.key(),
+                                        DataLakeFormat.PAIMON.toString(),
+                                        AlterConfigOpType.SET),
+                                new AlterConfig(
+                                        DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET)))
+                .get();
+
+        List<TableChange> enableDatalakeChange =
+                Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
+        assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get())
+                .cause()
+                .isInstanceOf(InvalidConfigException.class)
+                .hasMessageContaining("'table.datalake.format' ('iceberg')")
+                .hasMessageContaining("cluster 'datalake.format' ('paimon')");
+    }
+
+    @Test
+    void testEnableTableAfterClusterEnablesDataLake() throws Exception {
+        String databaseName = "test_db";
+        String tableName = "test_enable_datalake_table";
+        TablePath tablePath = TablePath.of(databaseName, tableName);
+
+        admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
+        admin.alterClusterConfigs(
+                        Arrays.asList(
+                                new AlterConfig(
+                                        DATALAKE_FORMAT.key(),
+                                        DataLakeFormat.PAIMON.toString(),
+                                        AlterConfigOpType.SET),
+                                new AlterConfig(
+                                        DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET)))
+                .get();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(3, "c1")
+                        .build();
+        admin.createTable(tablePath, tableDescriptor, false).get();
+
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+        assertThat(tableInfo.getTableConfig().getDataLakeFormat()).hasValue(DataLakeFormat.PAIMON);
+        assertThat(tableInfo.getTableConfig().isDataLakeEnabled()).isFalse();
+
+        List<TableChange> enableDatalakeChange =
+                Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
+        assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get())
+                .cause()
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessageContaining("doesn't enable datalake tables");
+
+        admin.alterClusterConfigs(
+                        Collections.singletonList(
+                                new AlterConfig(
+                                        DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET)))
+                .get();
+        admin.alterTable(tablePath, enableDatalakeChange, false).get();
+
+        TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get();
+        assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
+        assertThat(updatedTableInfo.getTableConfig().getDataLakeFormat())
+                .hasValue(DataLakeFormat.PAIMON);
+    }
+
+    @Test
+    void testLegacyClusterCanStillEnableTableLevelDatalake() throws Exception {
+        String databaseName = "test_db_legacy_enable";
+        String tableName = "test_table_legacy_enable";
+        TablePath tablePath = TablePath.of(databaseName, tableName);
+
+        admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
+        admin.alterClusterConfigs(
+                        Collections.singletonList(
+                                // not set DATALAKE_ENABLED to mock legacy cluster
+                                new AlterConfig(
Review Comment:
This block says it is "not set DATALAKE_ENABLED to mock legacy cluster", but
because the cluster extension is shared across test methods, `datalake.enabled`
may already be set by a prior test. To actually test legacy behavior reliably,
explicitly unset `datalake.enabled` (and set/unset `datalake.format` as needed)
as part of this test's setup.
```suggestion
                        Arrays.asList(
                                // explicitly unset DATALAKE_ENABLED to mock legacy cluster
                                new AlterConfig(
                                        DATALAKE_ENABLED.key(),
                                        null,
                                        AlterConfigOpType.DELETE),
                                new AlterConfig(
```
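Why "not setting" a key is not the same as the key being absent on a shared cluster can be shown with a small sketch (hypothetical names, not the Fluss API):

```java
import java.util.Map;

// Sketch: on a shared config store, skipping a key leaves any previously set value
// in place; only an explicit DELETE restores the legacy, key-absent state.
// Hypothetical names, not Fluss classes.
public class LegacyBaseline {
    enum OpType { SET, DELETE }

    // Applies one alter operation to the shared config map.
    static void apply(Map<String, String> config, String key, String value, OpType op) {
        if (op == OpType.SET) {
            config.put(key, value);
        } else {
            config.remove(key);
        }
    }
}
```

A test that wants to mock a legacy cluster therefore has to issue a DELETE for `datalake.enabled` in its own setup rather than simply omitting the key.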
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]