This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e68a266c [KV] Avoid TabletServer to discard already commited kv snapshot (#1738)
1e68a266c is described below

commit 1e68a266c37200b1d5b11921555cc1d42c4d7b6c
Author: Yang Wang <[email protected]>
AuthorDate: Sun Sep 28 21:05:43 2025 +0800

    [KV] Avoid TabletServer to discard already commited kv snapshot (#1738)
---
 .../server/kv/snapshot/KvTabletSnapshotTarget.java | 105 +++++++++-
 .../kv/snapshot/RocksIncrementalSnapshot.java      |   3 +-
 .../org/apache/fluss/server/replica/Replica.java   |   1 +
 .../kv/snapshot/KvTabletSnapshotTargetTest.java    | 227 ++++++++++++++++++++-
 4 files changed, 322 insertions(+), 14 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
index d982d486a..c84a26b70 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
@@ -23,6 +23,8 @@ import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.server.SequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
 import org.apache.fluss.utils.CloseableRegistry;
 import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.FlussPaths;
@@ -30,6 +32,7 @@ import org.apache.fluss.utils.FlussPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
@@ -55,6 +58,8 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
 
     private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter;
 
+    private final ZooKeeperClient zooKeeperClient;
+
     private final RocksIncrementalSnapshot rocksIncrementalSnapshot;
     private final FsPath remoteKvTabletDir;
     private final FsPath remoteSnapshotSharedDir;
@@ -82,6 +87,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
     KvTabletSnapshotTarget(
             TableBucket tableBucket,
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+            ZooKeeperClient zooKeeperClient,
             RocksIncrementalSnapshot rocksIncrementalSnapshot,
             FsPath remoteKvTabletDir,
             Executor ioExecutor,
@@ -97,6 +103,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
         this(
                 tableBucket,
                 completedKvSnapshotCommitter,
+                zooKeeperClient,
                 rocksIncrementalSnapshot,
                 remoteKvTabletDir,
                 (int) 
ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE.defaultValue().getBytes(),
@@ -114,6 +121,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
     public KvTabletSnapshotTarget(
             TableBucket tableBucket,
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+            @Nonnull ZooKeeperClient zooKeeperClient,
             RocksIncrementalSnapshot rocksIncrementalSnapshot,
             FsPath remoteKvTabletDir,
             int snapshotWriteBufferSize,
@@ -129,6 +137,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
             throws IOException {
         this.tableBucket = tableBucket;
         this.completedKvSnapshotCommitter = completedKvSnapshotCommitter;
+        this.zooKeeperClient = zooKeeperClient;
         this.rocksIncrementalSnapshot = rocksIncrementalSnapshot;
         this.remoteKvTabletDir = remoteKvTabletDir;
         this.remoteSnapshotSharedDir = 
FlussPaths.remoteKvSharedDir(remoteKvTabletDir);
@@ -211,18 +220,13 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
             // commit the completed snapshot
             completedKvSnapshotCommitter.commitKvSnapshot(
                     completedSnapshot, coordinatorEpoch, bucketLeaderEpoch);
-            // notify the snapshot complete
-            rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
-            logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
-            snapshotSize = snapshotResult.getSnapshotSize();
-            // update LogTablet to notify the lowest offset that should be 
retained
-            updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+            // update local state after successful commit
+            updateStateOnCommitSuccess(snapshotId, snapshotResult);
         } catch (Exception e) {
             Throwable t = ExceptionUtils.stripExecutionException(e);
-            snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, 
ioExecutor);
-            handleSnapshotFailure(snapshotId, snapshotLocation, t);
-            // throw the exception to make PeriodicSnapshotManager can catch 
the exception
-            throw t;
+            // handle the exception with idempotent check
+            handleSnapshotCommitException(
+                    snapshotId, snapshotResult, completedSnapshot, 
snapshotLocation, t);
         }
     }
 
@@ -249,6 +253,87 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
         return rocksIncrementalSnapshot;
     }
 
+    /**
+     * Update local state after successful snapshot completion. This includes 
notifying RocksDB
+     * about completion, updating latest snapshot offset/size, and notifying 
LogTablet about the
+     * minimum offset to retain.
+     */
+    private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult 
snapshotResult) {
+        // notify the snapshot complete
+        rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
+        logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
+        snapshotSize = snapshotResult.getSnapshotSize();
+        // update LogTablet to notify the lowest offset that should be retained
+        updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+    }
+
+    /**
+     * Handle snapshot commit exception with idempotent check. This method 
implements the fix for
+     * issue #1304 by double-checking ZooKeeper to verify if the snapshot 
actually exists before
+     * cleanup.
+     */
+    private void handleSnapshotCommitException(
+            long snapshotId,
+            SnapshotResult snapshotResult,
+            CompletedSnapshot completedSnapshot,
+            SnapshotLocation snapshotLocation,
+            Throwable t)
+            throws Throwable {
+
+        // Fix for issue: https://github.com/apache/fluss/issues/1304
+        // Tablet server try to commit kv snapshot to coordinator server,
+        // coordinator server commit the kv snapshot to zk, then failover.
+        // Tablet server will got exception from coordinator server, but 
mistake it as a fail
+        // commit although coordinator server has committed to zk, then 
discard the commited kv
+        // snapshot.
+        //
+        // Idempotent check: Double check ZK to verify if the snapshot 
actually exists before
+        // cleanup
+        try {
+            Optional<BucketSnapshot> zkSnapshot =
+                    zooKeeperClient.getTableBucketSnapshot(tableBucket, 
snapshotId);
+            if (zkSnapshot.isPresent()) {
+                // Snapshot exists in ZK, indicating the commit was actually 
successful,
+                // just response was lost
+                LOG.warn(
+                        "Snapshot {} for TableBucket {} already exists in ZK. "
+                                + "The commit was successful but response was 
lost due to coordinator failover. "
+                                + "Skipping cleanup and treating as 
successful.",
+                        snapshotId,
+                        tableBucket);
+
+                // Update local state as if the commit was successful
+                updateStateOnCommitSuccess(snapshotId, snapshotResult);
+                return; // Snapshot commit succeeded, return directly
+            } else {
+                // Snapshot does not exist in ZK, indicating the commit truly 
failed
+                LOG.warn(
+                        "Snapshot {} for TableBucket {} does not exist in ZK. "
+                                + "The commit truly failed, proceeding with 
cleanup.",
+                        snapshotId,
+                        tableBucket);
+                snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, 
ioExecutor);
+                handleSnapshotFailure(snapshotId, snapshotLocation, t);
+            }
+        } catch (Exception zkException) {
+            LOG.warn(
+                    "Failed to query ZK for snapshot {} of TableBucket {}. "
+                            + "Cannot determine actual snapshot status, 
keeping snapshot in current state "
+                            + "to avoid potential data loss.",
+                    snapshotId,
+                    tableBucket,
+                    zkException);
+            // When ZK query fails, we cannot determine the actual status.
+            // The snapshot might have succeeded or failed on the ZK side.
+            // Therefore, we must not clean up the snapshot files and not 
update local state.
+            // This avoids the risk of discarding a successfully committed 
snapshot that
+            // connectors may already be reading, which would cause data loss 
or job failure.
+        }
+
+        // throw the exception to make PeriodicSnapshotManager can catch the 
exception
+        throw t;
+    }
+
     private SnapshotRunner createSnapshotRunner(CloseableRegistry 
cancelStreamRegistry) {
         return new SnapshotRunner(rocksIncrementalSnapshot, 
cancelStreamRegistry);
     }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
