This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dfd6956b [server] Support alter "table.datalake.enabled" option (#1753)
8dfd6956b is described below

commit 8dfd6956b9ca53f2a0ffc070c5a830c1ed6c3909
Author: Yang Guo <[email protected]>
AuthorDate: Sun Sep 28 22:06:37 2025 +0800

    [server] Support alter "table.datalake.enabled" option (#1753)
    
    ]
---
 .../org/apache/fluss/config/FlussConfigUtils.java  |   9 +-
 .../lake/paimon/LakeEnabledTableCreateITCase.java  |  67 +++++++++
 .../testutils/FlinkPaimonTieringTestBase.java      |  12 +-
 .../lake/paimon/tiering/PaimonTieringITCase.java   | 161 ++++++++++++++++++++-
 .../server/coordinator/CoordinatorService.java     |  11 +-
 .../fluss/server/coordinator/MetadataManager.java  |  96 +++++++++++-
 .../server/utils/TableDescriptorValidation.java    |   3 +-
 7 files changed, 346 insertions(+), 13 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index 74bb3012f..fa9c4274c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -36,18 +36,23 @@ public class FlussConfigUtils {
     public static final String CLIENT_PREFIX = "client.";
     public static final String CLIENT_SECURITY_PREFIX = "client.security.";
 
-    public static final List<String> ALTERABLE_TABLE_CONFIG;
+    public static final List<String> ALTERABLE_TABLE_OPTIONS;
 
     static {
         TABLE_OPTIONS = extractConfigOptions("table.");
         CLIENT_OPTIONS = extractConfigOptions("client.");
-        ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+        ALTERABLE_TABLE_OPTIONS =
+                Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
     }
 
     public static boolean isTableStorageConfig(String key) {
         return key.startsWith(TABLE_PREFIX);
     }
 
+    public static boolean isAlterableTableOption(String key) {
+        return ALTERABLE_TABLE_OPTIONS.contains(key);
+    }
+
     @VisibleForTesting
     static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
         Map<String, ConfigOption<?>> options = new HashMap<>();
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 1be24772d..5e97a635a 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -48,7 +49,9 @@ import javax.annotation.Nullable;
 
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
@@ -364,6 +367,70 @@ class LakeEnabledTableCreateITCase {
                 BUCKET_NUM);
     }
 
+    @Test
+    void testAlterLakeEnabledLogTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // log table with lake disabled
+        TableDescriptor logTable =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("log_c1", DataTypes.INT())
+                                        .column("log_c2", DataTypes.STRING())
+                                        .build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+                        .customProperties(customProperties)
+                        .distributedBy(BUCKET_NUM, "log_c1", "log_c2")
+                        .build();
+        TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
+        admin.createTable(logTablePath, logTable, false).get();
+
+        assertThatThrownBy(
+                        () ->
+                                paimonCatalog.getTable(
+                                        Identifier.create(DATABASE, 
logTablePath.getTableName())))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+
+        // enable lake
+        TableChange.SetOption enableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
+        List<TableChange> changes = Collections.singletonList(enableLake);
+
+        admin.alterTable(logTablePath, changes, false).get();
+
+        Table enabledPaimonLogTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, 
logTablePath.getTableName()));
+
+        Map<String, String> updatedProperties = new HashMap<>();
+        updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
+        TableDescriptor updatedLogTable = 
logTable.withProperties(updatedProperties);
+        // check the gotten log table
+        verifyPaimonTable(
+                enabledPaimonLogTable,
+                updatedLogTable,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "log_c1",
+                            "log_c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "log_c1,log_c2",
+                BUCKET_NUM);
+    }
+
     @Test
     void testThrowExceptionWhenConflictWithSystemColumn() {
         for (String systemColumn :
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 995b2db46..2580370d8 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -256,10 +256,16 @@ public abstract class FlinkPaimonTieringTestBase {
     }
 
     protected long createLogTable(TablePath tablePath, int bucketNum) throws 
Exception {
-        return createLogTable(tablePath, bucketNum, false);
+        return createLogTable(
+                tablePath, bucketNum, false, Collections.emptyMap(), 
Collections.emptyMap());
     }
 
-    protected long createLogTable(TablePath tablePath, int bucketNum, boolean 
isPartitioned)
+    protected long createLogTable(
+            TablePath tablePath,
+            int bucketNum,
+            boolean isPartitioned,
+            Map<String, String> properties,
+            Map<String, String> customProperties)
             throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder().column("a", DataTypes.INT()).column("b", 
DataTypes.STRING());
@@ -277,6 +283,8 @@ public abstract class FlinkPaimonTieringTestBase {
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
         }
+        tableBuilder.properties(properties);
+        tableBuilder.customProperties(customProperties);
         tableBuilder.schema(schemaBuilder.build());
         return createTable(tablePath, tableBuilder.build());
     }
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 442962f31..b4990f198 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
@@ -175,10 +176,7 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
                         {
                             put(
                                     FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
-                                    "["
-                                            + 
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
-                                            + 
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
-                                            + "]");
+                                    getPartitionOffsetStr(partitionNameByIds));
                         }
                     };
             checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
