This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 852737e5b [server] Allow user to alter partition num retention. (#2964)
852737e5b is described below
commit 852737e5b7ec0a296545822b73311a50e2e349f4
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Apr 8 13:59:48 2026 +0800
[server] Allow user to alter partition num retention. (#2964)
---
.../org/apache/fluss/config/FlussConfigUtils.java | 3 +-
.../fluss/flink/catalog/FlinkCatalogITCase.java | 56 +++++++++++++++++
.../apache/fluss/server/DynamicServerConfig.java | 2 +-
.../server/coordinator/AutoPartitionManager.java | 73 ++++++++++++++--------
.../coordinator/CoordinatorEventProcessor.java | 10 +++
.../coordinator/AutoPartitionManagerTest.java | 69 ++++++++++++++++++++
6 files changed, 186 insertions(+), 27 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index b29f2a74b..bf9b1428e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -48,7 +48,8 @@ public class FlussConfigUtils {
Arrays.asList(
ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
- ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key());
+ ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
+
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key());
}
public static boolean isTableStorageConfig(String key) {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index e427ed324..eebe58611 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -579,6 +579,62 @@ abstract class FlinkCatalogITCase {
assertResultsIgnoreOrder(showPartitionIterator,
expectedShowPartitionsResult, true);
}
+ @Test
+ void testAlterAutoPartitionRetention() throws Exception {
+ String tblName = "test_alter_auto_partition_retention";
+ ObjectPath objectPath = new ObjectPath(DEFAULT_DB, tblName);
+
+ // Create an auto-partitioned table with HOUR time unit and retention=3
+ tEnv.executeSql(
+ "create table "
+ + tblName
+ + " (a int, b string) partitioned by (b) "
+ + "with ('table.auto-partition.enabled' = 'true',"
+ + " 'table.auto-partition.time-unit' = 'hour',"
+ + " 'table.auto-partition.num-retention' = '3')");
+
+ TablePath tablePath = new TablePath(DEFAULT_DB, tblName);
+ FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+ String datetimePattern = "yyyyMMddHH";
+ String oldPartition =
+ LocalDateTime.now()
+ .minusHours(3)
+ .format(DateTimeFormatter.ofPattern(datetimePattern));
+
+ tEnv.executeSql(
+ String.format("alter table %s add partition (b = '%s')",
tblName, oldPartition));
+ FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 3);
+ CloseableIterator<Row> showPartitionIterator =
+ tEnv.executeSql("show partitions " + tblName).collect();
+ List<String> partitions =
+ CollectionUtil.iteratorToList(showPartitionIterator).stream()
+ .map(Row::toString)
+ .collect(Collectors.toList());
+ assertThat(partitions).contains(String.format("+I[b=%s]",
oldPartition));
+
+ // Alter retention from 3 to 1
+ tEnv.executeSql(
+ "alter table " + tblName + " set
('table.auto-partition.num-retention' = '1')");
+
+ // The old partition should be dropped after the periodic check fires
+ FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsDropped(
+ tablePath, Collections.singletonList(oldPartition));
+
+ // Verify the old partition is no longer listed
+ showPartitionIterator = tEnv.executeSql("show partitions " +
tblName).collect();
+ partitions =
+ CollectionUtil.iteratorToList(showPartitionIterator).stream()
+ .map(Row::toString)
+ .collect(Collectors.toList());
+ assertThat(partitions).doesNotContain(String.format("+I[b=%s]",
oldPartition));
+
+ // Verify the altered property is persisted
+ CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
+
assertThat(table.getOptions().get(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key()))
+ .isEqualTo("1");
+ }
+
@Test
void testTableWithExpression() throws Exception {
// create a table with watermark and computed column
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index d4430bb50..634a91a1c 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -164,7 +164,7 @@ class DynamicServerConfig {
}
// Build new configuration by merging initial + dynamic configs
- Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+ Map<String, String> newConfigMap = buildConfigMap(newDynamicConfigs);
Configuration newConfig = Configuration.fromMap(newConfigMap);
// Apply changes to all registered ServerReconfigurable instances
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
index 5f1673974..695c770d4 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
@@ -136,31 +136,34 @@ public class AutoPartitionManager implements
AutoCloseable {
tableInfos.forEach(tableInfo -> addAutoPartitionTable(tableInfo,
false));
}
- public void addAutoPartitionTable(TableInfo tableInfo, boolean
forceDoAutoPartition) {
+ public void updateAutoPartitionTables(TableInfo tableInfo) {
checkNotClosed();
long tableId = tableInfo.getTableId();
+ LOG.info("Updating auto partition table [{}] (id={})",
tableInfo.getTablePath(), tableId);
Set<String> partitions =
metadataManager.getPartitions(tableInfo.getTablePath());
inLock(
lock,
() -> {
- autoPartitionTables.put(tableId, tableInfo);
- TreeMap<String, Set<String>> partitionMap =
- partitionsByTable.computeIfAbsent(
- tableInfo.getTableId(), k -> new
TreeMap<>());
- checkNotNull(partitionMap, "Partition map is null.");
- partitions.forEach(
- partitionName ->
- addPartitionToPartitionsByTable(
- tableInfo, partitionMap,
partitionName));
- if
(tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
- == AutoPartitionTimeUnit.DAY) {
- // get the delay minutes to create partition
- int delayMinutes =
ThreadLocalRandom.current().nextInt(60 * 23);
-
- autoCreateDayPartitionDelayMinutes.put(tableId,
delayMinutes);
- }
+ // Remove old state
+ removeAutoPartitionTableLocked(tableId);
+ // Add new state
+ addAutoPartitionTableLocked(tableInfo, partitions);
});
+ // schedule auto partition for this table immediately
+ periodicExecutor.schedule(() -> doAutoPartition(tableId, true), 0,
TimeUnit.MILLISECONDS);
+ LOG.info(
+ "Updated auto partition table [{}] (id={}) in scheduler",
+ tableInfo.getTablePath(),
+ tableId);
+ }
+
+ public void addAutoPartitionTable(TableInfo tableInfo, boolean
forceDoAutoPartition) {
+ checkNotClosed();
+ long tableId = tableInfo.getTableId();
+ Set<String> partitions =
metadataManager.getPartitions(tableInfo.getTablePath());
+ inLock(lock, () -> addAutoPartitionTableLocked(tableInfo, partitions));
+
// schedule auto partition for this table immediately
periodicExecutor.schedule(
() -> doAutoPartition(tableId, forceDoAutoPartition), 0,
TimeUnit.MILLISECONDS);
@@ -172,14 +175,7 @@ public class AutoPartitionManager implements AutoCloseable
{
public void removeAutoPartitionTable(long tableId) {
checkNotClosed();
- TableInfo tableInfo =
- inLock(
- lock,
- () -> {
- partitionsByTable.remove(tableId);
- autoCreateDayPartitionDelayMinutes.remove(tableId);
- return autoPartitionTables.remove(tableId);
- });
+ TableInfo tableInfo = inLock(lock, () ->
removeAutoPartitionTableLocked(tableId));
if (tableInfo != null) {
LOG.info(
"Removed auto partition table [{}] (id={}) from scheduler",
@@ -188,6 +184,32 @@ public class AutoPartitionManager implements AutoCloseable
{
}
}
+ /** Must be called while holding {@link #lock}. */
+ @Nullable
+ private TableInfo removeAutoPartitionTableLocked(long tableId) {
+ partitionsByTable.remove(tableId);
+ autoCreateDayPartitionDelayMinutes.remove(tableId);
+ return autoPartitionTables.remove(tableId);
+ }
+
+ /** Must be called while holding {@link #lock}. */
+ private void addAutoPartitionTableLocked(TableInfo tableInfo, Set<String>
partitions) {
+ long tableId = tableInfo.getTableId();
+ autoPartitionTables.put(tableId, tableInfo);
+ TreeMap<String, Set<String>> partitionMap =
+ partitionsByTable.computeIfAbsent(tableId, k -> new
TreeMap<>());
+ checkNotNull(partitionMap, "Partition map is null.");
+ partitions.forEach(
+ partitionName ->
+ addPartitionToPartitionsByTable(tableInfo,
partitionMap, partitionName));
+ if (tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
+ == AutoPartitionTimeUnit.DAY) {
+ // get the delay minutes to create partition
+ int delayMinutes = ThreadLocalRandom.current().nextInt(60 * 23);
+ autoCreateDayPartitionDelayMinutes.put(tableId, delayMinutes);
+ }
+ }
+
/**
* Try to add a partition to cache if this table is autoPartitionedTable
and partition not
* exists in cache.
@@ -312,6 +334,7 @@ public class AutoPartitionManager implements AutoCloseable {
tableId);
continue;
}
+
dropPartitions(
tablePath,
tableInfo.getPartitionKeys(),
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 4668593ea..7223c0f01 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -121,6 +121,7 @@ import
org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode;
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.utils.AutoPartitionStrategy;
import org.apache.fluss.utils.types.Tuple2;
import org.slf4j.Logger;
@@ -797,6 +798,15 @@ public class CoordinatorEventProcessor implements
EventProcessor {
newTableInfo.getTableId(), newFreshness.toMillis());
}
}
+
+ AutoPartitionStrategy autoPartitionStrategy =
+ newTableInfo.getTableConfig().getAutoPartitionStrategy();
+ if (autoPartitionStrategy.isAutoPartitionEnabled()
+ && autoPartitionStrategy.numToRetain()
+ !=
oldTableInfo.getTableConfig().getAutoPartitionStrategy().numToRetain()) {
+ autoPartitionManager.updateAutoPartitionTables(newTableInfo);
+ }
+
// more post-alter actions can be added here
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
index 569cd1209..5114ee115 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
@@ -566,6 +566,53 @@ class AutoPartitionManagerTest {
assertThat(partitionsNum).isEqualTo(3);
}
+ @Test
+ void testUpdateAutoPartitionNumRetention() throws Exception {
+ // Start at a well-known time
+ ZonedDateTime startTime =
+
LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault());
+ long startMs = startTime.toInstant().toEpochMilli();
+ ManualClock clock = new ManualClock(startMs);
+ ManuallyTriggeredScheduledExecutorService periodicExecutor =
+ new ManuallyTriggeredScheduledExecutorService();
+
+ AutoPartitionManager autoPartitionManager =
+ new AutoPartitionManager(
+ new TestingServerMetadataCache(3),
+ metadataManager,
+ new Configuration(),
+ clock,
+ periodicExecutor);
+ autoPartitionManager.start();
+
+ // Create an HOUR-partitioned table with numRetention=3, numPreCreate=4
+ TableInfo table = createPartitionedTable(3, 4,
AutoPartitionTimeUnit.HOUR);
+ TablePath tablePath = table.getTablePath();
+ autoPartitionManager.addAutoPartitionTable(table, true);
+ periodicExecutor.triggerNonPeriodicScheduledTask();
+
+ // pre-create 4 partitions: 2024091000, 2024091001, 2024091002,
2024091003
+ Map<String, PartitionRegistration> partitions =
+ zookeeperClient.getPartitionRegistrations(tablePath);
+ assertThat(partitions.keySet())
+ .containsExactlyInAnyOrder("2024091000", "2024091001",
"2024091002", "2024091003");
+
+ // Now update the table to numRetention=1 (more aggressive retention)
+ TableInfo updatedTable =
+ createUpdatedTableInfo(table, /* numRetention= */ 1, /*
numPreCreate= */ 4);
+ autoPartitionManager.updateAutoPartitionTables(updatedTable);
+ // Advance clock by 4 hours to trigger retention drops and new
pre-creations
+ clock.advanceTime(Duration.ofHours(4));
+ periodicExecutor.triggerNonPeriodicScheduledTask();
+
+ partitions = zookeeperClient.getPartitionRegistrations(tablePath);
+ // current partition is "2024091004", retain 1 => keep only
2024091003..2024091004
+ // pre-create 4 from current => 2024091004..2024091007 (newly created by this run)
+ assertThat(partitions.keySet())
+ .containsExactlyInAnyOrder(
+ "2024091003", "2024091004", "2024091005",
"2024091006", "2024091007");
+ }
+
private static class TestParams {
final AutoPartitionTimeUnit timeUnit;
final boolean multiplePartitionKeys;
@@ -840,4 +887,26 @@ class AutoPartitionManagerTest {
zookeeperClient.registerTable(tablePath, registration);
return tableInfo;
}
+
+ /** Creates a new TableInfo with updated numRetention and numPreCreate,
reusing the original. */
+ private TableInfo createUpdatedTableInfo(
+ TableInfo original, int newNumRetention, int newNumPreCreate) {
+ Configuration newProperties = new
Configuration(original.getProperties());
+ newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION,
newNumRetention);
+ newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE,
newNumPreCreate);
+ return new TableInfo(
+ original.getTablePath(),
+ original.getTableId(),
+ original.getSchemaId(),
+ original.getSchema(),
+ original.getBucketKeys(),
+ original.getPartitionKeys(),
+ original.getNumBuckets(),
+ newProperties,
+ original.getCustomProperties(),
+ original.getRemoteDataDir(),
+ original.getComment().orElse(null),
+ original.getCreatedTime(),
+ System.currentTimeMillis());
+ }
}