This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2fc820dff [lake] Support alter table.datalake.freshness (#2365)
2fc820dff is described below
commit 2fc820dff9ab7e6f5b27ab07ffb5d17787a414f1
Author: zhaomin1423 <[email protected]>
AuthorDate: Tue Jan 20 15:35:14 2026 +0800
[lake] Support alter table.datalake.freshness (#2365)
---
.../org/apache/fluss/config/FlussConfigUtils.java | 6 +-
.../server/coordinator/CoordinatorService.java | 5 +
.../coordinator/LakeTableTieringManager.java | 69 +++++-
.../fluss/server/coordinator/MetadataManager.java | 25 ++-
.../server/coordinator/LakeTableManagerITCase.java | 241 +++++++++++++++++++++
website/docs/engine-flink/ddl.md | 5 +
6 files changed, 339 insertions(+), 12 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index fa9c4274c..af4044458 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -21,7 +21,7 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import java.lang.reflect.Field;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,7 +42,9 @@ public class FlussConfigUtils {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
ALTERABLE_TABLE_OPTIONS =
-
Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+ Arrays.asList(
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
+ ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
}
public static boolean isTableStorageConfig(String key) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 70b359e4d..94d56dd33 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -919,6 +919,11 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
return
lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat();
}
+ /** Returns the {@link LakeTableTieringManager} held by this coordinator service; exposed for tests only. */
+ @VisibleForTesting
+ public LakeTableTieringManager getLakeTableTieringManager() {
+ return lakeTableTieringManager;
+ }
+
private void validateHeartbeatRequest(
PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index 165ae7b84..c25c77969 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -130,6 +130,9 @@ public class LakeTableTieringManager implements
AutoCloseable {
// from table_id -> last heartbeat time by the tiering service
private final Map<Long, Long> liveTieringTableIds;
+ // table_id -> delayed tiering task
+ private final Map<Long, DelayedTiering> delayedTieringByTableId;
+
private final Lock lock = new ReentrantLock();
public LakeTableTieringManager() {
@@ -159,6 +162,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS);
this.tableTierEpoch = new HashMap<>();
this.tableLastTieredTime = new HashMap<>();
+ this.delayedTieringByTableId = new HashMap<>();
}
public void initWithLakeTables(List<Tuple2<TableInfo, Long>>
tableInfoWithTieredTime) {
@@ -205,10 +209,17 @@ public class LakeTableTieringManager implements
AutoCloseable {
// the table has been dropped, return directly
return;
}
+ // Before reschedule, remove the existing DelayedTiering if present
+ DelayedTiering existingDelayedTiering =
delayedTieringByTableId.remove(tableId);
+ if (existingDelayedTiering != null) {
+ existingDelayedTiering.cancel();
+ }
long delayMs = freshnessInterval - (clock.milliseconds() -
lastTieredTime);
// if the delayMs is < 0, the DelayedTiering will be triggered at once
without
// adding into timing wheel.
- lakeTieringScheduleTimer.add(new DelayedTiering(tableId, delayMs));
+ DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs);
+ delayedTieringByTableId.put(tableId, delayedTiering);
+ lakeTieringScheduleTimer.add(delayedTiering);
}
public void removeLakeTable(long tableId) {
@@ -221,6 +232,53 @@ public class LakeTableTieringManager implements
AutoCloseable {
tieringStates.remove(tableId);
liveTieringTableIds.remove(tableId);
tableTierEpoch.remove(tableId);
+ // Remove and cancel the delayed tiering task if present
+ DelayedTiering delayedTiering =
delayedTieringByTableId.remove(tableId);
+ if (delayedTiering != null) {
+ delayedTiering.cancel();
+ }
+ });
+ }
+
+ /**
+ * Update the lake freshness for a table. This method should be called when the table's datalake
+ * freshness property is changed via ALTER TABLE.
+ *
+ * <p>The whole update runs while holding {@code lock}. A table without a tracked freshness (not
+ * a lake table, or already dropped) is skipped with a warning, and passing the current value is
+ * a no-op. Otherwise the new value is stored and, if the table is currently in the {@code
+ * Scheduled} state, its tiering is rescheduled right away so the new interval takes effect. In
+ * any other state only the stored value changes; presumably it is read again the next time the
+ * table is scheduled (NOTE(review): confirm in scheduleTableTiering).
+ *
+ * @param tableId the table id
+ * @param newFreshnessMs the new freshness interval in milliseconds
+ */
+ public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
+ inLock(
+ lock,
+ () -> {
+ Long currentFreshness = tableLakeFreshness.get(tableId);
+ if (currentFreshness == null) {
+ // the table is not a lake table or has been dropped, skip update
+ LOG.warn(
+ "Cannot update lake freshness for table {} as
it's not tracked by lake tiering manager.",
+ tableId);
+ return;
+ }
+
+ if (currentFreshness.equals(newFreshnessMs)) {
+ // no change, skip update
+ return;
+ }
+
+ tableLakeFreshness.put(tableId, newFreshnessMs);
+ LOG.info(
+ "Updated lake freshness for table {} from {} ms to
{} ms.",
+ tableId,
+ currentFreshness,
+ newFreshnessMs);
+
+ // If the table is in Scheduled state, we need to reschedule it with the new
+ // freshness; a table in any other state is not touched here.
+ TieringState currentState = tieringStates.get(tableId);
+ if (currentState == TieringState.Scheduled) {
+ // Reschedule the table tiering with the new freshness interval;
+ // scheduleTableTiering cancels the previously queued DelayedTiering first.
+ scheduleTableTiering(tableId);
+ }
});
}
@@ -458,9 +516,12 @@ public class LakeTableTieringManager implements
AutoCloseable {
public void run() {
inLock(
lock,
- () ->
- // to pending state
- doHandleStateChange(tableId, TieringState.Pending));
+ () -> {
+ // to pending state
+ doHandleStateChange(tableId, TieringState.Pending);
+ // Drop only our own entry: a stale run must never unregister a newer rescheduled task
+ delayedTieringByTableId.remove(tableId, this);
+ });
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 05c5d51de..f728e91da 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -60,12 +60,14 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -516,12 +518,9 @@ public class MetadataManager {
TableRegistration newTableRegistration,
LakeTableTieringManager lakeTableTieringManager) {
- boolean toEnableDataLake =
- !isDataLakeEnabled(oldTableDescriptor)
- && isDataLakeEnabled(newTableRegistration.properties);
- boolean toDisableDataLake =
- isDataLakeEnabled(oldTableDescriptor)
- && !isDataLakeEnabled(newTableRegistration.properties);
+ boolean dataLakeEnabled =
isDataLakeEnabled(newTableRegistration.properties);
+ boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) &&
dataLakeEnabled;
+ boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) &&
!dataLakeEnabled;
if (toEnableDataLake) {
TableInfo newTableInfo =
newTableRegistration.toTableInfo(tablePath, schemaInfo);
@@ -529,6 +528,20 @@ public class MetadataManager {
lakeTableTieringManager.addNewLakeTable(newTableInfo);
} else if (toDisableDataLake) {
lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
+ } else if (dataLakeEnabled) {
+ // The table is still a lake table, check if freshness has changed
+ Duration oldFreshness =
+ Configuration.fromMap(oldTableDescriptor.getProperties())
+ .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
+ Duration newFreshness =
+ Configuration.fromMap(newTableRegistration.properties)
+ .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
+
+ // Check if freshness has changed
+ if (!Objects.equals(oldFreshness, newFreshness)) {
+ lakeTableTieringManager.updateTableLakeFreshness(
+ newTableRegistration.tableId, newFreshness.toMillis());
+ }
}
// more post-alter actions can be added here
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index fb653b7d6..2472d5041 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -19,23 +19,37 @@ package org.apache.fluss.server.coordinator;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.AdminGateway;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
+import org.apache.fluss.rpc.messages.GetTableInfoResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfig;
+import org.apache.fluss.server.entity.LakeTieringTableInfo;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest;
+import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateDatabaseRequest;
import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest;
+import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropDatabaseRequest;
+import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest;
import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newGetTableInfoRequest;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -106,4 +120,231 @@ class LakeTableManagerITCase {
.isInstanceOf(TableAlreadyExistException.class)
.hasMessage("Table %s already exists.", lakeTablePath);
}
+
+ @Test
+ void testAlterAndResetTableDatalakeProperties() throws Exception {
+ AdminReadOnlyGateway readGateway = getAdminOnlyGateway(true);
+ AdminGateway adminGateway = getAdminGateway();
+ String enabledKey = ConfigOptions.TABLE_DATALAKE_ENABLED.key();
+ String freshnessKey = ConfigOptions.TABLE_DATALAKE_FRESHNESS.key();
+
+ String dbName = "test_alter_reset_datalake_db";
+ String tableName = "tb1";
+ TablePath tablePath = TablePath.of(dbName, tableName);
+ // the table under test lives in its own database
+ adminGateway.createDatabase(newCreateDatabaseRequest(dbName, false)).get();
+
+ // Step 1: create a lake table whose freshness starts at 5min
+ Map<String, String> createProperties = new HashMap<>();
+ createProperties.put(enabledKey, "true");
+ createProperties.put(freshnessKey, "5min");
+ TableDescriptor tableDescriptor = newPkTable().withProperties(createProperties);
+ adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
+
+ // Step 2: both properties are stored as given
+ Map<String, String> properties = fetchTableProperties(readGateway, tablePath);
+ assertThat(properties.get(enabledKey)).isEqualTo("true");
+ assertThat(properties.get(freshnessKey)).isEqualTo("5min");
+
+ // Step 3: SET the freshness to 3min
+ Map<String, String> toSet = new HashMap<>();
+ toSet.put(freshnessKey, "3min");
+ alterDatalakeTableProperties(adminGateway, tablePath, toSet, new ArrayList<>());
+
+ // Step 4: the freshness changed while the datalake stays enabled
+ properties = fetchTableProperties(readGateway, tablePath);
+ assertThat(properties.get(freshnessKey)).isEqualTo("3min");
+ assertThat(properties.get(enabledKey)).isEqualTo("true");
+
+ // Step 5: RESET the freshness property
+ alterDatalakeTableProperties(
+ adminGateway, tablePath, new HashMap<>(), Collections.singletonList(freshnessKey));
+
+ // Step 6: the freshness is gone, datalake.enabled is untouched
+ properties = fetchTableProperties(readGateway, tablePath);
+ assertThat(properties.containsKey(freshnessKey)).isFalse();
+ assertThat(properties.get(enabledKey)).isEqualTo("true");
+
+ // Step 7: RESET the datalake.enabled property as well
+ alterDatalakeTableProperties(
+ adminGateway, tablePath, new HashMap<>(), Collections.singletonList(enabledKey));
+
+ // Step 8: datalake.enabled is gone too
+ properties = fetchTableProperties(readGateway, tablePath);
+ assertThat(properties.containsKey(enabledKey)).isFalse();
+
+ // cleanup
+ adminGateway.dropTable(newDropTableRequest(dbName, tableName, false)).get();
+ adminGateway.dropDatabase(newDropDatabaseRequest(dbName, false, true)).get();
+ }
+
+ /** Fetches the table via {@code getTableInfo} and returns the properties of its descriptor. */
+ private static Map<String, String> fetchTableProperties(
+ AdminReadOnlyGateway gateway, TablePath tablePath) throws Exception {
+ GetTableInfoResponse response =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ return TableDescriptor.fromJsonBytes(response.getTableJson()).getProperties();
+ }
+
+ /** Sends an ALTER TABLE that SETs {@code toSet} and DELETEs {@code toReset}, then waits. */
+ private static void alterDatalakeTableProperties(
+ AdminGateway adminGateway,
+ TablePath tablePath,
+ Map<String, String> toSet,
+ List<String> toReset)
+ throws Exception {
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(toSet, toReset),
+ Collections.emptyList(),
+ false))
+ .get();
+ }
+
+ @Test
+ void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception {
+ AdminGateway adminGateway = getAdminGateway();
+
+ String db1 = "test_tiering_freshness_db";
+ String tb1 = "tb1";
+ TablePath tablePath = TablePath.of(db1, tb1);
+ adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
+
+ // Step 1: Create a table with a large datalake freshness (10 minutes)
+ Map<String, String> initialProperties = new HashMap<>();
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "10min");
+ TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties);
+ adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
+
+ // Get the table id for later verification
+ GetTableInfoResponse response =
+ adminGateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ long tableId = response.getTableId();
+
+ LakeTableTieringManager tieringManager =
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getLakeTableTieringManager();
+
+ // Step 2: with a 10min freshness the table must NOT become available for tiering during
+ // the next 3 seconds. retry() stops at the first passing attempt, which would check this
+ // only once, so poll explicitly for the whole window instead (this also covers the time
+ // the table needs to be registered in the tiering manager).
+ long deadlineNanos = System.nanoTime() + Duration.ofSeconds(3).toNanos();
+ while (System.nanoTime() < deadlineNanos) {
+ assertThat(tieringManager.requestTable()).isNull();
+ Thread.sleep(100);
+ }
+
+ // Step 3: Change freshness to a very small value (100ms)
+ Map<String, String> setProperties = new HashMap<>();
+ setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "100ms");
+
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(setProperties, new ArrayList<>()),
+ Collections.emptyList(),
+ false))
+ .get();
+
+ // Step 4: the rescheduled table must now be handed out for tiering within 3 seconds
+ retry(
+ Duration.ofSeconds(3),
+ () -> {
+ LakeTieringTableInfo table = tieringManager.requestTable();
+ assertThat(table).isNotNull();
+ assertThat(table.tableId()).isEqualTo(tableId);
+ assertThat(table.tablePath()).isEqualTo(tablePath);
+ });
+
+ // cleanup
+ adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+ adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get();
+ }
+
+ /** Returns a read-only admin gateway to the coordinator, or else to tablet server 0. */
+ private AdminReadOnlyGateway getAdminOnlyGateway(boolean isCoordinatorServer) {
+ // the coordinator client is an AdminGateway, which also serves the read-only calls
+ return isCoordinatorServer
+ ? getAdminGateway()
+ : FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(0);
+ }
+
+ /** Returns a coordinator client obtained from {@code newCoordinatorClient()} of the test cluster. */
+ private AdminGateway getAdminGateway() {
+ return FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+ }
+
+ /** Builds a two-column primary-key table descriptor keyed and distributed on column "a". */
+ private static TableDescriptor newPkTable() {
+ Schema pkSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a comment")
+ .column("b", DataTypes.STRING())
+ .primaryKey("a")
+ .build();
+ return TableDescriptor.builder()
+ .schema(pkSchema)
+ .comment("first table")
+ .distributedBy(3, "a")
+ .build();
+ }
+
+ /**
+ * Converts the properties to SET and the keys to reset into {@link PbAlterConfig}s: all SET
+ * entries first, followed by one DELETE entry per reset key.
+ */
+ private static List<PbAlterConfig> alterTableProperties(
+ Map<String, String> setProperties, List<String> resetProperties) {
+ List<PbAlterConfig> configs = new ArrayList<>();
+ setProperties.forEach(
+ (key, value) -> {
+ PbAlterConfig setConfig = new PbAlterConfig();
+ setConfig.setConfigKey(key);
+ setConfig.setConfigValue(value);
+ setConfig.setOpType(AlterConfigOpType.SET.value());
+ configs.add(setConfig);
+ });
+ resetProperties.forEach(
+ key -> {
+ PbAlterConfig deleteConfig = new PbAlterConfig();
+ deleteConfig.setConfigKey(key);
+ deleteConfig.setOpType(AlterConfigOpType.DELETE.value());
+ configs.add(deleteConfig);
+ });
+ return configs;
+ }
}
diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md
index 40e12a9c1..29330832e 100644
--- a/website/docs/engine-flink/ddl.md
+++ b/website/docs/engine-flink/ddl.md
@@ -265,9 +265,14 @@ When using SET to modify [Storage
Options](engine-flink/options.md#storage-optio
- All [Read Options](engine-flink/options.md#read-options), [Write
Options](engine-flink/options.md#write-options), [Lookup
Options](engine-flink/options.md#lookup-options) and [Other
Options](engine-flink/options.md#other-options) except `bootstrap.servers`.
- The following [Storage Options](engine-flink/options.md#storage-options):
- `table.datalake.enabled`: Enable or disable lakehouse storage for the table.
+ - `table.datalake.freshness`: Set the data freshness for lakehouse storage, i.e. the target interval at which the table's data is tiered to the lake.
```sql title="Flink SQL"
+-- Enable lakehouse storage for the table
ALTER TABLE my_table SET ('table.datalake.enabled' = 'true');
+
+-- Set the freshness to 5 minutes for lakehouse storage
+ALTER TABLE my_table SET ('table.datalake.freshness' = '5min');
```
**Limits**