@@ -187,6 +185,150 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         }
     }
 
+    @Test
+    void testTieringForAlterTable() throws Exception {
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");
+        Map<String, String> tableProperties = new HashMap<>();
+        tableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"false");
+
+        long t1Id = createPkTable(t1, 1, tableProperties, 
Collections.emptyMap());
+
+        TableChange.SetOption setOption =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
+        List<TableChange> changes = Collections.singletonList(setOption);
+        admin.alterTable(t1, changes, false).get();
+
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+        // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
+        writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 3);
+            // check data in paimon
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+            // check snapshot property in paimon
+            Map<String, String> properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "[{\"bucket_id\":0,\"log_offset\":3}]");
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(t1, properties);
+
+            // then, create another log table
+            TablePath t2 = TablePath.of(DEFAULT_DB, "logTableAlter");
+
+            Map<String, String> logTableProperties = new HashMap<>();
+            logTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"false");
+            long t2Id = createLogTable(t2, 1, false, logTableProperties, 
Collections.emptyMap());
+            // enable lake
+            admin.alterTable(t2, changes, false).get();
+
+            TableBucket t2Bucket = new TableBucket(t2Id, 0);
+            List<InternalRow> flussRows = new ArrayList<>();
+            // write records
+            for (int i = 0; i < 10; i++) {
+                rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+                flussRows.addAll(rows);
+                // write records
+                writeRows(t2, rows, true);
+            }
+            // check the status of replica after synced;
+            // note: we can't update log start offset for unaware bucket mode 
log table
+            assertReplicaStatus(t2Bucket, 30);
+
+            // check data in paimon
+            checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+
+            // then write data to the pk tables
+            // write records
+            rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, 
"v333"));
+            // write records
+            writeRows(t1, rows, false);
+
+            // check the status of replica of t2 after synced
+            // not check start offset since we won't
+            // update start log offset for primary key table
+            assertReplicaStatus(t1Bucket, 9);
+
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+
+            // then create partitioned table and wait partitions are ready
+            TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, 
"partitionedTableAlter");
+            Map<String, String> partitionTableProperties = new HashMap<>();
+            
partitionTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"false");
+
+            Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+                    createPartitionedTable(
+                            partitionedTablePath, partitionTableProperties, 
Collections.emptyMap());
+
+            admin.alterTable(partitionedTablePath, changes, false).get();
+
+            Map<Long, String> partitionNameByIds = 
waitUntilPartitions(partitionedTablePath);
+
+            // now, write rows into partitioned table
+            TableDescriptor partitionedTableDescriptor = 
tableIdAndDescriptor.f1;
+            Map<String, List<InternalRow>> writtenRowsByPartition =
+                    writeRowsIntoPartitionedTable(
+                            partitionedTablePath, partitionedTableDescriptor, 
partitionNameByIds);
+            long tableId = tableIdAndDescriptor.f0;
+
+            // wait until synced to paimon
+            for (Long partitionId : partitionNameByIds.keySet()) {
+                TableBucket tableBucket = new TableBucket(tableId, 
partitionId, 0);
+                assertReplicaStatus(tableBucket, 3);
+            }
+
+            // now, let's check data in paimon per partition
+            // check data in paimon
+            String partitionCol = 
partitionedTableDescriptor.getPartitionKeys().get(0);
+            for (String partitionName : partitionNameByIds.values()) {
+                checkDataInPaimonAppendOnlyPartitionedTable(
+                        partitionedTablePath,
+                        Collections.singletonMap(partitionCol, partitionName),
+                        writtenRowsByPartition.get(partitionName),
+                        0);
+            }
+
+            properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    getPartitionOffsetStr(partitionNameByIds));
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
+        String raw =
+                
"{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
+        List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
+        Collections.sort(partitionIds);
+        List<String> partitionOffsetStrs = new ArrayList<>();
+
+        for (Long partitionId : partitionIds) {
+            String partitionName = partitionNameByIds.get(partitionId);
+            String partitionOffsetStr = String.format(raw, partitionId, 
partitionName);
+            partitionOffsetStrs.add(partitionOffsetStr);
+        }
+
+        return "[" + String.join(",", partitionOffsetStrs) + "]";
+    }
+
     @Test
     void testTieringToDvEnabledTable() throws Exception {
         TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
@@ -214,6 +356,15 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
 
     private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath 
partitionedTablePath)
             throws Exception {
+        return createPartitionedTable(
+                partitionedTablePath, Collections.emptyMap(), 
Collections.emptyMap());
+    }
+
+    private Tuple2<Long, TableDescriptor> createPartitionedTable(
+            TablePath partitionedTablePath,
+            Map<String, String> properties,
+            Map<String, String> customProperties)
+            throws Exception {
         TableDescriptor partitionedTableDescriptor =
                 TableDescriptor.builder()
                         .schema(
@@ -229,6 +380,8 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
                                 AutoPartitionTimeUnit.YEAR)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500))
