luoyuxia commented on code in PR #2378:
URL: https://github.com/apache/fluss/pull/2378#discussion_r2710830265
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -459,9 +460,28 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust
         // check or apply coordinator epoch.
         validateAndApplyCoordinatorEpoch(coordinatorEpoch, "updateMetadataCache");
         metadataCache.updateClusterMetadata(clusterMetadata);
+        updateReplicaTableConfig(clusterMetadata);
     });
 }
+    private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
+        for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
+            TableInfo tableInfo = tableMetadata.getTableInfo();
+            long tableId = tableInfo.getTableId();
+            boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
Review Comment:
nit:
```
if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
    boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
    for (Map.Entry<TableBucket, HostedReplica> entry : allReplicas.entrySet()) {
        HostedReplica hostedReplica = entry.getValue();
        if (hostedReplica instanceof OnlineReplica) {
            Replica replica = ((OnlineReplica) hostedReplica).getReplica();
            if (replica.getTableBucket().getTableId() == tableId) {
                replica.updateIsDataLakeEnabled(dataLakeEnabled);
            }
        }
    }
}
```
Could we add another lake-format check like this, to avoid looping over the replicas for every table?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -720,6 +723,47 @@ private void processSchemaChange(SchemaChangeEvent schemaChangeEvent) {
                 null);
     }
+    private void processTableRegistrationChange(TableRegistrationChangeEvent event) {
+        TablePath tablePath = event.getTablePath();
+        Long tableId = coordinatorContext.getTableIdByPath(tablePath);
+
+        // Skip if the table is not yet registered in coordinator context.
+        // Should not happen in normal cases.
+        if (tableId == null) {
+            return;
Review Comment:
nit: add a warning log here, so the unexpected case is visible?
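A minimal, self-contained sketch of what the suggested guard could look like. The names here (`TABLE_IDS`, `processTableRegistrationChange`, the log message) are stand-ins for the PR's `coordinatorContext` and logger, not the actual Fluss implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class GuardSketch {
    // Stand-in for coordinatorContext.getTableIdByPath(tablePath).
    static final Map<String, Long> TABLE_IDS = new HashMap<>();

    static boolean processTableRegistrationChange(String tablePath) {
        Long tableId = TABLE_IDS.get(tablePath);
        if (tableId == null) {
            // Warn instead of returning silently: this "should not happen"
            // path is exactly the one worth surfacing in logs.
            System.err.println(
                    "Received table registration change for unknown table "
                            + tablePath + "; skipping.");
            return false;
        }
        // ... apply the registration change for tableId ...
        return true;
    }
}
```

With this shape, a silent early return still short-circuits the handler, but operators get a breadcrumb when the invariant is violated.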
##########
fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java:
##########
@@ -202,6 +214,233 @@ void testFollowerFetchAlreadyMoveToRemoteLog(boolean withWriterId) throws Except
FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, follower);
}
+ @Test
+ void testRemoteLogTTLWithLakeDisabled() throws Exception {
Review Comment:
Could we combine these three test methods into a single test method?

**Stage A: Lake Disabled (Initial)**
Action: Create a table with TABLE_DATALAKE_ENABLED = false.
Expectation: Expired remote log segments are deleted physically as soon as the TTL is reached.

**Stage B: Dynamic Enable & Retention**
Action: Dynamically alter the table to set TABLE_DATALAKE_ENABLED = true, produce new data, and advance the clock.
Expectation: Expired segments are NOT deleted, because the lakeLogEndOffset has not yet reached the segment offsets. This ensures data safety for lake tiering.

**Stage C: Lake Progress Update (Cleanup)**
Action: Simulate lake tiering completion by calling logTablet.updateLakeLogEndOffset(...).
Expectation: The cleanup coordinator now allows deletion of the expired segments that have been successfully tiered to the lake.

**Stage D: Dynamic Disable (Revert to Direct Cleanup)**
Action: Dynamically alter the table back to TABLE_DATALAKE_ENABLED = false and produce new data.
Expectation: The system resumes direct physical deletion of expired segments, ignoring the lake offset status.
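The four stages above all exercise one retention predicate. A hypothetical model of that decision (the method name, parameters, and semantics are assumptions for illustration, not Fluss internals):

```java
public class RemoteCleanupSketch {
    /**
     * Decides whether an expired remote segment may be physically deleted,
     * modeling the staged behavior described above.
     */
    static boolean canDeleteExpiredSegment(
            boolean expired,
            boolean dataLakeEnabled,
            long segmentEndOffset,
            long lakeLogEndOffset) {
        if (!expired) {
            return false; // retention TTL not reached yet
        }
        if (!dataLakeEnabled) {
            return true;  // Stages A/D: direct physical deletion
        }
        // Stages B/C: delete only once lake tiering has passed this segment.
        return segmentEndOffset <= lakeLogEndOffset;
    }
}
```

Under this model, Stage B retains the segment (lakeLogEndOffset behind the segment), Stage C releases it (lakeLogEndOffset advanced past it), and Stages A/D delete regardless of lake progress.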
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]