index 0723fd488..fbfa42e1f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
@@ -341,7 +341,8 @@ public class RocksIncrementalSnapshot implements AutoCloseable {
 
         @Nonnull private final Map<String, KvFileHandle> confirmedSstFiles;
 
-        private PreviousSnapshot(@Nullable 
Collection<KvFileHandleAndLocalPath> confirmedSstFiles) {
+        protected PreviousSnapshot(
+                @Nullable Collection<KvFileHandleAndLocalPath> 
confirmedSstFiles) {
             this.confirmedSstFiles =
                     confirmedSstFiles != null
                             ? confirmedSstFiles.stream()
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 3b9b11c47..3b7ac5b7f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -849,6 +849,7 @@ public final class Replica {
                     new KvTabletSnapshotTarget(
                             tableBucket,
                             completedKvSnapshotCommitter,
+                            snapshotContext.getZooKeeperClient(),
                             rocksIncrementalSnapshot,
                             remoteKvTabletDir,
                             snapshotContext.getSnapshotFsWriteBufferSize(),
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 9b5a3b161..9f9ea078d 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -29,9 +29,12 @@ import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
 import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.server.testutils.KvTestUtils;
 import org.apache.fluss.server.utils.ResourceGuard;
+import org.apache.fluss.server.zk.CuratorFrameworkWithUnhandledErrorListener;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
 import 
org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
 import org.apache.fluss.utils.CloseableRegistry;
@@ -55,15 +58,19 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
+import static org.apache.fluss.server.zk.ZooKeeperUtils.startZookeeperClient;
+import static 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.Builder;
+import static 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.builder;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -297,6 +304,150 @@ class KvTabletSnapshotTargetTest {
         assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L);
     }
 
+    @Test
+    void testIdempotentCheckWhenSnapshotExistsInZK(@TempDir Path kvTabletDir) 
throws Exception {
+        // Test case: coordinator commits to ZK successfully but response is 
lost due to failover
+        // The tablet server should detect the snapshot exists in ZK and skip 
cleanup
+        CompletedSnapshotHandleStore completedSnapshotHandleStore =
+                new ZooKeeperCompletedSnapshotHandleStore(zooKeeperClient);
+        FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+        // Create a committer that commits to ZK first, then throws exception
+        // This simulates coordinator failover after ZK commit but before 
response
+        CompletedKvSnapshotCommitter failingAfterZkCommitCommitter =
+                (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+                    // Always commit to ZK first
+                    CompletedSnapshotHandle handle =
+                            new CompletedSnapshotHandle(
+                                    snapshot.getSnapshotID(),
+                                    snapshot.getSnapshotLocation(),
+                                    snapshot.getLogOffset());
+                    completedSnapshotHandleStore.add(
+                            snapshot.getTableBucket(), 
snapshot.getSnapshotID(), handle);
+
+                    // Then throw exception - simulating coordinator failover 
after ZK commit
+                    throw new FlussException("Coordinator failover after ZK 
commit");
+                };
+
+        KvTabletSnapshotTarget kvTabletSnapshotTarget =
+                createSnapshotTargetWithCustomCommitter(
+                        remoteKvTabletDir, failingAfterZkCommitCommitter);
+
+        periodicSnapshotManager = 
createSnapshotManager(kvTabletSnapshotTarget);
+        periodicSnapshotManager.start();
+
+        RocksDB rocksDB = rocksDBExtension.getRocksDb();
+        rocksDB.put("key1".getBytes(), "val1".getBytes());
+
+        // Trigger snapshot - will commit to ZK but throw exception
+        periodicSnapshotManager.triggerSnapshot();
+        long snapshotId1 = 1;
+
+        TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+                (TestRocksIncrementalSnapshot) 
kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+        // The snapshot should be treated as successful due to idempotent check
+        // Even though commit threw exception, idempotent check should find it 
in ZK
+        retry(
+                Duration.ofMinutes(1),
+                () ->
+                        assertThat(rocksIncrementalSnapshot.completedSnapshots)
+                                .contains(snapshotId1));
+
+        // Verify snapshot was not cleaned up and state was updated correctly
+        FsPath snapshotPath1 = 
FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1);
+        
assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue();
+        assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L);
+    }
+
+    @Test
+    void testIdempotentCheckWhenSnapshotNotExistsInZK(@TempDir Path 
kvTabletDir) throws Exception {
+        // Test case: genuine commit failure - snapshot should not exist in ZK 
and cleanup should
+        // occur
+        FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+        // Create a committer that always fails - simulating genuine 
coordinator failure
+        CompletedKvSnapshotCommitter alwaysFailingCommitter =
+                (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+                    throw new FlussException(
+                            "Genuine coordinator failure - snapshot not 
committed to ZK");
+                };
+
+        KvTabletSnapshotTarget kvTabletSnapshotTarget =
+                createSnapshotTargetWithCustomCommitter(remoteKvTabletDir, 
alwaysFailingCommitter);
+
+        periodicSnapshotManager = 
createSnapshotManager(kvTabletSnapshotTarget);
+        periodicSnapshotManager.start();
+
+        RocksDB rocksDB = rocksDBExtension.getRocksDb();
+        rocksDB.put("key1".getBytes(), "val1".getBytes());
+        periodicSnapshotManager.triggerSnapshot();
+
+        long snapshotId1 = 1;
+        TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+                (TestRocksIncrementalSnapshot) 
kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+        // The snapshot should be aborted since it genuinely failed
+        retry(
+                Duration.ofMinutes(1),
+                () -> 
assertThat(rocksIncrementalSnapshot.abortedSnapshots).contains(snapshotId1));
+
+        // Verify cleanup occurred
+        FsPath snapshotPath1 = 
FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1);
+        
assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isFalse();
+        
assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE);
+    }
+
+    @Test
+    void testIdempotentCheckWhenZKQueryFails(@TempDir Path kvTabletDir) throws 
Exception {
+        // Test case: ZK query fails - should keep snapshot in current state 
to avoid data loss
+        FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+        // Create a failing ZK client that throws exception to simulate ZK 
query failure
+        ZooKeeperClient failingZkClient = createFailingZooKeeperClient();
+
+        CompletedKvSnapshotCommitter failingCommitter =
+                (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+                    throw new FlussException("Commit failed");
+                };
+
+        KvTabletSnapshotTarget kvTabletSnapshotTarget =
+                createSnapshotTargetWithCustomZkAndCommitter(
+                        remoteKvTabletDir, failingZkClient, failingCommitter);
+
+        periodicSnapshotManager = 
createSnapshotManager(kvTabletSnapshotTarget);
+        periodicSnapshotManager.start();
+
+        RocksDB rocksDB = rocksDBExtension.getRocksDb();
+        rocksDB.put("key1".getBytes(), "val1".getBytes());
+        periodicSnapshotManager.triggerSnapshot();
+
+        long snapshotId1 = 1;
+        TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+                (TestRocksIncrementalSnapshot) 
kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+        // Wait for snapshot processing to complete
+        // The snapshot should be created but commit will fail, then ZK query 
will fail
+        // In this case, the new logic should preserve the snapshot files (no 
cleanup)
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    // Verify that snapshot creation happened but neither 
completion nor abortion
+                    // occurred
+                    // Since both commit and ZK query failed, snapshot should 
remain in limbo state
+                    FsPath snapshotPath1 =
+                            FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, 
snapshotId1);
+                    
assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue();
+                    assertThat(rocksIncrementalSnapshot.abortedSnapshots)
+                            .doesNotContain(snapshotId1);
+                    assertThat(rocksIncrementalSnapshot.completedSnapshots)
+                            .doesNotContain(snapshotId1);
+                });
+
+        // Verify local state was not updated (remain unchanged)
+        
assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE);
+    }
+
     private PeriodicSnapshotManager createSnapshotManager(
             PeriodicSnapshotManager.SnapshotTarget target) {
         return new PeriodicSnapshotManager(
@@ -343,6 +494,45 @@ class KvTabletSnapshotTargetTest {
         return new KvTabletSnapshotTarget(
                 tableBucket,
                 new 
TestingStoreCompletedKvSnapshotCommitter(completedSnapshotStore),
+                zooKeeperClient,
+                rocksIncrementalSnapshot,
+                remoteKvTabletDir,
+                executor,
+                cancelStreamRegistry,
+                testingSnapshotIdCounter,
+                logOffsetGenerator::get,
+                updateMinRetainOffsetConsumer::set,
+                bucketLeaderEpochSupplier,
+                coordinatorEpochSupplier,
+                0,
+                0L);
+    }
+
+    private KvTabletSnapshotTarget createSnapshotTargetWithCustomCommitter(
+            FsPath remoteKvTabletDir, CompletedKvSnapshotCommitter 
customCommitter)
+            throws IOException {
+        return createSnapshotTargetWithCustomZkAndCommitter(
+                remoteKvTabletDir, zooKeeperClient, customCommitter);
+    }
+
+    private KvTabletSnapshotTarget 
createSnapshotTargetWithCustomZkAndCommitter(
+            FsPath remoteKvTabletDir,
+            ZooKeeperClient zkClient,
+            CompletedKvSnapshotCommitter customCommitter)
+            throws IOException {
+        TableBucket tableBucket = new TableBucket(1, 1);
+        Executor executor = Executors.directExecutor();
+        RocksIncrementalSnapshot rocksIncrementalSnapshot =
+                createIncrementalSnapshot(SnapshotFailType.NONE);
+        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
+        TestingSnapshotIDCounter testingSnapshotIdCounter = new 
TestingSnapshotIDCounter();
+        Supplier<Integer> bucketLeaderEpochSupplier = () -> 0;
+        Supplier<Integer> coordinatorEpochSupplier = () -> 0;
+
+        return new KvTabletSnapshotTarget(
+                tableBucket,
+                customCommitter,
+                zkClient,
                 rocksIncrementalSnapshot,
                 remoteKvTabletDir,
                 executor,
@@ -380,10 +570,41 @@ class KvTabletSnapshotTargetTest {
                 snapshotFailType);
     }
 
+    private ZooKeeperClient createFailingZooKeeperClient() {
+        // Create a ZooKeeperClient that throws exception on 
getTableBucketSnapshot
+        return new FailingZooKeeperClient();
+    }
+
+    private static class FailingZooKeeperClient extends ZooKeeperClient {
+
+        public FailingZooKeeperClient() {
+            // Create a new ZooKeeperClient using 
ZooKeeperUtils.startZookeeperClient
+            super(createCuratorFrameworkWrapper(), new Configuration());
+        }
+
+        private static CuratorFrameworkWithUnhandledErrorListener 
createCuratorFrameworkWrapper() {
+            Builder builder =
+                    builder()
+                            .connectString(
+                                    ZOO_KEEPER_EXTENSION_WRAPPER
+                                            .getCustomExtension()
+                                            .getConnectString())
+                            .retryPolicy(new ExponentialBackoffRetry(1000, 3));
+
+            return startZookeeperClient(builder, NOPErrorHandler.INSTANCE);
+        }
+
+        @Override
+        public Optional<BucketSnapshot> getTableBucketSnapshot(
+                TableBucket tableBucket, long snapshotId) throws Exception {
+            throw new Exception("ZK query failed");
+        }
+    }
+
     private static final class TestRocksIncrementalSnapshot extends 
RocksIncrementalSnapshot {
 
-        private final Set<Long> abortedSnapshots = new HashSet<>();
-        private final Set<Long> completedSnapshots = new HashSet<>();
+        private final Set<Long> abortedSnapshots = 
ConcurrentHashMap.newKeySet();
+        private final Set<Long> completedSnapshots = 
ConcurrentHashMap.newKeySet();
         private final SnapshotFailType snapshotFailType;
 
         public TestRocksIncrementalSnapshot(

Reply via email to