+                        .properties(properties)
+                        .customProperties(customProperties)
                         .build();
         return Tuple2.of(
                 createTable(partitionedTablePath, partitionedTableDescriptor),
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 8b0ffba40..5e0c5dc2a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -90,6 +90,7 @@ import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
 import org.apache.fluss.server.coordinator.event.EventManager;
 import org.apache.fluss.server.entity.CommitKvSnapshotData;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
+import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
 import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
@@ -297,10 +298,16 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
             authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.table(tablePath));
         }
 
+        TablePropertyChanges tablePropertyChanges =
+                toTablePropertyChanges(request.getConfigChangesList());
+
         metadataManager.alterTableProperties(
                 tablePath,
-                toTablePropertyChanges(request.getConfigChangesList()),
-                request.isIgnoreIfNotExists());
+                tablePropertyChanges,
+                request.isIgnoreIfNotExists(),
+                lakeCatalog,
+                dataLakeFormat,
+                lakeTableTieringManager);
 
         return CompletableFuture.completedFuture(new 
AlterTablePropertiesResponse());
     }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index f65814c65..2056068a5 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -23,7 +23,9 @@ import 
org.apache.fluss.exception.DatabaseAlreadyExistException;
 import org.apache.fluss.exception.DatabaseNotEmptyException;
 import org.apache.fluss.exception.DatabaseNotExistException;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidPartitionException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.PartitionAlreadyExistsException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.exception.SchemaNotExistException;
