This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 86a283ad7 [server] Update tablet server metadata cache when 
TableRegistration changed (#2378)
86a283ad7 is described below

commit 86a283ad7feeafb8877108441beaa18e062b0b82
Author: Liebing <[email protected]>
AuthorDate: Sat Jan 24 19:52:06 2026 +0800

    [server] Update tablet server metadata cache when TableRegistration changed 
(#2378)
---
 .../java/org/apache/fluss/metadata/TableInfo.java  |  17 --
 .../lake/paimon/LakeEnabledTableCreateITCase.java  |  36 ++++
 .../coordinator/CoordinatorEventProcessor.java     |  62 +++++++
 .../coordinator/CoordinatorRequestBatch.java       |   3 +-
 .../server/coordinator/CoordinatorService.java     |   5 -
 .../fluss/server/coordinator/MetadataManager.java  |  65 +-------
 .../event/TableRegistrationChangeEvent.java        |  68 ++++++++
 .../event/watcher/TableChangeWatcher.java          |  17 +-
 .../org/apache/fluss/server/log/LogTablet.java     |   2 -
 .../org/apache/fluss/server/replica/Replica.java   |  26 +++
 .../fluss/server/replica/ReplicaManager.java       |  29 ++++
 .../coordinator/CoordinatorEventProcessorTest.java |  67 +++++++-
 .../server/coordinator/LakeTableManagerITCase.java |  12 +-
 .../server/coordinator/TableManagerITCase.java     |  33 +---
 .../event/watcher/TableChangeWatcherTest.java      |  64 ++++++-
 .../fluss/server/log/remote/RemoteLogITCase.java   | 183 ++++++++++++++++++++-
 .../apache/fluss/server/replica/ReplicaTest.java   |  22 +++
 .../server/testutils/RpcMessageTestUtils.java      |  20 ++-
 18 files changed, 612 insertions(+), 119 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
index eba41e078..be894ee79 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
@@ -340,23 +340,6 @@ public final class TableInfo {
                 modifiedTime);
     }
 
-    /** Replace a TableInfo with a new SchemaInfo. */
-    public TableInfo withNewSchema(SchemaInfo schemaInfo) {
-        return new TableInfo(
-                tablePath,
-                tableId,
-                schemaInfo.getSchemaId(),
-                schemaInfo.getSchema(),
-                bucketKeys,
-                partitionKeys,
-                numBuckets,
-                properties,
-                customProperties,
-                comment,
-                createdTime,
-                modifiedTime);
-    }
-
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 6fb700322..cf6def30f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -27,9 +27,11 @@ import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.types.DataTypes;
 
@@ -59,6 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import javax.annotation.Nullable;
 
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -72,6 +75,7 @@ import static 
org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static 
org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -584,6 +588,7 @@ class LakeEnabledTableCreateITCase {
                         .build();
         TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
         admin.createTable(logTablePath, logTable, false).get();
+        long tableId = admin.getTableInfo(logTablePath).get().getTableId();
 
         assertThatThrownBy(
                         () ->
@@ -591,6 +596,9 @@ class LakeEnabledTableCreateITCase {
                                         Identifier.create(DATABASE, 
logTablePath.getTableName())))
                 .isInstanceOf(Catalog.TableNotExistException.class);
 
+        // verify LogTablet datalake status is initially disabled
+        verifyLogTabletDataLakeEnabled(tableId, false);
+
         // enable lake
         TableChange.SetOption enableLake =
                 TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
@@ -598,6 +606,9 @@ class LakeEnabledTableCreateITCase {
 
         admin.alterTable(logTablePath, changes, false).get();
 
+        // verify LogTablet datalake status is enabled
+        verifyLogTabletDataLakeEnabled(tableId, true);
+
         Identifier paimonTablePath = Identifier.create(DATABASE, 
logTablePath.getTableName());
         Table enabledPaimonLogTable = paimonCatalog.getTable(paimonTablePath);
 
@@ -635,11 +646,17 @@ class LakeEnabledTableCreateITCase {
         // paimon table should still exist although lake is disabled
         paimonCatalog.getTable(paimonTablePath);
 
+        // verify LogTablet datalake status is disabled
+        verifyLogTabletDataLakeEnabled(tableId, false);
+
         // try to enable lake table again
         enableLake = 
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
         changes = Collections.singletonList(enableLake);
         admin.alterTable(logTablePath, changes, false).get();
 
+        // verify LogTablet datalake status is enabled again
+        verifyLogTabletDataLakeEnabled(tableId, true);
+
         // write some data to the lake table
         writeData(paimonCatalog.getTable(paimonTablePath));
         Optional<Snapshot> snapshot = 
paimonCatalog.getTable(paimonTablePath).latestSnapshot();
@@ -649,10 +666,16 @@ class LakeEnabledTableCreateITCase {
         changes = Collections.singletonList(disableLake);
         admin.alterTable(logTablePath, changes, false).get();
 
+        // verify LogTablet datalake status is disabled again
+        verifyLogTabletDataLakeEnabled(tableId, false);
+
         // try to enable lake table again, the snapshot should not change
         changes = Collections.singletonList(enableLake);
         admin.alterTable(logTablePath, changes, false).get();
         
assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot);
+
+        // verify LogTablet datalake status is enabled
+        verifyLogTabletDataLakeEnabled(tableId, true);
     }
 
     @Test
@@ -1021,6 +1044,19 @@ class LakeEnabledTableCreateITCase {
         assertThat(paimonTable.comment()).isEqualTo(flussTable.getComment());
     }
 
+    private void verifyLogTabletDataLakeEnabled(long tableId, boolean 
isDataLakeEnabled) {
+        for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
+            TableBucket tb = new TableBucket(tableId, bucket);
+            retry(
+                    Duration.ofMinutes(1),
+                    () -> {
+                        Replica replica = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+                        assertThat(replica.getLogTablet().isDataLakeEnabled())
+                                .isEqualTo(isDataLakeEnabled);
+                    });
+        }
+    }
+
     private TableDescriptor createTableDescriptor(
             int columnNum,
             int bucketNum,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 55b59753c..aa0afbbcd 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -85,6 +85,7 @@ import 
org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseRecei
 import org.apache.fluss.server.coordinator.event.RebalanceEvent;
 import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
 import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
+import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
 import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
 import 
org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
 import org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
@@ -125,6 +126,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -560,6 +562,8 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         } else if (event instanceof SchemaChangeEvent) {
             SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
             processSchemaChange(schemaChangeEvent);
+        } else if (event instanceof TableRegistrationChangeEvent) {
+            processTableRegistrationChange((TableRegistrationChangeEvent) 
event);
         } else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
             processNotifyLeaderAndIsrResponseReceivedEvent(
                     (NotifyLeaderAndIsrResponseReceivedEvent) event);
@@ -720,6 +724,64 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 null);
     }
 
