This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 852737e5b [server] Allow user to alter partition num retention. (#2964)
852737e5b is described below

commit 852737e5b7ec0a296545822b73311a50e2e349f4
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Apr 8 13:59:48 2026 +0800

    [server] Allow user to alter partition num retention. (#2964)
---
 .../org/apache/fluss/config/FlussConfigUtils.java  |  3 +-
 .../fluss/flink/catalog/FlinkCatalogITCase.java    | 56 +++++++++++++++++
 .../apache/fluss/server/DynamicServerConfig.java   |  2 +-
 .../server/coordinator/AutoPartitionManager.java   | 73 ++++++++++++++--------
 .../coordinator/CoordinatorEventProcessor.java     | 10 +++
 .../coordinator/AutoPartitionManagerTest.java      | 69 ++++++++++++++++++++
 6 files changed, 186 insertions(+), 27 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index b29f2a74b..bf9b1428e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -48,7 +48,8 @@ public class FlussConfigUtils {
                 Arrays.asList(
                         ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
                         ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
-                        ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key());
+                        ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
+                        
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key());
     }
 
     public static boolean isTableStorageConfig(String key) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index e427ed324..eebe58611 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -579,6 +579,62 @@ abstract class FlinkCatalogITCase {
         assertResultsIgnoreOrder(showPartitionIterator, 
expectedShowPartitionsResult, true);
     }
 
+    @Test
+    void testAlterAutoPartitionRetention() throws Exception {
+        String tblName = "test_alter_auto_partition_retention";
+        ObjectPath objectPath = new ObjectPath(DEFAULT_DB, tblName);
+
+        // Create an auto-partitioned table with HOUR time unit and retention=3
+        tEnv.executeSql(
+                "create table "
+                        + tblName
+                        + " (a int, b string) partitioned by (b) "
+                        + "with ('table.auto-partition.enabled' = 'true',"
+                        + " 'table.auto-partition.time-unit' = 'hour',"
+                        + " 'table.auto-partition.num-retention' = '3')");
+
+        TablePath tablePath = new TablePath(DEFAULT_DB, tblName);
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+        String datetimePattern = "yyyyMMddHH";
+        String oldPartition =
+                LocalDateTime.now()
+                        .minusHours(3)
+                        .format(DateTimeFormatter.ofPattern(datetimePattern));
+
+        tEnv.executeSql(
+                String.format("alter table %s add partition (b = '%s')", 
tblName, oldPartition));
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 3);
+        CloseableIterator<Row> showPartitionIterator =
+                tEnv.executeSql("show partitions " + tblName).collect();
+        List<String> partitions =
+                CollectionUtil.iteratorToList(showPartitionIterator).stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(partitions).contains(String.format("+I[b=%s]", 
oldPartition));
+
+        // Alter retention from 3 to 1
+        tEnv.executeSql(
+                "alter table " + tblName + " set 
('table.auto-partition.num-retention' = '1')");
+
+        // The old partition should be dropped after the periodic check fires
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsDropped(
+                tablePath, Collections.singletonList(oldPartition));
+
+        // Verify the old partition is no longer listed
+        showPartitionIterator = tEnv.executeSql("show partitions " + 
tblName).collect();
+        partitions =
+                CollectionUtil.iteratorToList(showPartitionIterator).stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(partitions).doesNotContain(String.format("+I[b=%s]", 
oldPartition));
+
+        // Verify the altered property is persisted
+        CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
+        
assertThat(table.getOptions().get(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key()))
+                .isEqualTo("1");
+    }
+
     @Test
     void testTableWithExpression() throws Exception {
         // create a table with watermark and computed column
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index d4430bb50..634a91a1c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -164,7 +164,7 @@ class DynamicServerConfig {
         }
 
         // Build new configuration by merging initial + dynamic configs
-        Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+        Map<String, String> newConfigMap = buildConfigMap(newDynamicConfigs);
         Configuration newConfig = Configuration.fromMap(newConfigMap);
 
         // Apply changes to all registered ServerReconfigurable instances
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
index 5f1673974..695c770d4 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
@@ -136,31 +136,34 @@ public class AutoPartitionManager implements 
AutoCloseable {
         tableInfos.forEach(tableInfo -> addAutoPartitionTable(tableInfo, 
false));
     }
 
-    public void addAutoPartitionTable(TableInfo tableInfo, boolean 
forceDoAutoPartition) {
+    public void updateAutoPartitionTables(TableInfo tableInfo) {
         checkNotClosed();
         long tableId = tableInfo.getTableId();
+        LOG.info("Updating auto partition table [{}] (id={})", 
tableInfo.getTablePath(), tableId);
         Set<String> partitions = 
metadataManager.getPartitions(tableInfo.getTablePath());
         inLock(
                 lock,
                 () -> {
-                    autoPartitionTables.put(tableId, tableInfo);
-                    TreeMap<String, Set<String>> partitionMap =
-                            partitionsByTable.computeIfAbsent(
-                                    tableInfo.getTableId(), k -> new 
TreeMap<>());
-                    checkNotNull(partitionMap, "Partition map is null.");
-                    partitions.forEach(
-                            partitionName ->
-                                    addPartitionToPartitionsByTable(
-                                            tableInfo, partitionMap, 
partitionName));
-                    if 
(tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
-                            == AutoPartitionTimeUnit.DAY) {
-                        // get the delay minutes to create partition
-                        int delayMinutes = 
ThreadLocalRandom.current().nextInt(60 * 23);
-
-                        autoCreateDayPartitionDelayMinutes.put(tableId, 
delayMinutes);
-                    }
+                    // Remove old state
+                    removeAutoPartitionTableLocked(tableId);
+                    // Add new state
+                    addAutoPartitionTableLocked(tableInfo, partitions);
                 });
 
+        // schedule auto partition for this table immediately
+        periodicExecutor.schedule(() -> doAutoPartition(tableId, true), 0, 
TimeUnit.MILLISECONDS);
+        LOG.info(
+                "Updated auto partition table [{}] (id={}) in scheduler",
+                tableInfo.getTablePath(),
+                tableId);
+    }
+
+    public void addAutoPartitionTable(TableInfo tableInfo, boolean 
forceDoAutoPartition) {
+        checkNotClosed();
+        long tableId = tableInfo.getTableId();
+        Set<String> partitions = 
metadataManager.getPartitions(tableInfo.getTablePath());
+        inLock(lock, () -> addAutoPartitionTableLocked(tableInfo, partitions));
+
         // schedule auto partition for this table immediately
         periodicExecutor.schedule(
                 () -> doAutoPartition(tableId, forceDoAutoPartition), 0, 
TimeUnit.MILLISECONDS);
@@ -172,14 +175,7 @@ public class AutoPartitionManager implements AutoCloseable 
{
 
     public void removeAutoPartitionTable(long tableId) {
         checkNotClosed();
-        TableInfo tableInfo =
-                inLock(
-                        lock,
-                        () -> {
-                            partitionsByTable.remove(tableId);
-                            autoCreateDayPartitionDelayMinutes.remove(tableId);
-                            return autoPartitionTables.remove(tableId);
-                        });
+        TableInfo tableInfo = inLock(lock, () -> 
removeAutoPartitionTableLocked(tableId));
         if (tableInfo != null) {
             LOG.info(
                     "Removed auto partition table [{}] (id={}) from scheduler",
@@ -188,6 +184,32 @@ public class AutoPartitionManager implements AutoCloseable 
{
         }
     }
 
+    /** Must be called while holding {@link #lock}. */
+    @Nullable
+    private TableInfo removeAutoPartitionTableLocked(long tableId) {
+        partitionsByTable.remove(tableId);
+        autoCreateDayPartitionDelayMinutes.remove(tableId);
+        return autoPartitionTables.remove(tableId);
+    }
+
+    /** Must be called while holding {@link #lock}. */
+    private void addAutoPartitionTableLocked(TableInfo tableInfo, Set<String> 
partitions) {
+        long tableId = tableInfo.getTableId();
+        autoPartitionTables.put(tableId, tableInfo);
+        TreeMap<String, Set<String>> partitionMap =
+                partitionsByTable.computeIfAbsent(tableId, k -> new 
TreeMap<>());
+        checkNotNull(partitionMap, "Partition map is null.");
+        partitions.forEach(
+                partitionName ->
+                        addPartitionToPartitionsByTable(tableInfo, 
partitionMap, partitionName));
+        if (tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
+                == AutoPartitionTimeUnit.DAY) {
+            // get the delay minutes to create partition
+            int delayMinutes = ThreadLocalRandom.current().nextInt(60 * 23);
+            autoCreateDayPartitionDelayMinutes.put(tableId, delayMinutes);
+        }
+    }
+
     /**
      * Try to add a partition to cache if this table is autoPartitionedTable 
and partition not
      * exists in cache.
@@ -312,6 +334,7 @@ public class AutoPartitionManager implements AutoCloseable {
                         tableId);
                 continue;
             }
+
             dropPartitions(
                     tablePath,
                     tableInfo.getPartitionKeys(),
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 4668593ea..7223c0f01 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -121,6 +121,7 @@ import 
org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
 import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode;
 import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
 import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.utils.AutoPartitionStrategy;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.slf4j.Logger;
@@ -797,6 +798,15 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                         newTableInfo.getTableId(), newFreshness.toMillis());
             }
         }
+
+        AutoPartitionStrategy autoPartitionStrategy =
+                newTableInfo.getTableConfig().getAutoPartitionStrategy();
+        if (autoPartitionStrategy.isAutoPartitionEnabled()
+                && autoPartitionStrategy.numToRetain()
+                        != 
oldTableInfo.getTableConfig().getAutoPartitionStrategy().numToRetain()) {
+            autoPartitionManager.updateAutoPartitionTables(newTableInfo);
+        }
+
         // more post-alter actions can be added here
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
index 569cd1209..5114ee115 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
@@ -566,6 +566,53 @@ class AutoPartitionManagerTest {
         assertThat(partitionsNum).isEqualTo(3);
     }
 
+    @Test
+    void testUpdateAutoPartitionNumRetention() throws Exception {
+        // Start at a well-known time
+        ZonedDateTime startTime =
+                
LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault());
+        long startMs = startTime.toInstant().toEpochMilli();
+        ManualClock clock = new ManualClock(startMs);
+        ManuallyTriggeredScheduledExecutorService periodicExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+
+        AutoPartitionManager autoPartitionManager =
+                new AutoPartitionManager(
+                        new TestingServerMetadataCache(3),
+                        metadataManager,
+                        new Configuration(),
+                        clock,
+                        periodicExecutor);
+        autoPartitionManager.start();
+
+        // Create a DAY-partitioned table with numRetention=3, numPreCreate=4
+        TableInfo table = createPartitionedTable(3, 4, 
AutoPartitionTimeUnit.HOUR);
+        TablePath tablePath = table.getTablePath();
+        autoPartitionManager.addAutoPartitionTable(table, true);
+        periodicExecutor.triggerNonPeriodicScheduledTask();
+
+        // pre-create 4 partitions: 2024091000, 2024091001, 2024091002, 
2024091003
+        Map<String, PartitionRegistration> partitions =
+                zookeeperClient.getPartitionRegistrations(tablePath);
+        assertThat(partitions.keySet())
+                .containsExactlyInAnyOrder("2024091000", "2024091001", 
"2024091002", "2024091003");
+
+        // Now update the table to numRetention=1 (more aggressive retention)
+        TableInfo updatedTable =
+                createUpdatedTableInfo(table, /* numRetention= */ 1, /* 
numPreCreate= */ 4);
+        autoPartitionManager.updateAutoPartitionTables(updatedTable);
+        // Advance clock by 4 hours to trigger retention drops and new 
pre-creations
+        clock.advanceTime(Duration.ofHours(4));
+        periodicExecutor.triggerNonPeriodicScheduledTask();
+
+        partitions = zookeeperClient.getPartitionRegistrations(tablePath);
+        // current partition is "2024091004", retain 1 => keep only 
2024091003..2024091004
+        // pre-create 4 from current => 2024091004..2024091007 (already exist)
+        assertThat(partitions.keySet())
+                .containsExactlyInAnyOrder(
+                        "2024091003", "2024091004", "2024091005", 
"2024091006", "2024091007");
+    }
+
     private static class TestParams {
         final AutoPartitionTimeUnit timeUnit;
         final boolean multiplePartitionKeys;
@@ -840,4 +887,26 @@ class AutoPartitionManagerTest {
         zookeeperClient.registerTable(tablePath, registration);
         return tableInfo;
     }
+
+    /** Creates a new TableInfo with updated numRetention and numPreCreate, 
reusing the original. */
+    private TableInfo createUpdatedTableInfo(
+            TableInfo original, int newNumRetention, int newNumPreCreate) {
+        Configuration newProperties = new 
Configuration(original.getProperties());
+        newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, 
newNumRetention);
+        newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 
newNumPreCreate);
+        return new TableInfo(
+                original.getTablePath(),
+                original.getTableId(),
+                original.getSchemaId(),
+                original.getSchema(),
+                original.getBucketKeys(),
+                original.getPartitionKeys(),
+                original.getNumBuckets(),
+                newProperties,
+                original.getCustomProperties(),
+                original.getRemoteDataDir(),
+                original.getComment().orElse(null),
+                original.getCreatedTime(),
+                System.currentTimeMillis());
+    }
 }

Reply via email to