This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a4cc966b1 [server] Not permitted to enable datalake for tables created
prior to the cluster being set to datalake format (#2100)
a4cc966b1 is described below
commit a4cc966b19479ebae17819764e184b26df297067
Author: Liebing <[email protected]>
AuthorDate: Wed Jan 7 11:49:04 2026 +0800
[server] Not permitted to enable datalake for tables created prior to the
cluster being set to datalake format (#2100)
---
.../fluss/client/table/LakeEnableTableITCase.java | 181 +++++++++++++++++++++
.../server/utils/TableDescriptorValidation.java | 11 +-
2 files changed, 191 insertions(+), 1 deletion(-)
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
new file mode 100644
index 000000000..d1d663d1b
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+import org.apache.fluss.exception.InvalidAlterTableException;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for lake enable table. */
+class LakeEnableTableITCase extends ClientToServerITCaseBase {
+
+ @Test
+ void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "test_table_before_datalake";
+ TablePath tablePath = TablePath.of(databaseName, tableName);
+
+ // Disable datalake format for the cluster
+ admin.alterClusterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+ DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
+ .get();
+ // Verify cluster now has no datalake format enabled
+ assertThat(
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isEqualTo(null);
+
+ // Create database
+ admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
+
+ // Create table before cluster enables datalake format
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .distributedBy(3, "c1")
+ .build();
+ admin.createTable(tablePath, tableDescriptor, false).get();
+
+ // Verify table was created without datalake format
+ TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+ assertThat(tableInfo.getTableConfig().getDataLakeFormat().isPresent()).isFalse();
+ assertThat(tableInfo.getTableConfig().isDataLakeEnabled()).isFalse();
+
+ // Enable datalake format for the cluster
+ admin.alterClusterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+ DATALAKE_FORMAT.key(),
+ DataLakeFormat.PAIMON.toString(),
+ AlterConfigOpType.SET)))
+ .get();
+ // Verify cluster now has datalake format enabled
+ assertThat(
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isEqualTo(DataLakeFormat.PAIMON);
+
+ // Try to enable datalake for the table created before cluster enabled datalake
+ // This should fail with InvalidAlterTableException
+ List<TableChange> enableDatalakeChange =
+ Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
+ assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get())
+ .cause()
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessageContaining(
+ "The option 'table.datalake.enabled' cannot be altered for tables that were"
+ + " created before the Fluss cluster enabled datalake.");
+ }
+
+ @Test
+ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception {
+ String databaseName = "test_db";
+ String tableName = "test_table_explicit_format";
+ TablePath tablePath = TablePath.of(databaseName, tableName);
+
+ // Create database
+ admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
+
+ // Disable datalake format for the cluster
+ admin.alterClusterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+ DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
+ .get();
+ // Verify cluster now has no datalake format enabled
+ assertThat(
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isEqualTo(null);
+
+ // Create table with explicit datalake format set (even though cluster doesn't set it)
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .distributedBy(3, "c1")
+ .property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
+ .build();
+ admin.createTable(tablePath, tableDescriptor, false).get();
+ // Verify table has datalake format
+ TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+ assertThat(tableInfo.getTableConfig().getDataLakeFormat().isPresent()).isTrue();
+ assertThat(tableInfo.getTableConfig().getDataLakeFormat().get())
+ .isEqualTo(DataLakeFormat.PAIMON);
+
+ // Enable datalake format for the cluster
+ admin.alterClusterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+ DATALAKE_FORMAT.key(),
+ DataLakeFormat.PAIMON.toString(),
+ AlterConfigOpType.SET)))
+ .get();
+ // Verify cluster now has datalake format enabled
+ assertThat(
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isEqualTo(DataLakeFormat.PAIMON);
+
+ // Enable datalake for the table - this should succeed although the table was created
+ // before cluster enabled datalake, because the table has explicit datalake format set
+ List<TableChange> enableDatalakeChange =
+ Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
+ admin.alterTable(tablePath, enableDatalakeChange, false).get();
+ // Verify datalake is now enabled
+ TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get();
+ assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 45065ca41..8aedc194b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -115,15 +115,24 @@ public class TableDescriptorValidation {
public static void validateAlterTableProperties(
TableInfo currentTable, Set<String> tableKeysToChange, Set<String> customKeysToChange) {
+ TableConfig currentConfig = currentTable.getTableConfig();
tableKeysToChange.forEach(
k -> {
if (isTableStorageConfig(k) && !isAlterableTableOption(k)) {
throw new InvalidAlterTableException(
"The option '" + k + "' is not supported to alter yet.");
}
+
+ if (!currentConfig.getDataLakeFormat().isPresent()
+ && ConfigOptions.TABLE_DATALAKE_ENABLED.key().equals(k)) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "The option '%s' cannot be altered for tables that were"
+ + " created before the Fluss cluster enabled datalake.",
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
+ }
});
- TableConfig currentConfig = currentTable.getTableConfig();
if (currentConfig.isDataLakeEnabled() && currentConfig.getDataLakeFormat().isPresent()) {
String format = currentConfig.getDataLakeFormat().get().toString();
customKeysToChange.forEach(