platinumhamburg commented on code in PR #1991:
URL: https://github.com/apache/fluss/pull/1991#discussion_r2739466542


##########
fluss-server/src/test/java/org/apache/fluss/server/replica/KvSnapshotITCase.java:
##########
@@ -188,6 +190,434 @@ void testKvSnapshotAndDelete() throws Exception {
         checkDirsDeleted(bucketKvSnapshotDirs, tablePathMap);
     }
 
+    @Test
+    void testStandbyReplicaDownloadLatestSnapshot() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_table_standby");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
DATA1_TABLE_DESCRIPTOR_PK);
+        TableBucket tb0 = new TableBucket(tableId, 0);
+
+        FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb0);
+        // get the leader server
+        int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
+
+        // put one kv batch
+        KvRecordBatch kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {1, "k1"}),
+                        Tuple2.of("k2", new Object[] {2, "k2"}));
+        PutKvRequest putKvRequest = newPutKvRequest(tableId, 0, -1, 
kvRecordBatch);
+
+        TabletServerGateway leaderGateway =
+                
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
+        leaderGateway.putKv(putKvRequest).get();
+
+        // wait for snapshot is available
+        final long snapshot1Id = 0;
+        CompletedSnapshot completedSnapshot =
+                waitValue(
+                                () -> completedSnapshotHandleStore.get(tb0, 
snapshot1Id),
+                                Duration.ofMinutes(2),
+                                "Fail to wait for the snapshot 0 for bucket " 
+ tb0)
+                        .retrieveCompleteSnapshot();
+        // check snapshot
+        List<Tuple2<byte[], byte[]>> expectedKeyValues =
+                getKeyValuePairs(
+                        genKvRecords(
+                                Tuple2.of("k1", new Object[] {1, "k1"}),
+                                Tuple2.of("k2", new Object[] {2, "k2"})));
+        checkSnapshot(completedSnapshot, expectedKeyValues, 2);
+
+        // check the standby replica contains the latest snapshot.
+        int standbyServer = FLUSS_CLUSTER_EXTENSION.waitAndGetStandby(tb0);
+        TabletServer standbyTs = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(standbyServer);
+        Replica replica = 
standbyTs.getReplicaManager().getReplicaOrException(tb0);
+        assertThat(replica.isStandby()).isTrue();
+
+        KvSnapshotManager kvSnapshotManager = replica.getKvSnapshotManager();
+        assertThat(kvSnapshotManager).isNotNull();
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    
assertThat(kvSnapshotManager.getDownloadedSstFiles()).isNotEmpty();
+                    
assertThat(kvSnapshotManager.getDownloadedMiscFiles()).isNotEmpty();
+                });
+
+        // put kv batch again
+        kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {1, "k11"}),
+                        Tuple2.of("k2", null),
+                        Tuple2.of("k3", new Object[] {3, "k3"}));
+        putKvRequest = newPutKvRequest(tableId, 0, 1, kvRecordBatch);
+        leaderGateway.putKv(putKvRequest).get();
+
+        // wait for next snapshot is available
+        final long snapshot2Id = 1;
+        completedSnapshot =
+                waitValue(
+                                () -> completedSnapshotHandleStore.get(tb0, 
snapshot2Id),
+                                Duration.ofMinutes(2),
+                                "Fail to wait for the snapshot 0 for bucket " 
+ tb0)
+                        .retrieveCompleteSnapshot();
+
+        // check snapshot
+        expectedKeyValues =
+                getKeyValuePairs(
+                        genKvRecords(
+                                Tuple2.of("k1", new Object[] {1, "k11"}),
+                                Tuple2.of("k3", new Object[] {3, "k3"})));
+        checkSnapshot(completedSnapshot, expectedKeyValues, 6);
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    
assertThat(kvSnapshotManager.getDownloadedSstFiles()).isNotEmpty();
+                    
assertThat(kvSnapshotManager.getDownloadedMiscFiles()).isNotEmpty();
+                });
+    }
+
+    @Test
+    void testStandbyPromotedToLeaderOnLeaderShutdown() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", 
"test_table_standby_promote");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
DATA1_TABLE_DESCRIPTOR_PK);
+        TableBucket tb0 = new TableBucket(tableId, 0);
+
+        FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb0);
+        int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
+
+        // put some data
+        KvRecordBatch kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {1, "k1"}),
+                        Tuple2.of("k2", new Object[] {2, "k2"}));
+        PutKvRequest putKvRequest = newPutKvRequest(tableId, 0, -1, 
kvRecordBatch);
+
+        TabletServerGateway leaderGateway =
+                
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
+        leaderGateway.putKv(putKvRequest).get();
+
+        // wait for snapshot to be available
+        final long snapshot1Id = 0;
+        CompletedSnapshot completedSnapshot =
+                waitValue(
+                                () -> completedSnapshotHandleStore.get(tb0, 
snapshot1Id),
+                                Duration.ofMinutes(2),
+                                "Fail to wait for the snapshot 0 for bucket " 
+ tb0)
+                        .retrieveCompleteSnapshot();
+
+        // check snapshot
+        List<Tuple2<byte[], byte[]>> expectedKeyValues =
+                getKeyValuePairs(
+                        genKvRecords(
+                                Tuple2.of("k1", new Object[] {1, "k1"}),
+                                Tuple2.of("k2", new Object[] {2, "k2"})));
+        checkSnapshot(completedSnapshot, expectedKeyValues, 2);
+
+        // wait for standby to be ready
+        int standbyServer = FLUSS_CLUSTER_EXTENSION.waitAndGetStandby(tb0);
+        TabletServer standbyTs = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(standbyServer);
+        Replica standbyReplica = 
standbyTs.getReplicaManager().getReplicaOrException(tb0);
+        assertThat(standbyReplica.isStandby()).isTrue();
+
+        KvSnapshotManager standbySnapshotManager = 
standbyReplica.getKvSnapshotManager();
+        assertThat(standbySnapshotManager).isNotNull();
+        retry(
+                Duration.ofMinutes(1),
+                () -> 
assertThat(standbySnapshotManager.getDownloadedSstFiles()).isNotEmpty());
+
+        // shutdown leader tablet server to trigger standby -> leader promotion
+        FLUSS_CLUSTER_EXTENSION.stopTabletServer(leaderServer);
+
+        // wait for the standby to become leader
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    int newLeader = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
+                    assertThat(newLeader).isEqualTo(standbyServer);
+                });
+
+        // verify the new leader (former standby) can serve read/write requests
+        TabletServerGateway newLeaderGateway =
+                
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(standbyServer);
+
+        // put more data
+        kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k3", new Object[] {3, "k3"}),
+                        Tuple2.of("k4", new Object[] {4, "k4"}));
+        putKvRequest = newPutKvRequest(tableId, 0, 1, kvRecordBatch);
+        newLeaderGateway.putKv(putKvRequest).get();
+
+        // wait for new snapshot
+        final long snapshot2Id = 1;
+        completedSnapshot =
+                waitValue(
+                                () -> completedSnapshotHandleStore.get(tb0, 
snapshot2Id),
+                                Duration.ofMinutes(2),
+                                "Fail to wait for the snapshot 1 for bucket " 
+ tb0)
+                        .retrieveCompleteSnapshot();
+
+        // check snapshot contains all data
+        expectedKeyValues =
+                getKeyValuePairs(
+                        genKvRecords(
+                                Tuple2.of("k1", new Object[] {1, "k1"}),
+                                Tuple2.of("k2", new Object[] {2, "k2"}),
+                                Tuple2.of("k3", new Object[] {3, "k3"}),
+                                Tuple2.of("k4", new Object[] {4, "k4"})));
+        checkSnapshot(completedSnapshot, expectedKeyValues, 4);
+
+        // restart the shutdown server to restore cluster state
+        FLUSS_CLUSTER_EXTENSION.startTabletServer(leaderServer, true);
+    }
+
+    @Test
+    void testStandbyIncrementalSnapshotDownload() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", 
"test_table_incremental");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
DATA1_TABLE_DESCRIPTOR_PK);
+        TableBucket tb0 = new TableBucket(tableId, 0);
+
+        FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb0);
+        int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
+
+        // put first batch of data
+        KvRecordBatch kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {1, "k1"}),
+                        Tuple2.of("k2", new Object[] {2, "k2"}));
+        PutKvRequest putKvRequest = newPutKvRequest(tableId, 0, -1, 
kvRecordBatch);
+
+        TabletServerGateway leaderGateway =
+                
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
+        leaderGateway.putKv(putKvRequest).get();
+
+        // wait for first snapshot
+        waitValue(
+                () -> completedSnapshotHandleStore.get(tb0, 0),
+                Duration.ofMinutes(2),
+                "Fail to wait for snapshot 0");
+
+        // wait for standby to download first snapshot
+        int standbyServer = FLUSS_CLUSTER_EXTENSION.waitAndGetStandby(tb0);
+        TabletServer standbyTs = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(standbyServer);
+        Replica standbyReplica = 
standbyTs.getReplicaManager().getReplicaOrException(tb0);
+        KvSnapshotManager standbySnapshotManager = 
standbyReplica.getKvSnapshotManager();
+
+        retry(
+                Duration.ofMinutes(1),
+                () -> 
assertThat(standbySnapshotManager.getDownloadedSstFiles()).isNotEmpty());
+
+        // record the sst files after first snapshot
+        int firstSnapshotSstCount = 
standbySnapshotManager.getDownloadedSstFiles().size();
+
+        // put second batch of data (this should create new sst files)
+        kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k3", new Object[] {3, "k3"}),
+                        Tuple2.of("k4", new Object[] {4, "k4"}),
+                        Tuple2.of("k5", new Object[] {5, "k5"}));
+        putKvRequest = newPutKvRequest(tableId, 0, 1, kvRecordBatch);
+        leaderGateway.putKv(putKvRequest).get();
+
+        // wait for second snapshot
+        waitValue(
+                () -> completedSnapshotHandleStore.get(tb0, 1),
+                Duration.ofMinutes(2),
+                "Fail to wait for snapshot 1");
+
+        // wait for standby to download incremental snapshot
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    // should have more or equal sst files after incremental 
download
+                    
assertThat(standbySnapshotManager.getDownloadedSstFiles().size())
+                            .isGreaterThanOrEqualTo(firstSnapshotSstCount);
+                });
+
+        // verify standby snapshot size is updated
+        
assertThat(standbySnapshotManager.getStandbySnapshotSize()).isGreaterThan(0);
+    }
+
+    @Test
+    void testStandbyDemotionAndReStandby() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", 
"test_table_standby_demotion");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
DATA1_TABLE_DESCRIPTOR_PK);
+        TableBucket tb0 = new TableBucket(tableId, 0);
+
+        FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb0);
+        int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
+
+        // put some data and wait for snapshot
+        KvRecordBatch kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {1, "k1"}),
+                        Tuple2.of("k2", new Object[] {2, "k2"}));
+        PutKvRequest putKvRequest = newPutKvRequest(tableId, 0, -1, 
kvRecordBatch);
+        TabletServerGateway leaderGateway =
+                
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
+        leaderGateway.putKv(putKvRequest).get();
+
+        // wait for snapshot
+        waitValue(
+                () -> completedSnapshotHandleStore.get(tb0, 0),
+                Duration.ofMinutes(2),
+                "Fail to wait for snapshot 0");
+
+        // verify standby replica has downloaded snapshot
+        int standbyServer = FLUSS_CLUSTER_EXTENSION.waitAndGetStandby(tb0);
+        TabletServer standbyTs = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(standbyServer);
+        Replica standbyReplica = 
standbyTs.getReplicaManager().getReplicaOrException(tb0);
+        assertThat(standbyReplica.isStandby()).isTrue();
+
+        KvSnapshotManager standbySnapshotManager = 
standbyReplica.getKvSnapshotManager();
+        retry(
+                Duration.ofMinutes(1),
+                () -> 
assertThat(standbySnapshotManager.getDownloadedSstFiles()).isNotEmpty());
+
+        // get current leader and isr
+        LeaderAndIsr leaderAndIsr = 
FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb0);
+        List<Integer> replicas = new ArrayList<>(leaderAndIsr.isr());
+
+        // demote standby to regular follower by sending notifyLeaderAndIsr 
without standby
+        LeaderAndIsr newLeaderAndIsr =
+                new LeaderAndIsr(
+                        leaderAndIsr.leader(),
+                        leaderAndIsr.leaderEpoch() + 1,
+                        leaderAndIsr.isr(),
+                        Collections.emptyList(), // no standby replicas
+                        leaderAndIsr.coordinatorEpoch(),
+                        leaderAndIsr.bucketEpoch());
+
+        FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
+                standbyServer, tablePath, tb0, newLeaderAndIsr, replicas);
+
+        // verify the replica is no longer standby
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    Replica replica = 
standbyTs.getReplicaManager().getReplicaOrException(tb0);
+                    assertThat(replica.isStandby()).isFalse();
+                    // verify kv tablet is dropped when demoted from standby
+                    assertThat(replica.getKvTablet()).isNull();
+                    // verify standby download cache is cleared
+                    KvSnapshotManager snapshotManager = 
replica.getKvSnapshotManager();
+                    
assertThat(snapshotManager.getDownloadedSstFiles()).isNull();
+                    
assertThat(snapshotManager.getDownloadedMiscFiles()).isNull();
+                    
assertThat(snapshotManager.getStandbySnapshotSize()).isEqualTo(0);
+                });
+
+        // put more data and create another snapshot
+        kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k3", new Object[] {3, "k3"}),
+                        Tuple2.of("k4", new Object[] {4, "k4"}));
+        putKvRequest = newPutKvRequest(tableId, 0, 1, kvRecordBatch);
+        leaderGateway.putKv(putKvRequest).get();
+
+        waitValue(
+                () -> completedSnapshotHandleStore.get(tb0, 1),
+                Duration.ofMinutes(2),
+                "Fail to wait for snapshot 1");
+
+        // re-promote the replica back to standby by sending 
notifyLeaderAndIsr with standby
+        LeaderAndIsr reStandbyLeaderAndIsr =
+                new LeaderAndIsr(
+                        newLeaderAndIsr.leader(),
+                        newLeaderAndIsr.leaderEpoch() + 1,
+                        newLeaderAndIsr.isr(),
+                        Collections.singletonList(standbyServer),
+                        newLeaderAndIsr.coordinatorEpoch(),
+                        newLeaderAndIsr.bucketEpoch()); // re-add as standby
+
+        FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
+                standbyServer, tablePath, tb0, reStandbyLeaderAndIsr, 
replicas);
+
+        // verify the replica is standby again and downloads the latest 
snapshot
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    Replica replica = 
standbyTs.getReplicaManager().getReplicaOrException(tb0);
+                    assertThat(replica.isStandby()).isTrue();
+                    KvSnapshotManager snapshotManager = 
replica.getKvSnapshotManager();
+                    assertThat(snapshotManager).isNotNull();
+                    
assertThat(snapshotManager.getDownloadedSstFiles()).isNotEmpty();
+                    // verify it has the latest snapshot data
+                    
assertThat(snapshotManager.getStandbySnapshotSize()).isGreaterThan(0);
+                });
+    }
+
+    @Test
+    void testStandbySnapshotDownloadFailureAndRecovery() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", 
"test_table_download_failure");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
DATA1_TABLE_DESCRIPTOR_PK);
+        TableBucket tb0 = new TableBucket(tableId, 0);
+
+        FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb0);
+        int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
+
+        // put data and create snapshot
+        KvRecordBatch kvRecordBatch =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {1, "k1"}),
+                        Tuple2.of("k2", new Object[] {2, "k2"}));
+        PutKvRequest putKvRequest = newPutKvRequest(tableId, 0, -1, 
kvRecordBatch);
+        TabletServerGateway leaderGateway =
+                
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
+        leaderGateway.putKv(putKvRequest).get();
+
+        // wait for first snapshot
+        CompletedSnapshot snapshot0 =
+                waitValue(
+                                () -> completedSnapshotHandleStore.get(tb0, 0),
+                                Duration.ofMinutes(2),
+                                "Fail to wait for snapshot 0")
+                        .retrieveCompleteSnapshot();
+
+        // get standby server and verify it's marked as standby even if 
download fails initially
+        int standbyServer = FLUSS_CLUSTER_EXTENSION.waitAndGetStandby(tb0);
+        TabletServer standbyTs = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(standbyServer);
+        Replica standbyReplica = 
standbyTs.getReplicaManager().getReplicaOrException(tb0);
+
+        // verify replica is marked as standby (even if download might fail)
+        retry(Duration.ofMinutes(1), () -> 
assertThat(standbyReplica.isStandby()).isTrue());
+
+        // wait for standby to eventually download snapshot (may retry on 
failures)
+        KvSnapshotManager standbySnapshotManager = 
standbyReplica.getKvSnapshotManager();
+        assertThat(standbySnapshotManager).isNotNull();
+
+        // verify snapshot is eventually downloaded successfully
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    
assertThat(standbySnapshotManager.getDownloadedSstFiles()).isNotEmpty();

Review Comment:
   The current test assertions are not robust enough. We must verify that, after 
each failure scenario, the snapshot held by the standby node (as long as it 
keeps the standby role) is fully valid. Specifically, the snapshot must:
   (a) consist of a complete file set, not merely a partial subset;
   (b) contain SST files from which the RocksDB database can be opened successfully;
   (c) allow the node to transition seamlessly to the leader role;
   (d) support correct recovery after the role transition; and
   (e) guarantee zero data loss throughout the process.



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotManager.java:
##########
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.snapshot;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.fs.FileSystemSafetyNet;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
+import org.apache.fluss.utils.CloseableRegistry;
+import org.apache.fluss.utils.FileUtils;
+import org.apache.fluss.utils.MathUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot.SST_FILE_SUFFIX;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/* This file is based on source code of Apache Flink Project 
(https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * For a leader replica of PrimaryKey Table, it is a stateless snapshot 
manager which will trigger
+ * upload kv snapshot periodically. It'll use a {@link 
ScheduledExecutorService} to schedule the
+ * snapshot initialization and a {@link ExecutorService} to complete async 
phase of snapshot.
+ *
+ * <p>For a standby replica of PrimaryKey Table, it will trigger by the
+ * NotifyKvSnapshotOffsetRequest to incremental download sst files for remote 
to keep the data up to
+ * the latest kv snapshot.
+ *
+ * <p>For a follower replica of PrimaryKey Table, it will do nothing.
+ */
+public class KvSnapshotManager implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotManager.class);
+
+    /** Number of consecutive snapshot failures. */
+    private final AtomicInteger numberOfConsecutiveFailures;
+
+    /** Whether upload snapshot is started. */
+    private volatile boolean isLeader = false;
+
+    private final long initialDelay;
+    /** The table bucket that the snapshot manager is for. */
+    private final TableBucket tableBucket;
+
+    private final File tabletDir;
+
+    private final SnapshotContext snapshotContext;
+    private final Clock clock;
+
+    /** The target on which the snapshot will be done. */
+    private @Nullable UploadSnapshotTarget uploadSnapshotTarget;
+
+    /**
+     * The scheduled snapshot task.
+     *
+     * <p>Since all reads and writes of {@code scheduledTask} are protected by 
synchronized, the
+     * volatile modifier is not necessary here.
+     */
+    private ScheduledFuture<?> scheduledTask = null;
+
+    /** The sst files downloaded for standby replicas. */
+    private @Nullable Set<Path> downloadedSstFiles;
+
+    private @Nullable Set<Path> downloadedMiscFiles;
+    private long standbySnapshotSize;
+
+    protected KvSnapshotManager(
+            TableBucket tableBucket, File tabletDir, SnapshotContext 
snapshotContext, Clock clock) {
+        this.tableBucket = tableBucket;
+        this.tabletDir = tabletDir;
+        this.snapshotContext = snapshotContext;
+        this.numberOfConsecutiveFailures = new AtomicInteger(0);
+        this.initialDelay =
+                snapshotContext.getSnapshotIntervalMs() > 0
+                        ? MathUtils.murmurHash(tableBucket.hashCode())
+                                % snapshotContext.getSnapshotIntervalMs()
+                        : 0;
+        this.clock = clock;
+        this.uploadSnapshotTarget = null;
+        this.downloadedSstFiles = null;
+        this.downloadedMiscFiles = null;
+        this.standbySnapshotSize = 0;
+    }
+
+    public static KvSnapshotManager create(
+            TableBucket tableBucket, File tabletDir, SnapshotContext 
snapshotContext, Clock clock) {
+        return new KvSnapshotManager(tableBucket, tabletDir, snapshotContext, 
clock);
+    }
+
+    public void becomeLeader() {
+        isLeader = true;
+        // Clear standby download cache when leaving standby role
+        clearStandbyDownloadCache();
+    }
+
+    public void becomeFollower() {
+        isLeader = false;
+        // Clear standby download cache when leaving standby role
+        clearStandbyDownloadCache();
+    }
+
+    public void becomeStandby() {
+        isLeader = false;
+        // Clear standby download cache when new added to standby role
+        clearStandbyDownloadCache();
+
+        // make db dir.
+        Path kvDbPath = 
tabletDir.toPath().resolve(RocksDBKvBuilder.DB_INSTANCE_DIR_STRING);
+        if (!kvDbPath.toFile().exists()) {
+            kvDbPath.toFile().mkdirs();
+        }
+    }
+
+    @VisibleForTesting
+    public @Nullable Set<Path> getDownloadedSstFiles() {
+        return downloadedSstFiles;
+    }
+
+    @VisibleForTesting
+    public @Nullable Set<Path> getDownloadedMiscFiles() {
+        return downloadedMiscFiles;
+    }
+
+    /**
+     * Clear the standby download cache.
+     *
+     * <p>This method should be called when a replica leaves the standby role 
(becomes a regular
+     * follower or leader). It clears the cached state of downloaded SST 
files, misc files, and
+     * snapshot size. This ensures that if the replica becomes standby again 
later, it will perform
+     * a fresh download based on the actual local files, rather than reusing 
stale cache that
+     * references deleted files.
+     */
+    private void clearStandbyDownloadCache() {
+        downloadedSstFiles = null;
+        downloadedMiscFiles = null;
+        standbySnapshotSize = 0;
+        LOG.info(
+                "Cleared standby download cache for table bucket {}, will 
reload from local files on next standby promotion",
+                tableBucket);
+    }
+
+    /**
+     * The guardedExecutor is an executor that uses to trigger upload snapshot.
+     *
+     * <p>It's expected to be passed with a guarded executor to prevent any 
concurrent modification
+     * to KvTablet during trigger snapshotting.
+     */
+    public void startPeriodicUploadSnapshot(
+            Executor guardedExecutor, UploadSnapshotTarget 
uploadSnapshotTarget) {
+        this.uploadSnapshotTarget = uploadSnapshotTarget;
+
+        // disable periodic snapshot when periodicMaterializeDelay is not 
positive
+        if (snapshotContext.getSnapshotIntervalMs() > 0) {
+            LOG.info("TableBucket {} starts periodic snapshot", tableBucket);
+            scheduleNextSnapshot(initialDelay, guardedExecutor);
+        }
+    }
+
+    public void downloadSnapshot(long snapshotId) throws Exception {
+        CompletedSnapshot completedSnapshot =
+                snapshotContext.getCompletedSnapshotProvider(tableBucket, 
snapshotId);
+        incrementalDownloadSnapshot(completedSnapshot);
+        standbySnapshotSize = completedSnapshot.getSnapshotSize();
+    }
+
+    /**
+     * download the latest snapshot.
+     *
+     * <p>For a standby replica, it will download the latest snapshot to keep 
the data up to the
+     * latest kv snapshot.
+     *
+     * @return the latest snapshot
+     */
+    public Optional<CompletedSnapshot> downloadLatestSnapshot() throws 
Exception {
+        // download the latest snapshot.
+        Optional<CompletedSnapshot> latestSnapshot = getLatestSnapshot();
+        if (latestSnapshot.isPresent()) {
+            incrementalDownloadSnapshot(latestSnapshot.get());
+        }
+        return latestSnapshot;
+    }
+
+    private void incrementalDownloadSnapshot(CompletedSnapshot 
completedSnapshot) throws Exception {
+        if (downloadedSstFiles == null || downloadedMiscFiles == null) {
+            // first try to load all ready exists sst files.
+            downloadedSstFiles = new HashSet<>();
+            downloadedMiscFiles = new HashSet<>();
+            loadKvLocalFiles(downloadedSstFiles, downloadedMiscFiles);

Review Comment:
   While reviewing the test code, I developed serious concerns about the 
current snapshot download implementation. Let me elaborate:
   
   - KV snapshots must be atomic and complete: for any given snapshot ID, 
there must be a deterministic, complete set of files. Partial or incomplete 
snapshots are strictly disallowed.
   
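   - Snapshots are discrete, not incremental: there is no smooth, continuous 
transition from one snapshot to another, and an intermediate, incomplete state 
may not allow RocksDB to start up consistently. For example, if a download is 
interrupted after new metadata files (e.g. MANIFEST/CURRENT) land in the DB 
directory but before every SST file they reference has arrived, RocksDB will 
fail to open that directory.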
   
   - Log recovery cannot function correctly during an incomplete snapshot 
transition: The current log recovery mechanism lacks precise mapping of 
recovery offsets, making it impossible to reliably recover from a partially 
downloaded snapshot.
   
   - Real-world interruptions are not adequately handled: In practice, 
downloads can be interrupted at any point. However, neither the current 
implementation nor the existing tests fully validate how the system recovers 
from such intermediate, inconsistent states.
   
   Recommendations for Implementation:
   - Use a temporary directory for all file downloads and snapshot operations: 
Avoid performing any writes directly in the actual KvTablet directory. Instead, 
stage everything in a temporary directory.
   - Atomically "commit" only after full consistency is achieved: Only when the 
temporary directory contains a complete, consistent snapshot—matching the 
expected file list for the given snapshot ID, and ideally verified with 
checksums—should it be atomically moved (e.g., via rename) into the final 
location.
   
   Recommendations for Testing:
   - Add test cases that explicitly validate snapshot consistency: Ensure that 
every snapshot loaded by the system meets the required completeness and 
integrity criteria.
   - Introduce fault injection in tests: Simulate real-world failure scenarios 
(e.g., abrupt process termination during download) to verify that snapshot 
integrity is never compromised, even under adverse conditions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to