This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit ceef4535cb5533c5761e24e834da9ca2bc4f425a
Author: Liebing <[email protected]>
AuthorDate: Mon Feb 9 17:07:35 2026 +0800

    [server] Support alter table config `table.log.tiered.local-segments` (#2508)
    
    Co-authored-by: Liebing <[email protected]>
---
 .../fluss/client/admin/FlussAdminITCase.java       | 74 ++++++++++++++++++++++
 .../org/apache/fluss/config/FlussConfigUtils.java  |  3 +-
 .../org/apache/fluss/server/log/LogTablet.java     | 10 ++-
 .../org/apache/fluss/server/replica/Replica.java   | 21 ++++++
 .../fluss/server/replica/ReplicaManager.java       | 20 +++++-
 .../server/log/remote/RemoteLogManagerTest.java    | 59 +++++++++++++++++
 6 files changed, 183 insertions(+), 4 deletions(-)

diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 9936f9430..f2c23f2ae 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -70,6 +70,8 @@ import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.ServerTags;
 import org.apache.fluss.types.DataTypeChecks;
@@ -1636,4 +1638,76 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                         "Server tag PERMANENT_OFFLINE not exists for server 2, the current "
                                 + "server tag of this server is TEMPORARY_OFFLINE.");
     }
+
+    @Test
+    void testAlterTableTieredLogLocalSegments() throws Exception {
+        // 1. Create table with default config = 2
+        TablePath tablePath = TablePath.of("test_db", "test_alter_tiered_segments");
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("test table for tiered log segments")
+                        .distributedBy(3)
+                        .build();
+
+        admin.createTable(tablePath, tableDescriptor, false).get();
+
+        // 2. Verify initial config (metadata level)
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+        assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(2);
+
+        // 3. Get LogTablet from TabletServer and verify initial state
+        long tableId = tableInfo.getTableId();
+        TableBucket tableBucket = new TableBucket(tableId, 0); // Get first bucket
+
+        // Wait and get leader replica
+        Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+        LogTablet logTablet = replica.getLogTablet();
+
+        // Verify initial LogTablet internal state
+        assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);
+
+        // 4. Modify to 5 via Admin API
+        List<TableChange> tableChanges = new ArrayList<>();
+        tableChanges.add(TableChange.set(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "5"));
+        admin.alterTable(tablePath, tableChanges, false).get();
+
+        // 5. Verify metadata has been updated
+        tableInfo = admin.getTableInfo(tablePath).get();
+        assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(5);
+
+        // 6. Wait for config to propagate to TabletServer (via UpdateMetadataRequest)
+        // Use retry mechanism to ensure config has propagated
+        waitUntil(
+                () -> logTablet.getTieredLogLocalSegments() == 5,
+                Duration.ofSeconds(30),
+                "Waiting for config to propagate to TabletServer");
+
+        // 7. Verify LogTablet internal state has been updated
+        assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5);
+
+        // 8. Reset to default value
+        tableChanges.clear();
+        tableChanges.add(TableChange.reset(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()));
+        admin.alterTable(tablePath, tableChanges, false).get();
+
+        // 9. Verify metadata reset success (config should be removed, using default value 2)
+        tableInfo = admin.getTableInfo(tablePath).get();
+        TableDescriptor td = tableInfo.toTableDescriptor();
+        assertThat(
+                        td.getProperties()
+                                .containsKey(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()))
+                .isFalse();
+
+        // 10. Wait for config reset to propagate, verify LogTablet uses default value 2
+        waitUntil(
+                () -> logTablet.getTieredLogLocalSegments() == 2,
+                Duration.ofSeconds(30),
+                "Waiting for config reset to propagate to TabletServer");
+
+        assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);
+
+        // 11. Cleanup
+        admin.dropTable(tablePath, false).get();
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index af4044458..08b97256c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -44,7 +44,8 @@ public class FlussConfigUtils {
         ALTERABLE_TABLE_OPTIONS =
                 Arrays.asList(
                         ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
-                        ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+                        ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
+                        ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key());
     }
 
     public static boolean isTableStorageConfig(String key) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 3f333dc10..371d3c128 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -100,7 +100,7 @@ public final class LogTablet {
     private final Scheduler scheduler;
     private final ScheduledFuture<?> writerExpireCheck;
     private final LogFormat logFormat;
-    private final int tieredLogLocalSegments;
+    private volatile int tieredLogLocalSegments;
     private final Clock clock;
     private final boolean isChangeLog;
 
@@ -526,6 +526,14 @@ public final class LogTablet {
         this.isDataLakeEnabled = isDataLakeEnabled;
     }
 
+    public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
+        this.tieredLogLocalSegments = tieredLogLocalSegments;
+    }
+
+    public int getTieredLogLocalSegments() {
+        return tieredLogLocalSegments;
+    }
+
     public void updateLakeTableSnapshotId(long snapshotId) {
         if (snapshotId > this.lakeTableSnapshotId) {
             this.lakeTableSnapshotId = snapshotId;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index b884ed88b..536f8b2be 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -593,6 +593,27 @@ public final class Replica {
                 isDataLakeEnabled);
     }
 
+    /**
+     * Update the number of log segments to retain in local storage. This method is called when the
+     * table configuration is altered.
+     *
+     * @param tieredLogLocalSegments the new number of segments to retain locally
+     */
+    public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
+        int oldValue = logTablet.getTieredLogLocalSegments();
+        if (oldValue == tieredLogLocalSegments) {
+            return;
+        }
+
+        logTablet.updateTieredLogLocalSegments(tieredLogLocalSegments);
+
+        LOG.info(
+                "Replica for {} tieredLogLocalSegments changed from {} to {}",
+                tableBucket,
+                oldValue,
+                tieredLogLocalSegments);
+    }
+
     private void createKv() {
         try {
             // create a closeable registry for the closable related to kv
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index d8ddb92c4..23ac2df71 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -476,16 +476,24 @@ public class ReplicaManager {
 
     private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
         Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>();
+        Map<Long, Integer> tableIdToTieredLogLocalSegments = new HashMap<>();
+
         for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
             TableInfo tableInfo = tableMetadata.getTableInfo();
+            long tableId = tableInfo.getTableId();
+
+            // Collect datalake enabled configuration
             if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
-                long tableId = tableInfo.getTableId();
                 boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
                 tableIdToLakeFlag.put(tableId, dataLakeEnabled);
             }
+
+            // Collect tiered log local segments configuration
+            int tieredLogLocalSegments = tableInfo.getTableConfig().getTieredLogLocalSegments();
+            tableIdToTieredLogLocalSegments.put(tableId, tieredLogLocalSegments);
         }
 
-        if (tableIdToLakeFlag.isEmpty()) {
+        if (tableIdToLakeFlag.isEmpty() && tableIdToTieredLogLocalSegments.isEmpty()) {
             return;
         }
 
@@ -494,9 +502,17 @@ public class ReplicaManager {
             if (hostedReplica instanceof OnlineReplica) {
                 Replica replica = ((OnlineReplica) hostedReplica).getReplica();
                 long tableId = replica.getTableBucket().getTableId();
+
+                // Update datalake enabled configuration
                 if (tableIdToLakeFlag.containsKey(tableId)) {
                     replica.updateIsDataLakeEnabled(tableIdToLakeFlag.get(tableId));
                 }
+
+                // Update tiered log local segments configuration
+                if (tableIdToTieredLogLocalSegments.containsKey(tableId)) {
+                    replica.updateTieredLogLocalSegments(
+                            tableIdToTieredLogLocalSegments.get(tableId));
+                }
             }
         }
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index 4d06411bc..2834505c9 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -605,6 +605,65 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
                 .isEqualTo(-1L);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws Exception {
+        // 1. Create table with initial config tieredLogLocalSegments = 2
+        long tableId =
+                registerTableInZkClient(
+                        DATA1_TABLE_PATH,
+                        DATA1_SCHEMA,
+                        200L,
+                        Collections.emptyList(),
+                        Collections.singletonMap(
+                                ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "2"));
+        TableBucket tb = makeTableBucket(tableId, partitionedTable);
+        makeLogTableAsLeader(tb, partitionedTable);
+
+        Replica replica = replicaManager.getReplicaOrException(tb);
+        LogTablet logTablet = replica.getLogTablet();
+
+        // Verify initial config
+        assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);
+
+        // 2. Generate 10 segments, upload 9 to remote (excluding active segment)
+        addMultiSegmentsToLogTablet(logTablet, 10);
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+        // Verify upload success
+        List<RemoteLogSegment> remoteSegments = remoteLogManager.relevantRemoteLogSegments(tb, 0L);
+        assertThat(remoteSegments).hasSize(9);
+
+        // Verify 2 local segments retained
+        assertThat(logTablet.getSegments()).hasSize(2);
+
+        // 3. Directly update config via Replica (simulating metadata propagation)
+        replica.updateTieredLogLocalSegments(5);
+
+        // Verify LogTablet internal state has been updated
+        assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5);
+
+        // 4. Generate more segments and trigger cleanup, verify new config takes effect
+        addMultiSegmentsToLogTablet(logTablet, 10);
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+        // Should retain 5 local segments
+        assertThat(logTablet.getSegments()).hasSize(5);
+
+        // 5. Modify config to 3 again, verify multiple modifications work
+        replica.updateTieredLogLocalSegments(3);
+
+        // Verify LogTablet internal state updated again
+        assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(3);
+
+        // Generate more segments and verify new config takes effect
+        addMultiSegmentsToLogTablet(logTablet, 5);
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+        // Should retain 3 local segments
+        assertThat(logTablet.getSegments()).hasSize(3);
+    }
+
     private TableBucket makeTableBucket(boolean partitionTable) {
         return makeTableBucket(DATA1_TABLE_ID, partitionTable);
     }

Reply via email to