This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5c2376209 [server] Support alter table config
`table.log.tiered.local-segments` (#2508)
5c2376209 is described below
commit 5c2376209d0a700c5ecc4587c751d71754244cea
Author: Liebing <[email protected]>
AuthorDate: Mon Feb 9 17:07:35 2026 +0800
[server] Support alter table config `table.log.tiered.local-segments`
(#2508)
Co-authored-by: Liebing <[email protected]>
---
.../fluss/client/admin/FlussAdminITCase.java | 74 ++++++++++++++++++++++
.../org/apache/fluss/config/FlussConfigUtils.java | 3 +-
.../org/apache/fluss/server/log/LogTablet.java | 10 ++-
.../org/apache/fluss/server/replica/Replica.java | 21 ++++++
.../fluss/server/replica/ReplicaManager.java | 20 +++++-
.../server/log/remote/RemoteLogManagerTest.java | 59 +++++++++++++++++
6 files changed, 183 insertions(+), 4 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 9936f9430..f2c23f2ae 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -70,6 +70,8 @@ import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.ServerTags;
import org.apache.fluss.types.DataTypeChecks;
@@ -1636,4 +1638,76 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
"Server tag PERMANENT_OFFLINE not exists for server 2,
the current "
+ "server tag of this server is
TEMPORARY_OFFLINE.");
}
+
+ @Test
+ void testAlterTableTieredLogLocalSegments() throws Exception {
+ // 1. Create table with default config = 2
+ TablePath tablePath = TablePath.of("test_db",
"test_alter_tiered_segments");
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("test table for tiered log segments")
+ .distributedBy(3)
+ .build();
+
+ admin.createTable(tablePath, tableDescriptor, false).get();
+
+ // 2. Verify initial config (metadata level)
+ TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+
assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(2);
+
+ // 3. Get LogTablet from TabletServer and verify initial state
+ long tableId = tableInfo.getTableId();
+ TableBucket tableBucket = new TableBucket(tableId, 0); // Get first
bucket
+
+ // Wait and get leader replica
+ Replica replica =
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+ LogTablet logTablet = replica.getLogTablet();
+
+ // Verify initial LogTablet internal state
+ assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);
+
+ // 4. Modify to 5 via Admin API
+ List<TableChange> tableChanges = new ArrayList<>();
+
tableChanges.add(TableChange.set(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
"5"));
+ admin.alterTable(tablePath, tableChanges, false).get();
+
+ // 5. Verify metadata has been updated
+ tableInfo = admin.getTableInfo(tablePath).get();
+
assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(5);
+
+ // 6. Wait for config to propagate to TabletServer (via
UpdateMetadataRequest)
+ // Use retry mechanism to ensure config has propagated
+ waitUntil(
+ () -> logTablet.getTieredLogLocalSegments() == 5,
+ Duration.ofSeconds(30),
+ "Waiting for config to propagate to TabletServer");
+
+ // 7. Verify LogTablet internal state has been updated
+ assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5);
+
+ // 8. Reset to default value
+ tableChanges.clear();
+
tableChanges.add(TableChange.reset(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()));
+ admin.alterTable(tablePath, tableChanges, false).get();
+
+ // 9. Verify metadata reset success (config should be removed, using
default value 2)
+ tableInfo = admin.getTableInfo(tablePath).get();
+ TableDescriptor td = tableInfo.toTableDescriptor();
+ assertThat(
+ td.getProperties()
+
.containsKey(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()))
+ .isFalse();
+
+ // 10. Wait for config reset to propagate, verify LogTablet uses
default value 2
+ waitUntil(
+ () -> logTablet.getTieredLogLocalSegments() == 2,
+ Duration.ofSeconds(30),
+ "Waiting for config reset to propagate to TabletServer");
+
+ assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);
+
+ // 11. Cleanup
+ admin.dropTable(tablePath, false).get();
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index af4044458..08b97256c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -44,7 +44,8 @@ public class FlussConfigUtils {
ALTERABLE_TABLE_OPTIONS =
Arrays.asList(
ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
- ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+ ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
+ ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key());
}
public static boolean isTableStorageConfig(String key) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 3f333dc10..371d3c128 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -100,7 +100,7 @@ public final class LogTablet {
private final Scheduler scheduler;
private final ScheduledFuture<?> writerExpireCheck;
private final LogFormat logFormat;
- private final int tieredLogLocalSegments;
+ private volatile int tieredLogLocalSegments;
private final Clock clock;
private final boolean isChangeLog;
@@ -526,6 +526,14 @@ public final class LogTablet {
this.isDataLakeEnabled = isDataLakeEnabled;
}
+ public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
+ this.tieredLogLocalSegments = tieredLogLocalSegments;
+ }
+
+ public int getTieredLogLocalSegments() {
+ return tieredLogLocalSegments;
+ }
+
public void updateLakeTableSnapshotId(long snapshotId) {
if (snapshotId > this.lakeTableSnapshotId) {
this.lakeTableSnapshotId = snapshotId;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index b884ed88b..536f8b2be 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -593,6 +593,27 @@ public final class Replica {
isDataLakeEnabled);
}
+ /**
+ * Update the number of log segments to retain in local storage. This
method is called when the
+ * table configuration is altered.
+ *
+ * @param tieredLogLocalSegments the new number of segments to retain
locally
+ */
+ public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
+ int oldValue = logTablet.getTieredLogLocalSegments();
+ if (oldValue == tieredLogLocalSegments) {
+ return;
+ }
+
+ logTablet.updateTieredLogLocalSegments(tieredLogLocalSegments);
+
+ LOG.info(
+ "Replica for {} tieredLogLocalSegments changed from {} to {}",
+ tableBucket,
+ oldValue,
+ tieredLogLocalSegments);
+ }
+
private void createKv() {
try {
// create a closeable registry for the closable related to kv
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index d8ddb92c4..23ac2df71 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -476,16 +476,24 @@ public class ReplicaManager {
private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>();
+ Map<Long, Integer> tableIdToTieredLogLocalSegments = new HashMap<>();
+
for (TableMetadata tableMetadata :
clusterMetadata.getTableMetadataList()) {
TableInfo tableInfo = tableMetadata.getTableInfo();
+ long tableId = tableInfo.getTableId();
+
+ // Collect datalake enabled configuration
if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
- long tableId = tableInfo.getTableId();
boolean dataLakeEnabled =
tableInfo.getTableConfig().isDataLakeEnabled();
tableIdToLakeFlag.put(tableId, dataLakeEnabled);
}
+
+ // Collect tiered log local segments configuration
+ int tieredLogLocalSegments =
tableInfo.getTableConfig().getTieredLogLocalSegments();
+ tableIdToTieredLogLocalSegments.put(tableId,
tieredLogLocalSegments);
}
- if (tableIdToLakeFlag.isEmpty()) {
+ if (tableIdToLakeFlag.isEmpty() &&
tableIdToTieredLogLocalSegments.isEmpty()) {
return;
}
@@ -494,9 +502,17 @@ public class ReplicaManager {
if (hostedReplica instanceof OnlineReplica) {
Replica replica = ((OnlineReplica) hostedReplica).getReplica();
long tableId = replica.getTableBucket().getTableId();
+
+ // Update datalake enabled configuration
if (tableIdToLakeFlag.containsKey(tableId)) {
replica.updateIsDataLakeEnabled(tableIdToLakeFlag.get(tableId));
}
+
+ // Update tiered log local segments configuration
+ if (tableIdToTieredLogLocalSegments.containsKey(tableId)) {
+ replica.updateTieredLogLocalSegments(
+ tableIdToTieredLogLocalSegments.get(tableId));
+ }
}
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index 4d06411bc..2834505c9 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -605,6 +605,65 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
.isEqualTo(-1L);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws
Exception {
+ // 1. Create table with initial config tieredLogLocalSegments = 2
+ long tableId =
+ registerTableInZkClient(
+ DATA1_TABLE_PATH,
+ DATA1_SCHEMA,
+ 200L,
+ Collections.emptyList(),
+ Collections.singletonMap(
+
ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "2"));
+ TableBucket tb = makeTableBucket(tableId, partitionedTable);
+ makeLogTableAsLeader(tb, partitionedTable);
+
+ Replica replica = replicaManager.getReplicaOrException(tb);
+ LogTablet logTablet = replica.getLogTablet();
+
+ // Verify initial config
+ assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);
+
+ // 2. Generate 10 segments, upload 9 to remote (excluding active
segment)
+ addMultiSegmentsToLogTablet(logTablet, 10);
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+ // Verify upload success
+ List<RemoteLogSegment> remoteSegments =
remoteLogManager.relevantRemoteLogSegments(tb, 0L);
+ assertThat(remoteSegments).hasSize(9);
+
+ // Verify 2 local segments retained
+ assertThat(logTablet.getSegments()).hasSize(2);
+
+ // 3. Directly update config via Replica (simulating metadata
propagation)
+ replica.updateTieredLogLocalSegments(5);
+
+ // Verify LogTablet internal state has been updated
+ assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5);
+
+ // 4. Generate more segments and trigger cleanup, verify new config
takes effect
+ addMultiSegmentsToLogTablet(logTablet, 10);
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+ // Should retain 5 local segments
+ assertThat(logTablet.getSegments()).hasSize(5);
+
+ // 5. Modify config to 3 again, verify multiple modifications work
+ replica.updateTieredLogLocalSegments(3);
+
+ // Verify LogTablet internal state updated again
+ assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(3);
+
+ // Generate more segments and verify new config takes effect
+ addMultiSegmentsToLogTablet(logTablet, 5);
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+ // Should retain 3 local segments
+ assertThat(logTablet.getSegments()).hasSize(3);
+ }
+
private TableBucket makeTableBucket(boolean partitionTable) {
return makeTableBucket(DATA1_TABLE_ID, partitionTable);
}