@@ -32,6 +34,8 @@ import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.exception.TooManyBucketsException;
 import org.apache.fluss.exception.TooManyPartitionsException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -307,7 +311,10 @@ public class MetadataManager {
     public void alterTableProperties(
             TablePath tablePath,
             TablePropertyChanges tablePropertyChanges,
-            boolean ignoreIfNotExists) {
+            boolean ignoreIfNotExists,
+            @Nullable LakeCatalog lakeCatalog,
+            @Nullable DataLakeFormat dataLakeFormat,
+            LakeTableTieringManager lakeTableTieringManager) {
         try {
             // it throws TableNotExistException if the table or database not 
exists
             TableRegistration tableReg = getTableRegistration(tablePath);
@@ -328,12 +335,26 @@ public class MetadataManager {
 
             if (newDescriptor != null) {
                 // reuse the same validate logic with the createTable() method
-                validateTableDescriptor(tableDescriptor, maxBucketNum);
+                validateTableDescriptor(newDescriptor, maxBucketNum);
+
+                // pre alter table properties, e.g. create lake table in lake 
storage if it's to
+                // enable datalake for the table
+                preAlterTableProperties(
+                        tablePath, tableDescriptor, newDescriptor, 
lakeCatalog, dataLakeFormat);
                 // update the table to zk
                 TableRegistration updatedTableRegistration =
                         tableReg.newProperties(
                                 newDescriptor.getProperties(), 
newDescriptor.getCustomProperties());
                 zookeeperClient.updateTable(tablePath, 
updatedTableRegistration);
+
+                // post alter table properties, e.g. add the table to lake 
table tiering manager if
+                // it's to enable datalake for the table
+                postAlterTableProperties(
+                        tablePath,
+                        schemaInfo,
+                        tableDescriptor,
+                        updatedTableRegistration,
+                        lakeTableTieringManager);
             } else {
                 LOG.info(
                         "No properties changed when alter table {}, skip 
update table.", tablePath);
@@ -352,6 +373,66 @@ public class MetadataManager {
         }
     }
 
+    private void preAlterTableProperties(
+            TablePath tablePath,
+            TableDescriptor tableDescriptor,
+            TableDescriptor newDescriptor,
+            LakeCatalog lakeCatalog,
+            DataLakeFormat dataLakeFormat) {
+
+        boolean toEnableDataLake =
+                !isDataLakeEnabled(tableDescriptor) && 
isDataLakeEnabled(newDescriptor);
+
+        // enable lake table
+        if (toEnableDataLake) {
+            // TODO: should tolerate if the lake exist but matches our schema. This ensures
+            // eventually
+            //  consistent by idempotently creating the table multiple times. See #846
+            // before create table in fluss, we may create in lake
+            if (lakeCatalog == null) {
+                throw new InvalidAlterTableException(
+                        "Cannot alter table "
+                                + tablePath
+                                + " to enable data lake, because the Fluss 
cluster doesn't enable datalake tables.");
+            } else {
+                try {
+                    lakeCatalog.createTable(tablePath, newDescriptor);
+                } catch (TableAlreadyExistException e) {
+                    throw new LakeTableAlreadyExistException(
+                            String.format(
+                                    "The table %s already exists in %s 
catalog, please "
+                                            + "first drop the table in %s 
catalog or use a new table name.",
+                                    tablePath, dataLakeFormat, 
dataLakeFormat));
+                }
+            }
+        }
+        // more pre-alter actions can be added here
+    }
+
+    private void postAlterTableProperties(
+            TablePath tablePath,
+            SchemaInfo schemaInfo,
+            TableDescriptor oldTableDescriptor,
+            TableRegistration newTableRegistration,
+            LakeTableTieringManager lakeTableTieringManager) {
+
+        boolean toEnableDataLake =
+                !isDataLakeEnabled(oldTableDescriptor)
+                        && isDataLakeEnabled(newTableRegistration.properties);
+        boolean toDisableDataLake =
+                isDataLakeEnabled(oldTableDescriptor)
+                        && !isDataLakeEnabled(newTableRegistration.properties);
+
+        if (toEnableDataLake) {
+            TableInfo newTableInfo = 
newTableRegistration.toTableInfo(tablePath, schemaInfo);
+            // if the table is lake table, we need to add it to lake table 
tiering manager
+            lakeTableTieringManager.addNewLakeTable(newTableInfo);
+        } else if (toDisableDataLake) {
+            
lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
+        }
+        // more post-alter actions can be added here
+    }
+
     /**
      * Get a new TableDescriptor with updated properties.
      *
@@ -387,6 +468,17 @@ public class MetadataManager {
         }
     }
 
+    private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) {
+        String dataLakeEnabledValue =
+                
tableDescriptor.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+        return Boolean.parseBoolean(dataLakeEnabledValue);
+    }
+
+    private boolean isDataLakeEnabled(Map<String, String> properties) {
+        String dataLakeEnabledValue = 
properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+        return Boolean.parseBoolean(dataLakeEnabledValue);
+    }
+
     public TableInfo getTable(TablePath tablePath) throws 
TableNotExistException {
         Optional<TableRegistration> optionalTable;
         try {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 8b56166d6..76e8adc82 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -47,6 +47,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
+import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -108,7 +109,7 @@ public class TableDescriptorValidation {
             TableInfo currentTable, Set<String> tableKeysToChange, Set<String> 
customKeysToChange) {
         tableKeysToChange.forEach(
                 k -> {
-                    if (isTableStorageConfig(k)) {
+                    if (isTableStorageConfig(k) && !isAlterableTableOption(k)) {
                         throw new InvalidAlterTableException(
                                 "The option '" + k + "' is not supported to 
alter yet.");
                     }

Reply via email to