This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit ceef4535cb5533c5761e24e834da9ca2bc4f425a Author: Liebing <[email protected]> AuthorDate: Mon Feb 9 17:07:35 2026 +0800 [server] Support alter table config `table.log.tiered.local-segments` (#2508) Co-authored-by: Liebing <[email protected]> --- .../fluss/client/admin/FlussAdminITCase.java | 74 ++++++++++++++++++++++ .../org/apache/fluss/config/FlussConfigUtils.java | 3 +- .../org/apache/fluss/server/log/LogTablet.java | 10 ++- .../org/apache/fluss/server/replica/Replica.java | 21 ++++++ .../fluss/server/replica/ReplicaManager.java | 20 +++++- .../server/log/remote/RemoteLogManagerTest.java | 59 +++++++++++++++++ 6 files changed, 183 insertions(+), 4 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 9936f9430..f2c23f2ae 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -70,6 +70,8 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle; +import org.apache.fluss.server.log.LogTablet; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.types.DataTypeChecks; @@ -1636,4 +1638,76 @@ class FlussAdminITCase extends ClientToServerITCaseBase { "Server tag PERMANENT_OFFLINE not exists for server 2, the current " + "server tag of this server is TEMPORARY_OFFLINE."); } + + @Test + void testAlterTableTieredLogLocalSegments() throws Exception { + // 1. Create table with default config = 2 + TablePath tablePath = TablePath.of("test_db", "test_alter_tiered_segments"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test table for tiered log segments") + .distributedBy(3) + .build(); + + admin.createTable(tablePath, tableDescriptor, false).get(); + + // 2. Verify initial config (metadata level) + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(2); + + // 3. Get LogTablet from TabletServer and verify initial state + long tableId = tableInfo.getTableId(); + TableBucket tableBucket = new TableBucket(tableId, 0); // Get first bucket + + // Wait and get leader replica + Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); + LogTablet logTablet = replica.getLogTablet(); + + // Verify initial LogTablet internal state + assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2); + + // 4. Modify to 5 via Admin API + List<TableChange> tableChanges = new ArrayList<>(); + tableChanges.add(TableChange.set(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "5")); + admin.alterTable(tablePath, tableChanges, false).get(); + + // 5. Verify metadata has been updated + tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(5); + + // 6. Wait for config to propagate to TabletServer (via UpdateMetadataRequest) + // Use retry mechanism to ensure config has propagated + waitUntil( + () -> logTablet.getTieredLogLocalSegments() == 5, + Duration.ofSeconds(30), + "Waiting for config to propagate to TabletServer"); + + // 7. Verify LogTablet internal state has been updated + assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5); + + // 8. Reset to default value + tableChanges.clear(); + tableChanges.add(TableChange.reset(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key())); + admin.alterTable(tablePath, tableChanges, false).get(); + + // 9. Verify metadata reset success (config should be removed, using default value 2) + tableInfo = admin.getTableInfo(tablePath).get(); + TableDescriptor td = tableInfo.toTableDescriptor(); + assertThat( + td.getProperties() + .containsKey(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key())) + .isFalse(); + + // 10. Wait for config reset to propagate, verify LogTablet uses default value 2 + waitUntil( + () -> logTablet.getTieredLogLocalSegments() == 2, + Duration.ofSeconds(30), + "Waiting for config reset to propagate to TabletServer"); + + assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2); + + // 11. Cleanup + admin.dropTable(tablePath, false).get(); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index af4044458..08b97256c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -44,7 +44,8 @@ public class FlussConfigUtils { ALTERABLE_TABLE_OPTIONS = Arrays.asList( ConfigOptions.TABLE_DATALAKE_ENABLED.key(), - ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()); + ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), + ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()); } public static boolean isTableStorageConfig(String key) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index 3f333dc10..371d3c128 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -100,7 +100,7 @@ public final class LogTablet { private final Scheduler scheduler; private final ScheduledFuture<?> writerExpireCheck; private final LogFormat logFormat; - private final int tieredLogLocalSegments; + private volatile int tieredLogLocalSegments; private final Clock clock; private final boolean isChangeLog; @@ -526,6 +526,14 @@ public final class LogTablet { this.isDataLakeEnabled = isDataLakeEnabled; } + public void updateTieredLogLocalSegments(int tieredLogLocalSegments) { + this.tieredLogLocalSegments = tieredLogLocalSegments; + } + + public int getTieredLogLocalSegments() { + return tieredLogLocalSegments; + } + public void updateLakeTableSnapshotId(long snapshotId) { if (snapshotId > this.lakeTableSnapshotId) { this.lakeTableSnapshotId = snapshotId; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index b884ed88b..536f8b2be 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -593,6 +593,27 @@ public final class Replica { isDataLakeEnabled); } + /** + * Update the number of log segments to retain in local storage. This method is called when the + * table configuration is altered. + * + * @param tieredLogLocalSegments the new number of segments to retain locally + */ + public void updateTieredLogLocalSegments(int tieredLogLocalSegments) { + int oldValue = logTablet.getTieredLogLocalSegments(); + if (oldValue == tieredLogLocalSegments) { + return; + } + + logTablet.updateTieredLogLocalSegments(tieredLogLocalSegments); + + LOG.info( + "Replica for {} tieredLogLocalSegments changed from {} to {}", + tableBucket, + oldValue, + tieredLogLocalSegments); + } + private void createKv() { try { // create a closeable registry for the closable related to kv diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index d8ddb92c4..23ac2df71 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -476,16 +476,24 @@ public class ReplicaManager { private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>(); + Map<Long, Integer> tableIdToTieredLogLocalSegments = new HashMap<>(); + for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) { TableInfo tableInfo = tableMetadata.getTableInfo(); + long tableId = tableInfo.getTableId(); + + // Collect datalake enabled configuration if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) { - long tableId = tableInfo.getTableId(); boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled(); tableIdToLakeFlag.put(tableId, dataLakeEnabled); } + + // Collect tiered log local segments configuration + int tieredLogLocalSegments = tableInfo.getTableConfig().getTieredLogLocalSegments(); + tableIdToTieredLogLocalSegments.put(tableId, tieredLogLocalSegments); } - if (tableIdToLakeFlag.isEmpty()) { + if (tableIdToLakeFlag.isEmpty() && tableIdToTieredLogLocalSegments.isEmpty()) { return; } @@ -494,9 +502,17 @@ public class ReplicaManager { if (hostedReplica instanceof OnlineReplica) { Replica replica = ((OnlineReplica) hostedReplica).getReplica(); long tableId = replica.getTableBucket().getTableId(); + + // Update datalake enabled configuration if (tableIdToLakeFlag.containsKey(tableId)) { replica.updateIsDataLakeEnabled(tableIdToLakeFlag.get(tableId)); } + + // Update tiered log local segments configuration + if (tableIdToTieredLogLocalSegments.containsKey(tableId)) { + replica.updateTieredLogLocalSegments( + tableIdToTieredLogLocalSegments.get(tableId)); + } } } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java index 4d06411bc..2834505c9 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java @@ -605,6 +605,65 @@ class RemoteLogManagerTest extends RemoteLogTestBase { .isEqualTo(-1L); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws Exception { + // 1. Create table with initial config tieredLogLocalSegments = 2 + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 200L, + Collections.emptyList(), + Collections.singletonMap( + ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "2")); + TableBucket tb = makeTableBucket(tableId, partitionedTable); + makeLogTableAsLeader(tb, partitionedTable); + + Replica replica = replicaManager.getReplicaOrException(tb); + LogTablet logTablet = replica.getLogTablet(); + + // Verify initial config + assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2); + + // 2. Generate 10 segments, upload 9 to remote (excluding active segment) + addMultiSegmentsToLogTablet(logTablet, 10); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + // Verify upload success + List<RemoteLogSegment> remoteSegments = remoteLogManager.relevantRemoteLogSegments(tb, 0L); + assertThat(remoteSegments).hasSize(9); + + // Verify 2 local segments retained + assertThat(logTablet.getSegments()).hasSize(2); + + // 3. Directly update config via Replica (simulating metadata propagation) + replica.updateTieredLogLocalSegments(5); + + // Verify LogTablet internal state has been updated + assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5); + + // 4. Generate more segments and trigger cleanup, verify new config takes effect + addMultiSegmentsToLogTablet(logTablet, 10); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + // Should retain 5 local segments + assertThat(logTablet.getSegments()).hasSize(5); + + // 5. Modify config to 3 again, verify multiple modifications work + replica.updateTieredLogLocalSegments(3); + + // Verify LogTablet internal state updated again + assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(3); + + // Generate more segments and verify new config takes effect + addMultiSegmentsToLogTablet(logTablet, 5); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + // Should retain 3 local segments + assertThat(logTablet.getSegments()).hasSize(3); + } + private TableBucket makeTableBucket(boolean partitionTable) { return makeTableBucket(DATA1_TABLE_ID, partitionTable); }
