platinumhamburg commented on code in PR #1991:
URL: https://github.com/apache/fluss/pull/1991#discussion_r2730424892
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java:
##########
@@ -545,11 +570,27 @@ private void registerLakeTieringMetrics() {
: logTablet.localMaxTimestamp() -
logTablet.getLakeMaxTimestamp());
}
- private void onBecomeNewFollower() {
+ private void onBecomeNewFollower(int standbyReplica) {
if (isKvTable()) {
- // it should be from leader to follower, we need to destroy the kv
tablet
- dropKv();
+ boolean isNowStandby = (standbyReplica == localTabletServerId);
+ boolean wasLeader = isLeader();
+ boolean wasStandby = this.isStandbyReplica;
+ checkNotNull(kvSnapshotManager);
+ if (isNowStandby) {
+ kvSnapshotManager.becomeStandby();
+ becomeStandby();
+ } else {
+ // to be new follower.
+ kvSnapshotManager.becomeFollower();
+ if (wasStandby || wasLeader) {
+ // standby -> leader or leader -> leader
+ dropKv();
Review Comment:
When a replica leaves the standby (or leader) role, `dropKv()` deletes the
local KV data. However, the standby download cache inside `KvSnapshotManager`
(`downloadedSstFiles`, `downloadedMiscFiles`, `standbySnapshotSize`) is never
cleared. If the replica later becomes standby again,
`incrementalDownloadSnapshot()` will reuse the stale sets and skip downloading
files that no longer exist locally.
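A minimal sketch of one possible fix, assuming the three fields can be reset
together whenever the replica leaves the standby role. `StandbyDownloadCache`
and its methods are hypothetical names used only for illustration; they are
not part of `KvSnapshotManager` as it exists in this PR.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the standby download state kept by KvSnapshotManager. */
final class StandbyDownloadCache {
    private final Set<String> downloadedSstFiles = new HashSet<>();
    private final Set<String> downloadedMiscFiles = new HashSet<>();
    private long standbySnapshotSize;

    /** Records a downloaded SST file; the size is counted only once per file. */
    synchronized void recordSst(String fileName, long bytes) {
        if (downloadedSstFiles.add(fileName)) {
            standbySnapshotSize += bytes;
        }
    }

    /** Records a downloaded non-SST file; the size is counted only once per file. */
    synchronized void recordMisc(String fileName, long bytes) {
        if (downloadedMiscFiles.add(fileName)) {
            standbySnapshotSize += bytes;
        }
    }

    /** Returns true if the file is not tracked and must be fetched. */
    synchronized boolean needsDownload(String fileName) {
        return !downloadedSstFiles.contains(fileName)
                && !downloadedMiscFiles.contains(fileName);
    }

    /** Resets all tracking; intended to be called alongside dropKv(). */
    synchronized void clear() {
        downloadedSstFiles.clear();
        downloadedMiscFiles.clear();
        standbySnapshotSize = 0L;
    }

    synchronized long standbySnapshotSize() {
        return standbySnapshotSize;
    }
}
```

With something like this in place, the `wasStandby || wasLeader` branch could
call `clear()` next to `dropKv()`, so a later transition back to standby starts
from an empty cache and downloads every file again.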
##########
fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java:
##########
@@ -73,6 +73,7 @@ public class MetricNames {
public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE =
"localSize";
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE =
"remoteLogSize";
+ public static final String SERVER_PHYSICAL_STORAGE_STANDBY_SIZE =
"standbySize";
Review Comment:
Add a brief comment and doc entry clarifying what `standbySize` represents.
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -758,13 +768,33 @@ public void notifyKvSnapshotOffset(
"notifyKvSnapshotOffset");
// update the snapshot offset.
TableBucket tb =
notifyKvSnapshotOffsetData.getTableBucket();
- LogTablet logTablet =
getReplicaOrException(tb).getLogTablet();
- logTablet.updateMinRetainOffset(
- notifyKvSnapshotOffsetData.getMinRetainOffset());
+ Replica replica = getReplicaOrException(tb);
+ if (notifyKvSnapshotOffsetData.getSnapshotId() != null) {
+ // try to download the snapshot for standby replica.
+ try {
+
replica.downloadSnapshot(notifyKvSnapshotOffsetData.getSnapshotId());
Review Comment:
Snapshot download runs under `replicaStateChangeLock`. Long downloads can
block leader/follower transitions and other state changes.
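One way to avoid this, sketched under assumptions: hold the lock only long
enough to check the role, then hand the download to a separate executor.
`SnapshotDownloadScheduler`, `downloadExecutor`, and `onSnapshotNotified` are
hypothetical names for illustration; only `replicaStateChangeLock` comes from
the code under review.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper: checks the role under the lock, downloads outside it. */
final class SnapshotDownloadScheduler {
    private final ReentrantLock replicaStateChangeLock = new ReentrantLock();
    private final ExecutorService downloadExecutor;
    private boolean standby; // guarded by replicaStateChangeLock

    SnapshotDownloadScheduler(ExecutorService downloadExecutor) {
        this.downloadExecutor = downloadExecutor;
    }

    /** Role change; only ever waits for other short critical sections. */
    void setStandby(boolean standby) {
        replicaStateChangeLock.lock();
        try {
            this.standby = standby;
        } finally {
            replicaStateChangeLock.unlock();
        }
    }

    /**
     * Schedules the download if this replica is currently standby.
     * Returns the pending task, or null if nothing was scheduled.
     */
    Future<?> onSnapshotNotified(Runnable download) {
        replicaStateChangeLock.lock();
        try {
            if (!standby) {
                return null;
            }
        } finally {
            replicaStateChangeLock.unlock();
        }
        // The lock is released here, so a slow download cannot block transitions.
        return downloadExecutor.submit(download);
    }
}
```

Because the role can change after the task is submitted, the download itself
would still need to re-check the role (or be cancellable) before it publishes
any files.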
##########
fluss-server/src/test/java/org/apache/fluss/server/replica/KvSnapshotITCase.java:
##########
Review Comment:
- No test covers demotion from standby followed by becoming standby again.
- No test covers failure and recovery paths (download failure, readiness).