This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 079b78a3e [lake] Allow enabling lakehouse for log tables that have no bucket key and were created before cluster-level lakehouse was enabled (#2973)
079b78a3e is described below
commit 079b78a3e6176e048abe77b70bc1a2d69178c1a8
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Apr 2 09:50:38 2026 +0800
[lake] Allow enabling lakehouse for log tables that have no bucket key and were created before cluster-level lakehouse was enabled (#2973)
---
.../fluss/client/table/LakeEnableTableITCase.java | 59 ++++++++++++++++++++++
.../fluss/server/coordinator/MetadataManager.java | 21 ++++++++
.../server/utils/TableDescriptorValidation.java | 16 ++++++
3 files changed, 96 insertions(+)
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
index f4488774e..1b85a441f 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
@@ -111,6 +111,65 @@ class LakeEnableTableITCase extends ClientToServerITCaseBase {
"The following options cannot be altered for tables
that were created before the Fluss cluster enabled datalake:
'table.datalake.enabled'.");
}
+ @Test // Scenario: table created while the cluster has no datalake format; datalake enabled on it later.
+ void
testCanEnableDatalakeForLogTableWithoutBucketKeyCreatedBeforeClusterEnabledDatalake() // log table: no primary key, no bucket key
+ throws Exception {
+ String databaseName = "test_db";
+ String tableName =
+
"test_log_table_without_bucket_key_created_before_cluster_enabled_datalake";
+ TablePath tablePath = TablePath.of(databaseName, tableName);
+
+ admin.alterClusterConfigs( // step 1: clear the cluster-level datalake format (SET to null)
+ Collections.singletonList(
+ new AlterConfig(
+ DATALAKE_FORMAT.key(), null,
AlterConfigOpType.SET)))
+ .get();
+ assertThat( // the coordinator must now report no datalake format
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isEqualTo(null); // NOTE(review): `.isNull()` is the idiomatic AssertJ form
+
+ admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, // step 2: create database and table while datalake is off
true).get();
+
+ TableDescriptor tableDescriptor = // no primary key and no bucket key, only a bucket count
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .distributedBy(3) // 3 buckets, no bucket key columns
+ .build();
+ admin.createTable(tablePath, tableDescriptor, false).get();
+
+ // step 3: enable datalake (format PAIMON) at the cluster level, after the table already exists
+ admin.alterClusterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+ DATALAKE_FORMAT.key(),
+ DataLakeFormat.PAIMON.toString(),
+ AlterConfigOpType.SET)))
+ .get();
+ assertThat( // the coordinator must now report PAIMON as the cluster datalake format
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isEqualTo(DataLakeFormat.PAIMON);
+
+ List<TableChange> enableDatalakeChange = // single change: table.datalake.enabled = true
+
Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(),
"true"));
+ // step 4: enable datalake on the pre-existing table; `.get()` throws if the alter is rejected
+ admin.alterTable(tablePath, enableDatalakeChange, false).get();
+
+ TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get(); // step 5: re-read table metadata
+
assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
+ assertThat(updatedTableInfo.getTableConfig().getDataLakeFormat()) // format expected to be filled in from the cluster setting
+ .contains(DataLakeFormat.PAIMON);
+ }
+
@Test
void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws
Exception {
String databaseName = "test_db";
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index cc2bb5b70..b1efa0232 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -35,6 +35,7 @@ import org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.exception.TooManyPartitionsException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
@@ -513,6 +514,26 @@ public class MetadataManager {
getUpdatedTableDescriptor(tableDescriptor,
tablePropertyChanges);
if (newDescriptor != null) {
+ // is to enable datalake for the table
+ if (isDataLakeEnabled(newDescriptor) &&
!isDataLakeEnabled(tableDescriptor)) {
+ // The table was created before cluster-level datalake was
enabled.
+ // Backfill `table.datalake.format` before enabling
datalake on the table
+ // so the updated table metadata stays consistent with the
cluster setting.
+ if
(!tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
+ DataLakeFormat dataLakeFormat =
+ lakeCatalogDynamicLoader
+ .getLakeCatalogContainer()
+ .getDataLakeFormat();
+ if (dataLakeFormat == null) {
+ throw new InvalidAlterTableException(
+ "Cannot alter table "
+ + tablePath
+ + " in data lake, because the
Fluss cluster doesn't enable datalake tables.");
+ }
+ newDescriptor =
newDescriptor.withDataLakeFormat(dataLakeFormat);
+ }
+ }
+
// reuse the same validate logic with the createTable() method
validateTableDescriptor(newDescriptor, maxBucketNum);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 14e4330d7..5f0d6416c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -143,6 +143,22 @@ public class TableDescriptorValidation {
.filter(k -> k.startsWith("table.datalake."))
.collect(Collectors.toList());
if (!datalakeKeys.isEmpty()) {
+ // Allow log tables without bucket keys to enable datalake
even when
+ // `table.datalake.format` was not recorded at creation time,
because bucket
+ // distribution does not need to stay aligned with the lake
format in this case.
+ boolean alterLegacyLogTableWithoutBucketKey =
+ !currentTable.hasPrimaryKey()
+ && !currentTable.hasBucketKey()
+ && datalakeKeys.stream()
+ .allMatch(
+ k ->
+ k.equals(
+
ConfigOptions.TABLE_DATALAKE_ENABLED
+
.key()));
+ if (alterLegacyLogTableWithoutBucketKey) {
+ return;
+ }
+
throw new InvalidAlterTableException(
String.format(
"The following options cannot be altered for
tables that were"