luoyuxia commented on code in PR #2365:
URL: https://github.com/apache/fluss/pull/2365#discussion_r2692855851


##########
website/docs/engine-flink/ddl.md:
##########
@@ -261,9 +261,14 @@ When using SET to modify [Storage 
Options](engine-flink/options.md#storage-optio
 - All [Read Options](engine-flink/options.md#read-options), [Write 
Options](engine-flink/options.md#write-options), [Lookup 
Options](engine-flink/options.md#lookup-options) and [Other 
Options](engine-flink/options.md#other-options) except `bootstrap.servers`.
 - The following [Storage Options](engine-flink/options.md#storage-options):
   - `table.datalake.enabled`: Enable or disable lakehouse storage for the 
table.
+  - `table.datalake.freshness`: Set the freshness time for lakehouse storage.

Review Comment:
   ```suggestion
     - `table.datalake.freshness`: Set the data freshness for lakehouse storage.
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -224,6 +224,48 @@ public void removeLakeTable(long tableId) {
                 });
     }
 
+    /**
+     * Update the lake freshness for a table. This method should be called 
when the table's datalake
+     * freshness property is changed via ALTER TABLE.
+     *
+     * @param tableId the table id
+     * @param newFreshnessMs the new freshness interval in milliseconds
+     */
+    public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
+        inLock(
+                lock,
+                () -> {
+                    Long currentFreshness = tableLakeFreshness.get(tableId);
+                    if (currentFreshness == null) {
+                        // the table is not a lake table or has been dropped, 
skip update
+                        LOG.warn(
+                                "Cannot update lake freshness for table {} as 
it's not tracked by lake tiering manager.",
+                                tableId);
+                        return;
+                    }
+
+                    if (currentFreshness.equals(newFreshnessMs)) {
+                        // no change, skip update
+                        return;
+                    }
+
+                    tableLakeFreshness.put(tableId, newFreshnessMs);
+                    LOG.info(
+                            "Updated lake freshness for table {} from {} ms to 
{} ms.",
+                            tableId,
+                            currentFreshness,
+                            newFreshnessMs);
+
+                    // If the table is in Scheduled state, we need to 
reschedule it with the new
+                    // freshness
+                    TieringState currentState = tieringStates.get(tableId);
+                    if (currentState == TieringState.Scheduled) {
+                        // Reschedule the table tiering with the new freshness 
interval

Review Comment:
   Before rescheduling, we need to remove the existing `DelayedTiering`. To do 
   that, we need to maintain 
   `private final Map<Long, DelayedTiering> delayedTieringByTableId = new HashMap<>();`
   
   - put into `delayedTieringByTableId` in method `scheduleTableTiering`
   - remove it in method `removeLakeTable`
   - remove it here, and cancel the `delayedTiering`
   - remove it in `DelayedTiering#run`
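
The four bookkeeping points above could look roughly like the sketch below. It is only an illustration under stated assumptions: `DelayedTiering` here is a simplified stand-in with a cancel flag rather than the real timer task, `synchronized` stands in for `inLock(lock, ...)`, and the class name `DelayedTieringBookkeepingSketch`, the `pending` accessor, and `onDelayedTieringRun` are hypothetical names that do not exist in the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; not the actual LakeTableTieringManager. */
class DelayedTieringBookkeepingSketch {

    /** Simplified stand-in for the delayed task: only tracks a cancel flag. */
    static final class DelayedTiering {
        final long tableId;
        final long delayMs;
        private boolean cancelled;

        DelayedTiering(long tableId, long delayMs) {
            this.tableId = tableId;
            this.delayMs = delayMs;
        }

        void cancel() {
            cancelled = true;
        }

        boolean isCancelled() {
            return cancelled;
        }
    }

    private final Object lock = new Object();
    private final Map<Long, DelayedTiering> delayedTieringByTableId = new HashMap<>();

    /** Put the new task into the map; cancel any task it replaces. */
    DelayedTiering scheduleTableTiering(long tableId, long delayMs) {
        synchronized (lock) {
            DelayedTiering task = new DelayedTiering(tableId, delayMs);
            DelayedTiering previous = delayedTieringByTableId.put(tableId, task);
            if (previous != null) {
                previous.cancel();
            }
            return task;
        }
    }

    /** Remove and cancel the pending task when the table is dropped. */
    void removeLakeTable(long tableId) {
        synchronized (lock) {
            DelayedTiering task = delayedTieringByTableId.remove(tableId);
            if (task != null) {
                task.cancel();
            }
        }
    }

    /** Cancel the pending task, then reschedule with the new freshness. */
    DelayedTiering updateTableLakeFreshness(long tableId, long newFreshnessMs) {
        synchronized (lock) {
            DelayedTiering existing = delayedTieringByTableId.remove(tableId);
            if (existing == null) {
                // nothing scheduled for this table, so nothing to reschedule
                return null;
            }
            existing.cancel();
            return scheduleTableTiering(tableId, newFreshnessMs);
        }
    }

    /** What DelayedTiering#run would do first: skip if cancelled, else remove itself. */
    boolean onDelayedTieringRun(DelayedTiering task) {
        synchronized (lock) {
            if (task.isCancelled()) {
                return false;
            }
            // two-argument remove: only drops the entry if it still maps to this task
            delayedTieringByTableId.remove(task.tableId, task);
            return true;
        }
    }

    /** Test helper: the task currently pending for a table, or null. */
    DelayedTiering pending(long tableId) {
        synchronized (lock) {
            return delayedTieringByTableId.get(tableId);
        }
    }
}
```

The cancel check in `onDelayedTieringRun` matters because a task that was already handed to the timer may still fire after the freshness change; without it, the old and the new interval could both move the table to the pending state.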
   
   



##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java:
##########
@@ -106,4 +120,234 @@ void testCreateAndGetTable() throws Exception {
                 .isInstanceOf(TableAlreadyExistException.class)
                 .hasMessage("Table %s already exists.", lakeTablePath);
     }
+
+    @Test
+    void testAlterAndResetTableDatalakeProperties() throws Exception {
+        AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+        AdminGateway adminGateway = getAdminGateway();
+
+        String db1 = "test_alter_reset_datalake_db";
+        String tb1 = "tb1";
+        TablePath tablePath = TablePath.of(db1, tb1);
+        // first create a database
+        adminGateway.createDatabase(newCreateDatabaseRequest(db1, 
false)).get();
+
+        // Step 1: create a table with datalake enabled and initial freshness 
(5min)
+        Map<String, String> initialProperties = new HashMap<>();
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), 
"5min");
+        TableDescriptor tableDescriptor = 
newPkTable().withProperties(initialProperties);
+        adminGateway.createTable(newCreateTableRequest(tablePath, 
tableDescriptor, false)).get();
+
+        // Step 2: verify initial properties
+        GetTableInfoResponse response =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTable = 
TableDescriptor.fromJsonBytes(response.getTableJson());
+        
assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+        
assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isEqualTo("5min");
+
+        // Step 3: alter table to change datalake freshness (SET operation)
+        Map<String, String> setProperties = new HashMap<>();
+        setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), 
"3min");
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(setProperties, new 
ArrayList<>()),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 4: verify freshness was updated to 3min
+        GetTableInfoResponse responseAfterSet =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterSet =
+                TableDescriptor.fromJsonBytes(responseAfterSet.getTableJson());
+        assertThat(
+                        gottenTableAfterSet
+                                .getProperties()
+                                
.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isEqualTo("3min");
+        assertThat(
+                        gottenTableAfterSet
+                                .getProperties()
+                                
.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+
+        // Step 5: reset datalake freshness property (RESET operation)
+        List<String> resetProperties = new ArrayList<>();
+        resetProperties.add(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(new HashMap<>(), 
resetProperties),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 6: verify freshness was removed but datalake.enabled remains
+        GetTableInfoResponse responseAfterReset =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterReset =
+                
TableDescriptor.fromJsonBytes(responseAfterReset.getTableJson());
+        assertThat(
+                        gottenTableAfterReset
+                                .getProperties()
+                                
.containsKey(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isFalse();
+        assertThat(
+                        gottenTableAfterReset
+                                .getProperties()
+                                
.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+
+        // Step 7: reset datalake enabled property
+        List<String> resetProperties2 = new ArrayList<>();
+        resetProperties2.add(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(new HashMap<>(), 
resetProperties2),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 8: verify datalake.enabled was also removed
+        GetTableInfoResponse responseAfterReset2 =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterReset2 =
+                
TableDescriptor.fromJsonBytes(responseAfterReset2.getTableJson());
+        assertThat(
+                        gottenTableAfterReset2
+                                .getProperties()
+                                
.containsKey(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isFalse();
+
+        // cleanup
+        adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+        adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, 
true)).get();
+    }
+
+    @Test
+    void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception {
+        AdminGateway adminGateway = getAdminGateway();
+
+        String db1 = "test_tiering_freshness_db";
+        String tb1 = "tb1";
+        TablePath tablePath = TablePath.of(db1, tb1);
+        adminGateway.createDatabase(newCreateDatabaseRequest(db1, 
false)).get();
+
+        // Step 1: Create a table with a large datalake freshness (10 minutes)
+        Map<String, String> initialProperties = new HashMap<>();
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), 
"10min");
+        TableDescriptor tableDescriptor = 
newPkTable().withProperties(initialProperties);
+        adminGateway.createTable(newCreateTableRequest(tablePath, 
tableDescriptor, false)).get();
+
+        // Get the table id for later verification
+        GetTableInfoResponse response =
+                
adminGateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        long tableId = response.getTableId();
+
+        LakeTableTieringManager tieringManager =
+                FLUSS_CLUSTER_EXTENSION
+                        .getCoordinatorServer()
+                        .getCoordinatorService()
+                        .getLakeTableTieringManager();
+
+        // Wait a bit for the table to be registered in tiering manager
+        Thread.sleep(1000);
+
+        // Step 2: Try to request the table for tiering within 3 seconds, 
should NOT get it
+        retry(

Review Comment:
   Replace the `retry(...)` call with a direct assertion:
   ```suggestion
           assertThat(tieringManager.requestTable()).isNull();
   ```


