luoyuxia commented on code in PR #2365:
URL: https://github.com/apache/fluss/pull/2365#discussion_r2692855851
##########
website/docs/engine-flink/ddl.md:
##########
@@ -261,9 +261,14 @@ When using SET to modify [Storage
Options](engine-flink/options.md#storage-optio
- All [Read Options](engine-flink/options.md#read-options), [Write
Options](engine-flink/options.md#write-options), [Lookup
Options](engine-flink/options.md#lookup-options) and [Other
Options](engine-flink/options.md#other-options) except `bootstrap.servers`.
- The following [Storage Options](engine-flink/options.md#storage-options):
- `table.datalake.enabled`: Enable or disable lakehouse storage for the
table.
+ - `table.datalake.freshness`: Set the freshness time for lakehouse storage.
Review Comment:
```suggestion
- `table.datalake.freshness`: Set the data freshness for lakehouse storage.
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -224,6 +224,48 @@ public void removeLakeTable(long tableId) {
});
}
+ /**
+ * Update the lake freshness for a table. This method should be called
when the table's datalake
+ * freshness property is changed via ALTER TABLE.
+ *
+ * @param tableId the table id
+ * @param newFreshnessMs the new freshness interval in milliseconds
+ */
+ public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
+ inLock(
+ lock,
+ () -> {
+ Long currentFreshness = tableLakeFreshness.get(tableId);
+ if (currentFreshness == null) {
+ // the table is not a lake table or has been dropped,
skip update
+ LOG.warn(
+ "Cannot update lake freshness for table {} as
it's not tracked by lake tiering manager.",
+ tableId);
+ return;
+ }
+
+ if (currentFreshness.equals(newFreshnessMs)) {
+ // no change, skip update
+ return;
+ }
+
+ tableLakeFreshness.put(tableId, newFreshnessMs);
+ LOG.info(
+ "Updated lake freshness for table {} from {} ms to
{} ms.",
+ tableId,
+ currentFreshness,
+ newFreshnessMs);
+
+ // If the table is in Scheduled state, we need to
reschedule it with the new
+ // freshness
+ TieringState currentState = tieringStates.get(tableId);
+ if (currentState == TieringState.Scheduled) {
+ // Reschedule the table tiering with the new freshness
interval
Review Comment:
Before rescheduling, we need to remove the existing `DelayedTiering`. To do so, we
need to maintain
`private final Map<Long, DelayedTiering> delayedTieringByTableId = new HashMap<>();`
and:
- put the entry into `delayedTieringByTableId` in method `scheduleTableTiering`;
- remove it in method `removeLakeTable`;
- remove it here, and cancel the removed `delayedTiering`;
- remove it in `DelayedTiering#run`.
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java:
##########
@@ -106,4 +120,234 @@ void testCreateAndGetTable() throws Exception {
.isInstanceOf(TableAlreadyExistException.class)
.hasMessage("Table %s already exists.", lakeTablePath);
}
+
+ @Test
+ void testAlterAndResetTableDatalakeProperties() throws Exception {
+ AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+ AdminGateway adminGateway = getAdminGateway();
+
+ String db1 = "test_alter_reset_datalake_db";
+ String tb1 = "tb1";
+ TablePath tablePath = TablePath.of(db1, tb1);
+ // first create a database
+ adminGateway.createDatabase(newCreateDatabaseRequest(db1,
false)).get();
+
+ // Step 1: create a table with datalake enabled and initial freshness
(5min)
+ Map<String, String> initialProperties = new HashMap<>();
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
"5min");
+ TableDescriptor tableDescriptor =
newPkTable().withProperties(initialProperties);
+ adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
+
+ // Step 2: verify initial properties
+ GetTableInfoResponse response =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTable =
TableDescriptor.fromJsonBytes(response.getTableJson());
+
assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+ .isEqualTo("true");
+
assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+ .isEqualTo("5min");
+
+ // Step 3: alter table to change datalake freshness (SET operation)
+ Map<String, String> setProperties = new HashMap<>();
+ setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
"3min");
+
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(setProperties, new
ArrayList<>()),
+ Collections.emptyList(),
+ false))
+ .get();
+
+ // Step 4: verify freshness was updated to 3min
+ GetTableInfoResponse responseAfterSet =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTableAfterSet =
+ TableDescriptor.fromJsonBytes(responseAfterSet.getTableJson());
+ assertThat(
+ gottenTableAfterSet
+ .getProperties()
+
.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+ .isEqualTo("3min");
+ assertThat(
+ gottenTableAfterSet
+ .getProperties()
+
.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+ .isEqualTo("true");
+
+ // Step 5: reset datalake freshness property (RESET operation)
+ List<String> resetProperties = new ArrayList<>();
+ resetProperties.add(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(new HashMap<>(),
resetProperties),
+ Collections.emptyList(),
+ false))
+ .get();
+
+ // Step 6: verify freshness was removed but datalake.enabled remains
+ GetTableInfoResponse responseAfterReset =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTableAfterReset =
+
TableDescriptor.fromJsonBytes(responseAfterReset.getTableJson());
+ assertThat(
+ gottenTableAfterReset
+ .getProperties()
+
.containsKey(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+ .isFalse();
+ assertThat(
+ gottenTableAfterReset
+ .getProperties()
+
.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+ .isEqualTo("true");
+
+ // Step 7: reset datalake enabled property
+ List<String> resetProperties2 = new ArrayList<>();
+ resetProperties2.add(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(new HashMap<>(),
resetProperties2),
+ Collections.emptyList(),
+ false))
+ .get();
+
+ // Step 8: verify datalake.enabled was also removed
+ GetTableInfoResponse responseAfterReset2 =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTableAfterReset2 =
+
TableDescriptor.fromJsonBytes(responseAfterReset2.getTableJson());
+ assertThat(
+ gottenTableAfterReset2
+ .getProperties()
+
.containsKey(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+ .isFalse();
+
+ // cleanup
+ adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+ adminGateway.dropDatabase(newDropDatabaseRequest(db1, false,
true)).get();
+ }
+
+ @Test
+ void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception {
+ AdminGateway adminGateway = getAdminGateway();
+
+ String db1 = "test_tiering_freshness_db";
+ String tb1 = "tb1";
+ TablePath tablePath = TablePath.of(db1, tb1);
+ adminGateway.createDatabase(newCreateDatabaseRequest(db1,
false)).get();
+
+ // Step 1: Create a table with a large datalake freshness (10 minutes)
+ Map<String, String> initialProperties = new HashMap<>();
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
"10min");
+ TableDescriptor tableDescriptor =
newPkTable().withProperties(initialProperties);
+ adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
+
+ // Get the table id for later verification
+ GetTableInfoResponse response =
+
adminGateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ long tableId = response.getTableId();
+
+ LakeTableTieringManager tieringManager =
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getLakeTableTieringManager();
+
+ // Wait a bit for the table to be registered in tiering manager
+ Thread.sleep(1000);
+
+ // Step 2: Try to request the table for tiering within 3 seconds,
should NOT get it
+ retry(
Review Comment:
Change the `retry` to:
```suggestion
assertThat(tieringManager.requestTable()).isNull();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]