+    private void processTableRegistrationChange(TableRegistrationChangeEvent 
event) {
+        TablePath tablePath = event.getTablePath();
+        Long tableId = coordinatorContext.getTableIdByPath(tablePath);
+
+        // Skip if the table is not yet registered in coordinator context.
+        // Should not happen in normal cases.
+        if (tableId == null) {
+            LOG.warn(
+                    "Table {} is not registered in coordinator context, "
+                            + "skip processing table registration change.",
+                    tablePath);
+            return;
+        }
+
+        TableInfo oldTableInfo = coordinatorContext.getTableInfoById(tableId);
+
+        TableInfo newTableInfo =
+                event.getNewTableRegistration()
+                        .toTableInfo(tablePath, oldTableInfo.getSchemaInfo());
+        coordinatorContext.putTableInfo(newTableInfo);
+        postAlterTableProperties(oldTableInfo, newTableInfo);
+
+        // Notify tablet servers about the metadata change
+        updateTabletServerMetadataCache(
+                new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+                tableId,
+                null,
+                null);
+    }
+
+    private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo 
newTableInfo) {
+        boolean dataLakeEnabled = 
newTableInfo.getTableConfig().isDataLakeEnabled();
+        boolean toEnableDataLake =
+                !oldTableInfo.getTableConfig().isDataLakeEnabled()
+                        && newTableInfo.getTableConfig().isDataLakeEnabled();
+        boolean toDisableDataLake =
+                oldTableInfo.getTableConfig().isDataLakeEnabled()
+                        && !newTableInfo.getTableConfig().isDataLakeEnabled();
+
+        if (toEnableDataLake) {
+            // if the table is lake table, we need to add it to lake table 
tiering manager
+            lakeTableTieringManager.addNewLakeTable(newTableInfo);
+        } else if (toDisableDataLake) {
+            lakeTableTieringManager.removeLakeTable(newTableInfo.getTableId());
+        } else if (dataLakeEnabled) {
+            // The table is still a lake table, check if freshness has changed
+            Duration oldFreshness = 
oldTableInfo.getTableConfig().getDataLakeFreshness();
+            Duration newFreshness = 
newTableInfo.getTableConfig().getDataLakeFreshness();
+
+            // Check if freshness has changed
+            if (!Objects.equals(oldFreshness, newFreshness)) {
+                lakeTableTieringManager.updateTableLakeFreshness(
+                        newTableInfo.getTableId(), newFreshness.toMillis());
+            }
+        }
+        // more post-alter actions can be added here
+    }
+
     private void processCreatePartition(CreatePartitionEvent 
createPartitionEvent) {
         long partitionId = createPartitionEvent.getPartitionId();
         // skip the partition if it already exists
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
index 35527805c..436cc5203 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -280,6 +280,7 @@ public class CoordinatorRequestBatch {
      *   <li>case8: One newly tabletServer added into cluster
      *   <li>case9: One tabletServer is removed from cluster
      *   <li>case10: schemaId is changed after table is created.
+     *   <li>case 11: TableRegistration changed after table is created.
      * </ol>
      */
     // todo: improve this with different phase enum.
@@ -300,7 +301,7 @@ public class CoordinatorRequestBatch {
                         .computeIfAbsent(tableId, k -> new HashMap<>())
                         .put(partitionId, Collections.emptyList());
             } else {
-                // case3, case4, case10
+                // case3, case4, case10, case 11
                 updateMetadataRequestBucketMap.put(tableId, 
Collections.emptyList());
             }
         } else {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 94d56dd33..a9cd5b3c5 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -378,8 +378,6 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                             + "table properties or table schema.");
         }
 
-        LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
-                lakeCatalogDynamicLoader.getLakeCatalogContainer();
         LakeCatalog.Context lakeCatalogContext =
                 new DefaultLakeCatalogContext(false, 
currentSession().getPrincipal());
 
@@ -388,7 +386,6 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                     tablePath,
                     alterSchemaChanges,
                     request.isIgnoreIfNotExists(),
