This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 77761fb7c IGNITE-17303 RocksDB snapshots might include writes added after snapshot creation start (#924)
77761fb7c is described below

commit 77761fb7c1f7b7c669ea8525741bb4d10a72f030
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Jul 5 19:06:13 2022 +0400

    IGNITE-17303 RocksDB snapshots might include writes added after snapshot creation start (#924)
---
 .../raft/AbstractClusterStateStorageTest.java      | 39 ++++++++++++++++++++++
 .../raft/ConcurrentMapClusterStateStorage.java     | 16 ++++-----
 .../raft/client/service/RaftGroupListener.java     |  2 ++
 .../rocksdb/snapshot/RocksSnapshotManager.java     | 21 +++++++-----
 4 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
index cc4fe1b80..881defe3f 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.Cursor;
@@ -269,4 +270,42 @@ public abstract class AbstractClusterStateStorageTest {
         assertThat(storage.get(key1), is(value1));
         assertThat(storage.get(key2), is(value2));
     }
+
+    /**
+     * Tests that writes coming after a snapshot is started do not get reflected in the snapshot.
+     *
+     * @throws Exception If something goes wrong.
+     */
+    @Test
+    void snapshotShouldNotContainWritesAddedAfterItsStart() throws Exception {
+        final int entriesInSnapshot = 100_000;
+
+        for (int i = 0; i < entriesInSnapshot; i++) {
+            putKeyValue(i);
+        }
+
+        Path snapshotDirPath = workDir.resolve("snapshot");
+        Files.createDirectories(snapshotDirPath);
+
+        CompletableFuture<Void> snapshotFuture = storage.snapshot(snapshotDirPath);
+
+        for (int i = entriesInSnapshot; i < entriesInSnapshot + 1000; i++) {
+            putKeyValue(i);
+        }
+
+        snapshotFuture.join();
+
+        storage.restoreSnapshot(snapshotDirPath);
+
+        byte[] keyAddedAfterSnapshotStart = key(entriesInSnapshot);
+        assertThat(storage.get(keyAddedAfterSnapshotStart), is(nullValue()));
+    }
+
+    private void putKeyValue(int n) {
+        storage.put(key(n), ("value" + n).getBytes(UTF_8));
+    }
+
+    private byte[] key(int n) {
+        return ("key" + n).getBytes(UTF_8);
+    }
 }
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
index ef89843ef..5681d7d03 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
@@ -111,16 +111,16 @@ public class ConcurrentMapClusterStateStorage implements ClusterStateStorage {
 
     @Override
     public CompletableFuture<Void> snapshot(Path snapshotPath) {
-        return CompletableFuture.runAsync(() -> {
-            try (var out = new ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
-                var keys = new ArrayList<byte[]>(map.size());
-                var values = new ArrayList<byte[]>(map.size());
+        var keys = new ArrayList<byte[]>(map.size());
+        var values = new ArrayList<byte[]>(map.size());
 
-                map.forEach((k, v) -> {
-                    keys.add(k.bytes());
-                    values.add(v);
-                });
+        map.forEach((k, v) -> {
+            keys.add(k.bytes());
+            values.add(v);
+        });
 
+        return CompletableFuture.runAsync(() -> {
+            try (var out = new ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
                 out.writeObject(keys);
                 out.writeObject(values);
             } catch (Exception e) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
index 8035d392f..c23f842db 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
@@ -54,6 +54,8 @@ public interface RaftGroupListener {
 
     /**
      * The callback to save a snapshot. The execution should be asynchronous to avoid blocking of STM updates.
+     * But the snapshot coordinates (or copy-of-data-to-include-in-snapshot) must be taken synchronously before starting the asynchronous
+     * snapshotting process.
      *
      * @param path    Snapshot directory to store data.
      * @param doneClo The closure to call on finish. Pass the not null exception if the snapshot has not been created or null on successful
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
index 6c8728804..b5a204526 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -77,8 +78,11 @@ public class RocksSnapshotManager {
     public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
         Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);
 
-        return CompletableFuture.supplyAsync(db::getSnapshot, executor)
-                .thenComposeAsync(snapshot -> {
+        // The snapshot reference must be taken synchronously, otherwise we might let more writes sneak into the snapshot than needed.
+        Snapshot snapshot = db.getSnapshot();
+
+        return CompletableFuture.supplyAsync(
+                () -> {
                     createTmpSnapshotDir(tmpPath);
 
                     // Create futures for capturing SST snapshots of the column families
@@ -86,19 +90,20 @@ public class RocksSnapshotManager {
                             .map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
                             .toArray(CompletableFuture[]::new);
 
-                    return CompletableFuture.allOf(sstFutures).thenApply(v -> snapshot);
+                    return CompletableFuture.allOf(sstFutures);
                 }, executor)
-                .whenCompleteAsync((snapshot, e) -> {
-                    if (e != null) {
-                        return;
-                    }
-
+                .thenCompose(Function.identity())
+                .whenCompleteAsync((ignored, e) -> {
                     db.releaseSnapshot(snapshot);
 
                     // Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
                     // database does. Calling close to maintain the AutoCloseable semantics
                     snapshot.close();
 
+                    if (e != null) {
+                        return;
+                    }
+
                     // Delete snapshot directory if it already exists
                     IgniteUtils.deleteIfExists(snapshotDir);
 

Reply via email to