This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fc820dff [lake] Support alter table.datalake.freshness (#2365)
2fc820dff is described below

commit 2fc820dff9ab7e6f5b27ab07ffb5d17787a414f1
Author: zhaomin1423 <[email protected]>
AuthorDate: Tue Jan 20 15:35:14 2026 +0800

    [lake] Support alter table.datalake.freshness (#2365)
---
 .../org/apache/fluss/config/FlussConfigUtils.java  |   6 +-
 .../server/coordinator/CoordinatorService.java     |   5 +
 .../coordinator/LakeTableTieringManager.java       |  69 +++++-
 .../fluss/server/coordinator/MetadataManager.java  |  25 ++-
 .../server/coordinator/LakeTableManagerITCase.java | 241 +++++++++++++++++++++
 website/docs/engine-flink/ddl.md                   |   5 +
 6 files changed, 339 insertions(+), 12 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index fa9c4274c..af4044458 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -21,7 +21,7 @@ import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.annotation.VisibleForTesting;
 
 import java.lang.reflect.Field;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +42,9 @@ public class FlussConfigUtils {
         TABLE_OPTIONS = extractConfigOptions("table.");
         CLIENT_OPTIONS = extractConfigOptions("client.");
         ALTERABLE_TABLE_OPTIONS =
-                Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+                Arrays.asList(
+                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
+                        ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
     }
 
     public static boolean isTableStorageConfig(String key) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 70b359e4d..94d56dd33 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -919,6 +919,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat();
     }
 
+    @VisibleForTesting
+    public LakeTableTieringManager getLakeTableTieringManager() {
+        return lakeTableTieringManager;
+    }
+
     private void validateHeartbeatRequest(
             PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
         if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index 165ae7b84..c25c77969 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -130,6 +130,9 @@ public class LakeTableTieringManager implements AutoCloseable {
     // from table_id -> last heartbeat time by the tiering service
     private final Map<Long, Long> liveTieringTableIds;
 
+    // table_id -> delayed tiering task
+    private final Map<Long, DelayedTiering> delayedTieringByTableId;
+
     private final Lock lock = new ReentrantLock();
 
     public LakeTableTieringManager() {
@@ -159,6 +162,7 @@ public class LakeTableTieringManager implements AutoCloseable {
                 this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS);
         this.tableTierEpoch = new HashMap<>();
         this.tableLastTieredTime = new HashMap<>();
+        this.delayedTieringByTableId = new HashMap<>();
     }
 
     public void initWithLakeTables(List<Tuple2<TableInfo, Long>> tableInfoWithTieredTime) {
@@ -205,10 +209,17 @@ public class LakeTableTieringManager implements AutoCloseable {
             // the table has been dropped, return directly
             return;
         }
+        // Before reschedule, remove the existing DelayedTiering if present
+        DelayedTiering existingDelayedTiering = delayedTieringByTableId.remove(tableId);
+        if (existingDelayedTiering != null) {
+            existingDelayedTiering.cancel();
+        }
         long delayMs = freshnessInterval - (clock.milliseconds() - lastTieredTime);
         // if the delayMs is < 0, the DelayedTiering will be triggered at once without
         // adding into timing wheel.
-        lakeTieringScheduleTimer.add(new DelayedTiering(tableId, delayMs));
+        DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs);
+        delayedTieringByTableId.put(tableId, delayedTiering);
+        lakeTieringScheduleTimer.add(delayedTiering);
     }
 
     public void removeLakeTable(long tableId) {
@@ -221,6 +232,53 @@ public class LakeTableTieringManager implements AutoCloseable {
                     tieringStates.remove(tableId);
                     liveTieringTableIds.remove(tableId);
                     tableTierEpoch.remove(tableId);
+                    // Remove and cancel the delayed tiering task if present
+                    DelayedTiering delayedTiering = delayedTieringByTableId.remove(tableId);
+                    if (delayedTiering != null) {
+                        delayedTiering.cancel();
+                    }
+                });
+    }
+
+    /**
+     * Update the lake freshness for a table. This method should be called when the table's datalake
+     * freshness property is changed via ALTER TABLE.
+     *
+     * @param tableId the table id
+     * @param newFreshnessMs the new freshness interval in milliseconds
+     */
+    public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
+        inLock(
+                lock,
+                () -> {
+                    Long currentFreshness = tableLakeFreshness.get(tableId);
+                    if (currentFreshness == null) {
+                        // the table is not a lake table or has been dropped, skip update
+                        LOG.warn(
+                                "Cannot update lake freshness for table {} as it's not tracked by lake tiering manager.",
+                                tableId);
+                        return;
+                    }
+
+                    if (currentFreshness.equals(newFreshnessMs)) {
+                        // no change, skip update
+                        return;
+                    }
+
+                    tableLakeFreshness.put(tableId, newFreshnessMs);
+                    LOG.info(
+                            "Updated lake freshness for table {} from {} ms to {} ms.",
+                            tableId,
+                            currentFreshness,
+                            newFreshnessMs);
+
+                    // If the table is in Scheduled state, we need to reschedule it with the new
+                    // freshness
+                    TieringState currentState = tieringStates.get(tableId);
+                    if (currentState == TieringState.Scheduled) {
+                        // Reschedule the table tiering with the new freshness interval
+                        scheduleTableTiering(tableId);
+                    }
                 });
     }
 
@@ -458,9 +516,12 @@ public class LakeTableTieringManager implements AutoCloseable {
         public void run() {
             inLock(
                     lock,
-                    () ->
-                            // to pending state
-                            doHandleStateChange(tableId, TieringState.Pending));
+                    () -> {
+                        // to pending state
+                        doHandleStateChange(tableId, TieringState.Pending);
+                        // Remove from map after execution
+                        delayedTieringByTableId.remove(tableId);
+                    });
         }
     }
 
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 05c5d51de..f728e91da 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -60,12 +60,14 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -516,12 +518,9 @@ public class MetadataManager {
             TableRegistration newTableRegistration,
             LakeTableTieringManager lakeTableTieringManager) {
 
-        boolean toEnableDataLake =
-                !isDataLakeEnabled(oldTableDescriptor)
-                        && isDataLakeEnabled(newTableRegistration.properties);
-        boolean toDisableDataLake =
-                isDataLakeEnabled(oldTableDescriptor)
-                        && !isDataLakeEnabled(newTableRegistration.properties);
+        boolean dataLakeEnabled = isDataLakeEnabled(newTableRegistration.properties);
+        boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && dataLakeEnabled;
+        boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && !dataLakeEnabled;
 
         if (toEnableDataLake) {
             TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo);
@@ -529,6 +528,20 @@ public class MetadataManager {
             lakeTableTieringManager.addNewLakeTable(newTableInfo);
         } else if (toDisableDataLake) {
             lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
+        } else if (dataLakeEnabled) {
+            // The table is still a lake table, check if freshness has changed
+            Duration oldFreshness =
+                    Configuration.fromMap(oldTableDescriptor.getProperties())
+                            .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
+            Duration newFreshness =
+                    Configuration.fromMap(newTableRegistration.properties)
+                            .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
+
+            // Check if freshness has changed
+            if (!Objects.equals(oldFreshness, newFreshness)) {
+                lakeTableTieringManager.updateTableLakeFreshness(
+                        newTableRegistration.tableId, newFreshness.toMillis());
+            }
         }
         // more post-alter actions can be added here
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index fb653b7d6..2472d5041 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -19,23 +19,37 @@ package org.apache.fluss.server.coordinator;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.gateway.AdminGateway;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
+import org.apache.fluss.rpc.messages.GetTableInfoResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfig;
+import org.apache.fluss.server.entity.LakeTieringTableInfo;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateDatabaseRequest;
 import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropDatabaseRequest;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest;
 import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newGetTableInfoRequest;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -106,4 +120,231 @@ class LakeTableManagerITCase {
                 .isInstanceOf(TableAlreadyExistException.class)
                 .hasMessage("Table %s already exists.", lakeTablePath);
     }
+
+    @Test
+    void testAlterAndResetTableDatalakeProperties() throws Exception {
+        AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+        AdminGateway adminGateway = getAdminGateway();
+
+        String db1 = "test_alter_reset_datalake_db";
+        String tb1 = "tb1";
+        TablePath tablePath = TablePath.of(db1, tb1);
+        // first create a database
+        adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
+
+        // Step 1: create a table with datalake enabled and initial freshness (5min)
+        Map<String, String> initialProperties = new HashMap<>();
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "5min");
+        TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties);
+        adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
+
+        // Step 2: verify initial properties
+        GetTableInfoResponse response =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTable = TableDescriptor.fromJsonBytes(response.getTableJson());
+        assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+        assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isEqualTo("5min");
+
+        // Step 3: alter table to change datalake freshness (SET operation)
+        Map<String, String> setProperties = new HashMap<>();
+        setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "3min");
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(setProperties, new ArrayList<>()),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 4: verify freshness was updated to 3min
+        GetTableInfoResponse responseAfterSet =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterSet =
+                TableDescriptor.fromJsonBytes(responseAfterSet.getTableJson());
+        assertThat(
+                        gottenTableAfterSet
+                                .getProperties()
+                                .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isEqualTo("3min");
+        assertThat(
+                        gottenTableAfterSet
+                                .getProperties()
+                                .get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+
+        // Step 5: reset datalake freshness property (RESET operation)
+        List<String> resetProperties = new ArrayList<>();
+        resetProperties.add(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(new HashMap<>(), resetProperties),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 6: verify freshness was removed but datalake.enabled remains
+        GetTableInfoResponse responseAfterReset =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterReset =
+                TableDescriptor.fromJsonBytes(responseAfterReset.getTableJson());
+        assertThat(
+                        gottenTableAfterReset
+                                .getProperties()
+                                .containsKey(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isFalse();
+        assertThat(
+                        gottenTableAfterReset
+                                .getProperties()
+                                .get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+
+        // Step 7: reset datalake enabled property
+        List<String> resetProperties2 = new ArrayList<>();
+        resetProperties2.add(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(new HashMap<>(), resetProperties2),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 8: verify datalake.enabled was also removed
+        GetTableInfoResponse responseAfterReset2 =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterReset2 =
+                TableDescriptor.fromJsonBytes(responseAfterReset2.getTableJson());
+        assertThat(
+                        gottenTableAfterReset2
+                                .getProperties()
+                                .containsKey(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isFalse();
+
+        // cleanup
+        adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+        adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get();
+    }
+
+    @Test
+    void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception {
+        AdminGateway adminGateway = getAdminGateway();
+
+        String db1 = "test_tiering_freshness_db";
+        String tb1 = "tb1";
+        TablePath tablePath = TablePath.of(db1, tb1);
+        adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
+
+        // Step 1: Create a table with a large datalake freshness (10 minutes)
+        Map<String, String> initialProperties = new HashMap<>();
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "10min");
+        TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties);
+        adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
+
+        // Get the table id for later verification
+        GetTableInfoResponse response =
+                adminGateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        long tableId = response.getTableId();
+
+        LakeTableTieringManager tieringManager =
+                FLUSS_CLUSTER_EXTENSION
+                        .getCoordinatorServer()
+                        .getCoordinatorService()
+                        .getLakeTableTieringManager();
+
+        // Wait a bit for the table to be registered in tiering manager
+        Thread.sleep(1000);
+
+        // Step 2: Try to request the table for tiering within 3 seconds, should NOT get it
+        retry(
+                Duration.ofSeconds(3),
+                () -> {
+                    assertThat(tieringManager.requestTable()).isNull();
+                });
+
+        // Step 3: Change freshness to a very small value (100ms)
+        Map<String, String> setProperties = new HashMap<>();
+        setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "100ms");
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(setProperties, new ArrayList<>()),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 4: Now retry requesting the table, should get it within 3 seconds
+        retry(
+                Duration.ofSeconds(3),
+                () -> {
+                    LakeTieringTableInfo table = tieringManager.requestTable();
+                    assertThat(table).isNotNull();
+                    assertThat(table.tableId()).isEqualTo(tableId);
+                    assertThat(table.tablePath()).isEqualTo(tablePath);
+                });
+
+        // cleanup
+        adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+        adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get();
+    }
+
+    private AdminReadOnlyGateway getAdminOnlyGateway(boolean isCoordinatorServer) {
+        if (isCoordinatorServer) {
+            return getAdminGateway();
+        } else {
+            return FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(0);
+        }
+    }
+
+    private AdminGateway getAdminGateway() {
+        return FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+    }
+
+    private static TableDescriptor newPkTable() {
+        return TableDescriptor.builder()
+                .schema(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT())
+                                .withComment("a comment")
+                                .column("b", DataTypes.STRING())
+                                .primaryKey("a")
+                                .build())
+                .comment("first table")
+                .distributedBy(3, "a")
+                .build();
+    }
+
+    private static List<PbAlterConfig> alterTableProperties(
+            Map<String, String> setProperties, List<String> resetProperties) {
+        List<PbAlterConfig> res = new ArrayList<>();
+
+        for (Map.Entry<String, String> entry : setProperties.entrySet()) {
+            PbAlterConfig info = new PbAlterConfig();
+            info.setConfigKey(entry.getKey());
+            info.setConfigValue(entry.getValue());
+            info.setOpType(AlterConfigOpType.SET.value());
+            res.add(info);
+        }
+
+        for (String resetProperty : resetProperties) {
+            PbAlterConfig info = new PbAlterConfig();
+            info.setConfigKey(resetProperty);
+            info.setOpType(AlterConfigOpType.DELETE.value());
+            res.add(info);
+        }
+
+        return res;
+    }
 }
diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md
index 40e12a9c1..29330832e 100644
--- a/website/docs/engine-flink/ddl.md
+++ b/website/docs/engine-flink/ddl.md
@@ -265,9 +265,14 @@ When using SET to modify [Storage Options](engine-flink/options.md#storage-optio
 - All [Read Options](engine-flink/options.md#read-options), [Write Options](engine-flink/options.md#write-options), [Lookup Options](engine-flink/options.md#lookup-options) and [Other Options](engine-flink/options.md#other-options) except `bootstrap.servers`.
 - The following [Storage Options](engine-flink/options.md#storage-options):
   - `table.datalake.enabled`: Enable or disable lakehouse storage for the table.
+  - `table.datalake.freshness`: Set the data freshness for lakehouse storage.
 
 ```sql title="Flink SQL"
+-- Enable lakehouse storage for the table
 ALTER TABLE my_table SET ('table.datalake.enabled' = 'true');
+
+-- Set the freshness to 5 minutes for lakehouse storage
+ALTER TABLE my_table SET ('table.datalake.freshness' = '5min');
 ```
 
 **Limits**

Reply via email to