-                    lakeCatalogContainer.getLakeCatalog(),
                     lakeCatalogContext);
         }
 
@@ -398,8 +395,6 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                     alterTableConfigChanges,
                     tablePropertyChanges,
                     request.isIgnoreIfNotExists(),
-                    lakeCatalogContainer.getLakeCatalog(),
-                    lakeTableTieringManager,
                     lakeCatalogContext);
         }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index f728e91da..8d7b61a08 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -60,14 +60,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -327,7 +325,6 @@ public class MetadataManager {
             TablePath tablePath,
             List<TableChange> schemaChanges,
             boolean ignoreIfNotExists,
-            @Nullable LakeCatalog lakeCatalog,
             LakeCatalog.Context lakeCatalogContext)
             throws TableNotExistException, TableNotPartitionedException {
         try {
@@ -339,8 +336,7 @@ public class MetadataManager {
                 Schema newSchema = SchemaUpdate.applySchemaChanges(table, 
schemaChanges);
 
                 // Lake First: sync to Lake before updating Fluss schema
-                syncSchemaChangesToLake(
-                        tablePath, table, schemaChanges, lakeCatalog, 
lakeCatalogContext);
+                syncSchemaChangesToLake(tablePath, table, schemaChanges, 
lakeCatalogContext);
 
                 // Update Fluss schema (ZK) after Lake sync succeeds
                 if (!newSchema.equals(table.getSchema())) {
@@ -370,12 +366,13 @@ public class MetadataManager {
             TablePath tablePath,
             TableInfo tableInfo,
             List<TableChange> schemaChanges,
-            @Nullable LakeCatalog lakeCatalog,
             LakeCatalog.Context lakeCatalogContext) {
         if (!isDataLakeEnabled(tableInfo.toTableDescriptor())) {
             return;
         }
 
+        LakeCatalog lakeCatalog =
+                
lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
         if (lakeCatalog == null) {
             throw new InvalidAlterTableException(
                     "Cannot alter schema for datalake enabled table "
@@ -399,8 +396,6 @@ public class MetadataManager {
             List<TableChange> tableChanges,
             TablePropertyChanges tablePropertyChanges,
             boolean ignoreIfNotExists,
-            @Nullable LakeCatalog lakeCatalog,
-            LakeTableTieringManager lakeTableTieringManager,
             LakeCatalog.Context lakeCatalogContext) {
         try {
             // it throws TableNotExistException if the table or database not 
exists
@@ -431,22 +426,12 @@ public class MetadataManager {
                         tableDescriptor,
                         newDescriptor,
                         tableChanges,
-                        lakeCatalog,
                         lakeCatalogContext);
                 // update the table to zk
                 TableRegistration updatedTableRegistration =
                         tableReg.newProperties(
                                 newDescriptor.getProperties(), 
newDescriptor.getCustomProperties());
                 zookeeperClient.updateTable(tablePath, 
updatedTableRegistration);
-
-                // post alter table properties, e.g. add the table to lake 
table tiering manager if
-                // it's to enable datalake for the table
-                postAlterTableProperties(
-                        tablePath,
-                        schemaInfo,
-                        tableDescriptor,
-                        updatedTableRegistration,
-                        lakeTableTieringManager);
             } else {
                 LOG.info(
                         "No properties changed when alter table {}, skip 
update table.", tablePath);
@@ -471,8 +456,10 @@ public class MetadataManager {
             TableDescriptor tableDescriptor,
             TableDescriptor newDescriptor,
             List<TableChange> tableChanges,
-            LakeCatalog lakeCatalog,
             LakeCatalog.Context lakeCatalogContext) {
+        LakeCatalog lakeCatalog =
+                
lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
+
         if (isDataLakeEnabled(newDescriptor)) {
             if (lakeCatalog == null) {
                 throw new InvalidAlterTableException(
@@ -511,41 +498,6 @@ public class MetadataManager {
         }
     }
 
-    private void postAlterTableProperties(
-            TablePath tablePath,
-            SchemaInfo schemaInfo,
-            TableDescriptor oldTableDescriptor,
-            TableRegistration newTableRegistration,
-            LakeTableTieringManager lakeTableTieringManager) {
-
-        boolean dataLakeEnabled = 
isDataLakeEnabled(newTableRegistration.properties);
-        boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && 
dataLakeEnabled;
-        boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && 
!dataLakeEnabled;
-
-        if (toEnableDataLake) {
-            TableInfo newTableInfo = 
newTableRegistration.toTableInfo(tablePath, schemaInfo);
-            // if the table is lake table, we need to add it to lake table 
tiering manager
-            lakeTableTieringManager.addNewLakeTable(newTableInfo);
-        } else if (toDisableDataLake) {
-            
lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
-        } else if (dataLakeEnabled) {
-            // The table is still a lake table, check if freshness has changed
-            Duration oldFreshness =
-                    Configuration.fromMap(oldTableDescriptor.getProperties())
-                            .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
-            Duration newFreshness =
-                    Configuration.fromMap(newTableRegistration.properties)
-                            .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
-
-            // Check if freshness has changed
-            if (!Objects.equals(oldFreshness, newFreshness)) {
-                lakeTableTieringManager.updateTableLakeFreshness(
-                        newTableRegistration.tableId, newFreshness.toMillis());
-            }
-        }
-        // more post-alter actions can be added here
-    }
-
     /**
      * Get a new TableDescriptor with updated properties.
      *
@@ -587,11 +539,6 @@ public class MetadataManager {
         return Boolean.parseBoolean(dataLakeEnabledValue);
     }
 
-    private boolean isDataLakeEnabled(Map<String, String> properties) {
-        String dataLakeEnabledValue = 
properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
-        return Boolean.parseBoolean(dataLakeEnabledValue);
-    }
-
     public void removeSensitiveTableOptions(Map<String, String> 
tableLakeOptions) {
         if (tableLakeOptions == null || tableLakeOptions.isEmpty()) {
             return;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java
new file mode 100644
index 000000000..6d128cfa7
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.data.TableRegistration;
+
+import java.util.Objects;
+
+/** An event for table registration change. */
+public class TableRegistrationChangeEvent implements CoordinatorEvent {
+    private final TablePath tablePath;
+    private final TableRegistration newTableRegistration;
+
+    public TableRegistrationChangeEvent(
+            TablePath tablePath, TableRegistration newTableRegistration) {
+        this.tablePath = tablePath;
+        this.newTableRegistration = newTableRegistration;
+    }
+
+    public TablePath getTablePath() {
+        return tablePath;
+    }
+
+    public TableRegistration getNewTableRegistration() {
+        return newTableRegistration;
+    }
+
+    @Override
+    public String toString() {
+        return "TablePropertiesChangeEvent{"
+                + "tablePath="
+                + tablePath
+                + ", tableRegistration="
+                + newTableRegistration
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TableRegistrationChangeEvent that = (TableRegistrationChangeEvent) o;
+        return Objects.equals(tablePath, that.tablePath)
+                && Objects.equals(newTableRegistration, 
that.newTableRegistration);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(tablePath, newTableRegistration);
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
index 09c273e7d..5b4821f19 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
@@ -30,6 +30,7 @@ import 
org.apache.fluss.server.coordinator.event.DropPartitionEvent;
 import org.apache.fluss.server.coordinator.event.DropTableEvent;
 import org.apache.fluss.server.coordinator.event.EventManager;
 import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
+import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.PartitionAssignment;
 import org.apache.fluss.server.zk.data.TableAssignment;
@@ -127,7 +128,16 @@ public class TableChangeWatcher {
                             if (tablePath == null) {
                                 break;
                             }
-                            processCreateTable(tablePath, newData);
+                            // Distinguish between table creation and 
properties change.
+                            // If oldData exists and contains valid table 
registration data,
+                            // it's a properties change; otherwise, it's a 
table creation.
+                            if (oldData != null
+                                    && oldData.getData() != null
+                                    && oldData.getData().length > 0) {
+                                processTableRegistrationChange(tablePath, 
newData);
+                            } else {
+                                processCreateTable(tablePath, newData);
+                            }
                         }
                         break;
                     }
@@ -241,6 +251,11 @@ public class TableChangeWatcher {
                     new CreatePartitionEvent(
                             tablePath, tableId, partitionId, partitionName, 
partitionAssignment));
         }
+
+        private void processTableRegistrationChange(TablePath tablePath, 
ChildData newData) {
+            TableRegistration newTable = TableZNode.decode(newData.getData());
+            eventManager.put(new TableRegistrationChangeEvent(tablePath, 
newTable));
+        }
     }
 
     private void processSchemaChange(TablePath tablePath, int schemaId) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index a4d7668f9..3f333dc10 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -529,8 +529,6 @@ public final class LogTablet {
     public void updateLakeTableSnapshotId(long snapshotId) {
         if (snapshotId > this.lakeTableSnapshotId) {
             this.lakeTableSnapshotId = snapshotId;
-            // it means the data lake is enabled if we have got the snapshot id
-            this.isDataLakeEnabled = true;
         }
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 9314901fe..ee158b7a4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -560,6 +560,32 @@ public final class Replica {
         logTablet.updateLeaderEndOffsetSnapshot();
     }
 
+    public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) {
+        boolean old = logTablet.isDataLakeEnabled();
+        if (old == isDataLakeEnabled) {
+            return;
+        }
+
+        logTablet.updateIsDataLakeEnabled(isDataLakeEnabled);
+
+        if (isLeader()) {
+            if (isDataLakeEnabled) {
+                registerLakeTieringMetrics();
+            } else {
+                if (lakeTieringMetricGroup != null) {
+                    lakeTieringMetricGroup.close();
+                    lakeTieringMetricGroup = null;
+                }
+            }
+        }
+
+        LOG.info(
+                "Replica for {} isDataLakeEnabled changed from {} to {}",
+                tableBucket,
+                old,
+                isDataLakeEnabled);
+    }
+
     private void createKv() {
         try {
             // create a closeable registry for the closable related to kv
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 421db4a06..76e3ea186 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -84,6 +84,7 @@ import org.apache.fluss.server.log.LogTablet;
 import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
 import org.apache.fluss.server.log.remote.RemoteLogManager;
 import org.apache.fluss.server.metadata.ClusterMetadata;
+import org.apache.fluss.server.metadata.TableMetadata;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
 import org.apache.fluss.server.metrics.UserMetrics;
 import org.apache.fluss.server.metrics.group.BucketMetricGroup;
@@ -459,9 +460,37 @@ public class ReplicaManager {
                     // check or apply coordinator epoch.
                     validateAndApplyCoordinatorEpoch(coordinatorEpoch, 
"updateMetadataCache");
                     metadataCache.updateClusterMetadata(clusterMetadata);
+                    updateReplicaTableConfig(clusterMetadata);
                 });
     }
 
+    private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
+        Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>();
+        for (TableMetadata tableMetadata : 
clusterMetadata.getTableMetadataList()) {
+            TableInfo tableInfo = tableMetadata.getTableInfo();
+            if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
+                long tableId = tableInfo.getTableId();
+                boolean dataLakeEnabled = 
tableInfo.getTableConfig().isDataLakeEnabled();
+                tableIdToLakeFlag.put(tableId, dataLakeEnabled);
+            }
+        }
+
+        if (tableIdToLakeFlag.isEmpty()) {
+            return;
+        }
+
+        for (Map.Entry<TableBucket, HostedReplica> entry : 
allReplicas.entrySet()) {
+            HostedReplica hostedReplica = entry.getValue();
+            if (hostedReplica instanceof OnlineReplica) {
+                Replica replica = ((OnlineReplica) hostedReplica).getReplica();
+                long tableId = replica.getTableBucket().getTableId();
+                if (tableIdToLakeFlag.containsKey(tableId)) {
+                    
replica.updateIsDataLakeEnabled(tableIdToLakeFlag.get(tableId));
+                }
+            }
+        }
+    }
+
     /**
      * Append log records to leader replicas of the buckets, and wait for them 
to be replicated to
      * other replicas.
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index dfb4088e4..d4ec8f8db 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -53,6 +53,7 @@ import 
org.apache.fluss.server.coordinator.statemachine.ReplicaState;
 import org.apache.fluss.server.entity.AdjustIsrResultForBucket;
 import org.apache.fluss.server.entity.CommitKvSnapshotData;
 import org.apache.fluss.server.entity.CommitRemoteLogManifestData;
+import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import 
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
 import org.apache.fluss.server.metadata.BucketMetadata;
@@ -906,6 +907,70 @@ class CoordinatorEventProcessorTest {
                                 3, new TableMetadata(tableInfo2, 
Collections.emptyList())));
     }
 
+    @Test
+    void testTableRegistrationChange() throws Exception {
+        // make sure all request to gateway should be successful
+        initCoordinatorChannel();
+
+        // create a table
+        TablePath t1 = TablePath.of(defaultDatabase, 
"test_table_registration_change");
+        int nBuckets = 1;
+        int replicationFactor = 3;
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        nBuckets,
+                        replicationFactor,
+                        new TabletServerInfo[] {
+                            new TabletServerInfo(0, "rack0"),
+                            new TabletServerInfo(1, "rack1"),
+                            new TabletServerInfo(2, "rack2")
+                        });
+        // create table
+        List<Integer> replicas = 
tableAssignment.getBucketAssignment(0).getReplicas();
+        metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false);
+        TableInfo tableInfo = metadataManager.getTable(t1);
+
+        retry(
+                Duration.ofMinutes(1),
+                () ->
+                        verifyMetadataUpdateRequest(
+                                3,
+                                new TableMetadata(
+                                        tableInfo,
+                                        Collections.singletonList(
+                                                new BucketMetadata(
+                                                        0, replicas.get(0), 0, 
replicas)))));
+
+        // alter table properties (custom property)
+        TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
+        builder.setCustomProperty("custom.key", "custom.value");
+        TablePropertyChanges tablePropertyChanges = builder.build();
+        metadataManager.alterTableProperties(
+                t1, Collections.emptyList(), tablePropertyChanges, false, 
null);
+
+        // get updated table info and verify metadata update request is sent
+        TableInfo updatedTableInfo = metadataManager.getTable(t1);
+        assertThat(updatedTableInfo.getCustomProperties().toMap())
+                .containsEntry("custom.key", "custom.value");
+
+        retry(
+                Duration.ofMinutes(1),
+                () ->
+                        verifyMetadataUpdateRequest(
+                                3, new TableMetadata(updatedTableInfo, 
Collections.emptyList())));
+
+        // verify the table info in coordinator context is updated
+        retryVerifyContext(
+                ctx -> {
+                    Long tableId = ctx.getTableIdByPath(t1);
+                    assertThat(tableId).isNotNull();
+                    TableInfo tableInfoInCtx = ctx.getTableInfoById(tableId);
+                    assertThat(tableInfoInCtx).isNotNull();
+                    assertThat(tableInfoInCtx.getCustomProperties().toMap())
+                            .containsEntry("custom.key", "custom.value");
+                });
+    }
+
     @Test
     void testDoBucketReassignment() throws Exception {
         zookeeperClient.registerTabletServer(
@@ -1305,7 +1370,7 @@ class CoordinatorEventProcessorTest {
     }
 
     private void alterTable(TablePath tablePath, List<TableChange> 
schemaChanges) {
-        metadataManager.alterTableSchema(tablePath, schemaChanges, true, null, 
null);
+        metadataManager.alterTableSchema(tablePath, schemaChanges, true, null);
     }
 
     private TableDescriptor getPartitionedTable() {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index 2472d5041..dd25e0ef1 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -156,7 +156,8 @@ class LakeTableManagerITCase {
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(setProperties, new 
ArrayList<>()),
+                                setProperties,
+                                Collections.emptyList(),
                                 Collections.emptyList(),
                                 false))
                 .get();
@@ -185,7 +186,8 @@ class LakeTableManagerITCase {
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(new HashMap<>(), 
resetProperties),
+                                Collections.emptyMap(),
+                                resetProperties,
                                 Collections.emptyList(),
                                 false))
                 .get();
@@ -214,7 +216,8 @@ class LakeTableManagerITCase {
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(new HashMap<>(), 
resetProperties2),
+                                Collections.emptyMap(),
+                                resetProperties2,
                                 Collections.emptyList(),
                                 false))
                 .get();
@@ -280,7 +283,8 @@ class LakeTableManagerITCase {
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(setProperties, new 
ArrayList<>()),
+                                setProperties,
+                                Collections.emptyList(),
                                 Collections.emptyList(),
                                 false))
                 .get();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index f21d5bdd8..155b2c4fe 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -23,7 +23,6 @@ import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.AutoPartitionTimeUnit;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.cluster.AlterConfigOpType;
 import org.apache.fluss.exception.DatabaseAlreadyExistException;
 import org.apache.fluss.exception.DatabaseNotEmptyException;
 import org.apache.fluss.exception.DatabaseNotExistException;
@@ -51,7 +50,6 @@ import org.apache.fluss.rpc.messages.ListDatabasesRequest;
 import org.apache.fluss.rpc.messages.MetadataRequest;
 import org.apache.fluss.rpc.messages.MetadataResponse;
 import org.apache.fluss.rpc.messages.PbAddColumn;
-import org.apache.fluss.rpc.messages.PbAlterConfig;
 import org.apache.fluss.rpc.messages.PbBucketMetadata;
 import org.apache.fluss.rpc.messages.PbPartitionMetadata;
 import org.apache.fluss.rpc.messages.PbServerNode;
@@ -303,7 +301,8 @@ class TableManagerITCase {
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(setProperties, 
resetProperties),
+                                setProperties,
+                                resetProperties,
                                 Collections.emptyList(),
                                 false))
                 .get();
@@ -679,7 +678,11 @@ class TableManagerITCase {
         adminGateway
                 .alterTable(
                         newAlterTableRequest(
-                                tablePath, Collections.emptyList(), 
alterTableAddColumns(), false))
+                                tablePath,
+                                Collections.emptyMap(),
+                                Collections.emptyList(),
+                                alterTableAddColumns(),
+                                false))
                 .get();
 
         // restart coordinatorServer
@@ -820,28 +823,6 @@ class TableManagerITCase {
                 .build();
     }
 
-    private static List<PbAlterConfig> alterTableProperties(
-            Map<String, String> setProperties, List<String> resetProperties) {
-        List<PbAlterConfig> res = new ArrayList<>();
-
-        for (Map.Entry<String, String> entry : setProperties.entrySet()) {
-            PbAlterConfig info = new PbAlterConfig();
-            info.setConfigKey(entry.getKey());
-            info.setConfigValue(entry.getValue());
-            info.setOpType(AlterConfigOpType.SET.value());
-            res.add(info);
-        }
-
-        for (String resetProperty : resetProperties) {
-            PbAlterConfig info = new PbAlterConfig();
-            info.setConfigKey(resetProperty);
-            info.setOpType(AlterConfigOpType.DELETE.value());
-            res.add(info);
-        }
-
-        return res;
-    }
-
     private static List<PbAddColumn> alterTableAddColumns() {
         List<PbAddColumn> addColumns = new ArrayList<>();
         PbAddColumn newNestedColumn = new PbAddColumn();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
index 691f0d9ce..76634b970 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
@@ -35,12 +35,15 @@ import 
org.apache.fluss.server.coordinator.event.CreateTableEvent;
 import org.apache.fluss.server.coordinator.event.DropPartitionEvent;
 import org.apache.fluss.server.coordinator.event.DropTableEvent;
 import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
+import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
 import org.apache.fluss.server.coordinator.event.TestingEventManager;
+import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
 import org.apache.fluss.server.zk.data.PartitionAssignment;
 import org.apache.fluss.server.zk.data.TableAssignment;
+import org.apache.fluss.server.zk.data.TableRegistration;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
 import org.apache.fluss.types.DataTypes;
 
@@ -309,7 +312,6 @@ class TableChangeWatcherTest {
                                     null,
                                     TableChange.ColumnPosition.last())),
                     false,
-                    null,
                     null);
             Schema newSchema =
                     Schema.newBuilder()
@@ -329,4 +331,64 @@ class TableChangeWatcherTest {
                         assertThat(eventManager.getEvents())
                                 
.containsExactlyInAnyOrderElementsOf(allEvents));
     }
+
+    @Test
+    void testTableRegistrationChange() {
+        // create a table
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"table_registration_change");
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        3,
+                        3,
+                        new TabletServerInfo[] {
+                            new TabletServerInfo(0, "rack0"),
+                            new TabletServerInfo(1, "rack1"),
+                            new TabletServerInfo(2, "rack2")
+                        });
+        long tableId = metadataManager.createTable(tablePath, TEST_TABLE, 
tableAssignment, false);
+        SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath);
+        long currentMillis = System.currentTimeMillis();
+
+        List<CoordinatorEvent> expectedEvents = new ArrayList<>();
+        expectedEvents.add(
+                new CreateTableEvent(
+                        TableInfo.of(
+                                tablePath,
+                                tableId,
+                                schemaInfo.getSchemaId(),
+                                TEST_TABLE,
+                                currentMillis,
+                                currentMillis),
+                        tableAssignment));
+        expectedEvents.add(new SchemaChangeEvent(tablePath, schemaInfo));
+
+        retry(
+                Duration.ofMinutes(1),
+                () ->
+                        assertThat(eventManager.getEvents())
+                                
.containsExactlyInAnyOrderElementsOf(expectedEvents));
+
+        // alter table properties (custom property)
+        TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
+        builder.setCustomProperty("custom.key", "custom.value");
+        TablePropertyChanges tablePropertyChanges = builder.build();
+        metadataManager.alterTableProperties(
+                tablePath, Collections.emptyList(), tablePropertyChanges, 
false, null);
+
+        // get the updated table registration
+        TableRegistration updatedTableRegistration =
+                metadataManager.getTableRegistration(tablePath);
+
+        // verify TableRegistrationChangeEvent is generated
+        expectedEvents.add(new TableRegistrationChangeEvent(tablePath, 
updatedTableRegistration));
+
+        metadataManager.dropTable(tablePath, false);
+        expectedEvents.add(new DropTableEvent(tableId, false, false));
+
+        retry(
+                Duration.ofMinutes(1),
+                () ->
+                        assertThat(eventManager.getEvents())
+                                
.containsExactlyInAnyOrderElementsOf(expectedEvents));
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
index 3187e5db5..90dfa0914 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
@@ -22,17 +22,24 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.remote.RemoteLogSegment;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.server.entity.FetchReqInfo;
 import org.apache.fluss.server.log.FetchParams;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.tablet.TabletServer;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.FlussPaths;
+import org.apache.fluss.utils.clock.ManualClock;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -41,14 +48,19 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
+import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
 import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
@@ -58,11 +70,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for remote log. */
 public class RemoteLogITCase {
+
+    private static final ManualClock MANUAL_CLOCK = new 
ManualClock(System.currentTimeMillis());
+
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
             FlussClusterExtension.builder()
                     .setNumOfTabletServers(3)
                     .setClusterConf(initConfig())
+                    .setClock(MANUAL_CLOCK)
                     .build();
 
     private TableBucket setupTableBucket() throws Exception {
@@ -75,6 +91,11 @@ public class RemoteLogITCase {
 
     private void produceRecordsAndWaitRemoteLogCopy(
             TabletServerGateway leaderGateway, TableBucket tb) throws 
Exception {
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L);
+    }
+
+    private void produceRecordsAndWaitRemoteLogCopy(
+            TabletServerGateway leaderGateway, TableBucket tb, long 
baseOffset) throws Exception {
         for (int i = 0; i < 10; i++) {
             assertProduceLogResponse(
                     leaderGateway
@@ -86,7 +107,7 @@ public class RemoteLogITCase {
                                             
genMemoryLogRecordsByObject(DATA1)))
                             .get(),
                     0,
-                    i * 10L);
+                    baseOffset + i * 10L);
         }
         FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(
                 new TableBucket(tb.getTableId(), 0));
@@ -202,6 +223,164 @@ public class RemoteLogITCase {
         FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, follower);
     }
 
+    @Test
+    void testRemoteLogTTLWithDynamicLakeToggle() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", 
"test_remote_log_ttl_dynamic_lake");
+        // ==================== Stage A: Lake Disabled (Initial) 
====================
+        // Create table without data lake enabled
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DATA1_SCHEMA)
+                        .distributedBy(1)
+                        // Set a short TTL for testing (1 hour)
+                        .property(ConfigOptions.TABLE_LOG_TTL, 
Duration.ofHours(1))
+                        .build();
+
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
tableDescriptor);
+        TableBucket tb = new TableBucket(tableId, 0);
+        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
+
+        int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
+        TabletServerGateway leaderGateway =
+                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);
+
+        // Produce records to create remote log segments
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb);
+
+        TabletServer tabletServer = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
+        RemoteLogManager remoteLogManager = 
tabletServer.getReplicaManager().getRemoteLogManager();
+        RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
+
+        
assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0);
+        assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0L);
+
+        Replica leaderReplica = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+        LogTablet logTablet = leaderReplica.getLogTablet();
+        assertThat(logTablet.isDataLakeEnabled()).isFalse();
+
+        // Advance time past TTL (1 hour + buffer)
+        MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
+
+        // Since lake is disabled, expired segments should be deleted directly
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    
assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
+                    
assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
+                });
+
+        // ==================== Stage B: Dynamic Enable & Retention 
====================
+        // Dynamically enable data lake
+        CoordinatorGateway coordinatorGateway = 
FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+        coordinatorGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                Collections.singletonMap(
+                                        
ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                false))
+                .get();
+        retry(Duration.ofMinutes(1), () -> 
assertThat(logTablet.isDataLakeEnabled()).isTrue());
+
+        // Produce new data after enabling lake (baseOffset = 100 from Stage A)
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 100L);
+        retry(
+                Duration.ofMinutes(2),
+                () -> 
assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0));
+        int stageBSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
+        long stageBRemoteLogEndOffset = 
remoteLogTablet.getRemoteLogEndOffset().orElse(-1L);
+        assertThat(stageBRemoteLogEndOffset).isGreaterThan(0L);
+
+        // Advance time past TTL (1 hour + buffer)
+        MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
+
+        // Since lake is enabled but no data has been tiered (lakeLogEndOffset 
= -1),
+        // the expired segments should NOT be deleted
+        
assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(stageBSegmentCount);
+
+        // ==================== Stage C: Lake Progress Update (Cleanup) 
====================
+        // Step 1: Partially update lake log end offset to trigger partial 
cleanup
+        // Get segments sorted by remoteLogEndOffset and use middle segment's 
end offset
+        List<RemoteLogSegment> sortedSegments =
+                remoteLogTablet.allRemoteLogSegments().stream()
+                        
.sorted(Comparator.comparingLong(RemoteLogSegment::remoteLogEndOffset))
+                        .collect(Collectors.toList());
+        assertThat(sortedSegments.size()).isGreaterThanOrEqualTo(2);
+
+        // Use the end offset of a middle segment to ensure partial deletion
+        int midIndex = sortedSegments.size() / 2;
+        long partialLakeOffset = 
sortedSegments.get(midIndex).remoteLogEndOffset();
+        logTablet.updateLakeLogEndOffset(partialLakeOffset);
+
+        final int expectedRemainingSegments = sortedSegments.size() - midIndex 
- 1;
+        // The new remoteLogStartOffset should be the start offset of the 
first remaining segment
+        final long expectedNewStartOffset = sortedSegments.get(midIndex + 
1).remoteLogStartOffset();
+
+        // Wait for partial cleanup - only segments that have been tiered 
should be deleted
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    // Some segments should be deleted (those with endOffset 
<= partialLakeOffset)
+                    int currentSegmentCount = 
remoteLogTablet.allRemoteLogSegments().size();
+                    
assertThat(currentSegmentCount).isEqualTo(expectedRemainingSegments);
+                    // Remote log start offset should be updated to the first 
remaining segment's
+                    // start
+                    assertThat(remoteLogTablet.getRemoteLogStartOffset())
+                            .isEqualTo(expectedNewStartOffset);
+                    // Remaining segments should have remoteLogEndOffset > 
partialLakeOffset
+                    assertThat(remoteLogTablet.allRemoteLogSegments())
+                            .allSatisfy(
+                                    segment ->
+                                            
assertThat(segment.remoteLogEndOffset())
+                                                    
.isGreaterThan(partialLakeOffset));
+                });
+
+        // Step 2: Fully update lake log end offset to trigger complete cleanup
+        logTablet.updateLakeLogEndOffset(stageBRemoteLogEndOffset);
+
+        // Wait for complete cleanup - all segments should be deleted
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    
assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
+                    
assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
+                });
+
+        // ==================== Stage D: Dynamic Disable ====================
+        // Dynamically disable data lake
+        coordinatorGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                Collections.singletonMap(
+                                        
ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                false))
+                .get();
+        retry(Duration.ofMinutes(1), () -> 
assertThat(logTablet.isDataLakeEnabled()).isFalse());
+
+        // Produce new data after disabling lake (baseOffset = 200 from Stage 
A + B)
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 200L);
+        retry(
+                Duration.ofMinutes(2),
+                () -> 
assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0));
+
+        // Advance time past TTL (1 hour + buffer)
+        MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
+
+        // Since lake is disabled, expired segments should be deleted directly,
+        // ignoring the lake offset status
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    
assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
+                    
assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
+                });
+    }
+
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_BUCKET_NUMBER, 1);
@@ -213,6 +392,8 @@ public class RemoteLogITCase {
         // set a shorter max log time to allow replica shrink from isr. Don't 
be too low, otherwise
         // normal follower synchronization will also be affected
         conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, 
Duration.ofSeconds(5));
+
+        conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);
         return conf;
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index c8e26ea7b..fd487a0bc 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -752,6 +752,28 @@ final class ReplicaTest extends ReplicaTestBase {
         verifyGetKeyValues(kvTablet, expectedKeyValues);
     }
 
+    @Test
+    void testUpdateIsDataLakeEnabled() throws Exception {
+        Replica logReplica =
+                makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new 
TableBucket(DATA1_TABLE_ID, 1));
+        makeLogReplicaAsLeader(logReplica);
+
+        // initial state should be false
+        assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse();
+
+        // update to true
+        logReplica.updateIsDataLakeEnabled(true);
+        assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue();
+
+        // update with same value should not change anything (no-op)
+        logReplica.updateIsDataLakeEnabled(true);
+        assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue();
+
+        // update to false
+        logReplica.updateIsDataLakeEnabled(false);
+        assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse();
+    }
+
     private void makeLogReplicaAsLeader(Replica replica) throws Exception {
         makeLeaderReplica(
                 replica,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index b567eff1f..5055105e9 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.server.testutils;
 
+import org.apache.fluss.config.cluster.AlterConfigOpType;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -75,6 +76,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -147,9 +149,25 @@ public class RpcMessageTestUtils {
 
     public static AlterTableRequest newAlterTableRequest(
             TablePath tablePath,
-            List<PbAlterConfig> alterConfigs,
+            Map<String, String> setProperties,
+            List<String> resetProperties,
             List<PbAddColumn> addColumns,
             boolean ignoreIfExists) {
+        List<PbAlterConfig> alterConfigs = new ArrayList<>();
+        for (Map.Entry<String, String> entry : setProperties.entrySet()) {
+            PbAlterConfig info = new PbAlterConfig();
+            info.setConfigKey(entry.getKey());
+            info.setConfigValue(entry.getValue());
+            info.setOpType(AlterConfigOpType.SET.value());
+            alterConfigs.add(info);
+        }
+        for (String resetProperty : resetProperties) {
+            PbAlterConfig info = new PbAlterConfig();
+            info.setConfigKey(resetProperty);
+            info.setOpType(AlterConfigOpType.DELETE.value());
+            alterConfigs.add(info);
+        }
+
         AlterTableRequest request = new AlterTableRequest();
         request.addAllConfigChanges(alterConfigs)
                 .addAllAddColumns(addColumns)

Reply via email to