This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4871350087 IGNITE-22105 Add busy lock to RocksDbClusterStateStorage (#3659)
4871350087 is described below

commit 48713500874a9b8115f1bece6607abc514cf692a
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 25 16:58:27 2024 +0300

    IGNITE-22105 Add busy lock to RocksDbClusterStateStorage (#3659)
---
 .../management/ClusterManagementGroupManager.java  |  93 ++++++----
 .../management/raft/ClusterStateStorage.java       |  13 +-
 .../management/raft/CmgRaftGroupListener.java      |   3 +-
 .../management/raft/RaftStorageManager.java        |  12 +-
 .../raft/RocksDbClusterStateStorage.java           | 201 +++++++++++----------
 .../raft/AbstractClusterStateStorageTest.java      |  80 +++-----
 .../management/raft/TestClusterStateStorage.java   | 167 ++++++++---------
 7 files changed, 269 insertions(+), 300 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 0a5fae1cca..756d27195d 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -24,7 +24,9 @@ import static java.util.stream.Collectors.toSet;
 import static java.util.stream.Collectors.toUnmodifiableSet;
 import static 
org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.Collection;
 import java.util.List;
@@ -350,42 +352,56 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
      * </ol>
      */
     private void onElectedAsLeader(long term) {
-        LOG.info("CMG leader has been elected, executing onLeaderElected 
callback");
-
-        // The cluster state is broadcast via the messaging service; hence, 
the future must be completed here on the leader node.
-        // TODO: This needs to be reworked following the implementation of 
IGNITE-18275.
-        raftServiceAfterJoin()
-                .thenCompose(CmgRaftService::readClusterState)
-                .thenAccept(state -> 
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
-
-        raftServiceAfterJoin()
-                .thenCompose(this::updateLogicalTopology)
-                .thenCompose(service -> 
service.updateLearners(term).thenApply(unused -> service))
-                .thenAccept(service -> {
-                    // Register a listener to send ClusterState messages to 
new nodes.
-                    TopologyService topologyService = 
clusterService.topologyService();
-
-                    // TODO: remove listeners if leadership is lost, see 
https://issues.apache.org/jira/browse/IGNITE-16842
-                    
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
-
-                    // Send the ClusterStateMessage to all members of the 
physical topology. We do not wait for the send operation
-                    // because being unable to send ClusterState messages 
should not fail the CMG service startup.
-                    // TODO: IGNITE-18275 - use RAFT replication instead of 
message sending
-                    ClusterNode thisNode = topologyService.localMember();
-
-                    Collection<ClusterNode> otherNodes = 
topologyService.allMembers().stream()
-                            .filter(node -> !thisNode.equals(node))
-                            .collect(toList());
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping onLeaderElected callback, because the node is 
stopping");
 
-                    sendClusterState(service, otherNodes);
-                })
-                .whenComplete((v, e) -> {
-                    if (e != null) {
-                        LOG.warn("Error when executing onLeaderElected 
callback", e);
-                    } else {
-                        LOG.info("onLeaderElected callback executed 
successfully");
-                    }
-                });
+            return;
+        }
+
+        try {
+            LOG.info("CMG leader has been elected, executing onLeaderElected 
callback");
+
+            // The cluster state is broadcast via the messaging service; 
hence, the future must be completed here on the leader node.
+            // TODO: This needs to be reworked following the implementation of 
IGNITE-18275.
+            raftServiceAfterJoin().thenAccept(service -> inBusyLock(busyLock, 
() -> {
+                service.readClusterState()
+                        .thenAccept(state -> 
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
+
+                updateLogicalTopology(service)
+                        .thenCompose(v -> inBusyLock(busyLock, () -> 
service.updateLearners(term)))
+                        .thenAccept(v -> inBusyLock(busyLock, () -> {
+                            // Register a listener to send ClusterState 
messages to new nodes.
+                            TopologyService topologyService = 
clusterService.topologyService();
+
+                            // TODO: remove listeners if leadership is lost, 
see https://issues.apache.org/jira/browse/IGNITE-16842
+                            
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                            // Send the ClusterStateMessage to all members of 
the physical topology. We do not wait for the send operation
+                            // because being unable to send ClusterState 
messages should not fail the CMG service startup.
+                            // TODO: IGNITE-18275 - use RAFT replication 
instead of message sending
+                            ClusterNode thisNode = 
topologyService.localMember();
+
+                            Collection<ClusterNode> otherNodes = 
topologyService.allMembers().stream()
+                                    .filter(node -> !thisNode.equals(node))
+                                    .collect(toList());
+
+                            sendClusterState(service, otherNodes);
+                        }))
+                        .whenComplete((v, e) -> {
+                            if (e != null) {
+                                if (unwrapCause(e) instanceof 
NodeStoppingException) {
+                                    LOG.info("Unable to execute 
onLeaderElected callback, because the node is stopping", e);
+                                } else {
+                                    LOG.error("Error when executing 
onLeaderElected callback", e);
+                                }
+                            } else {
+                                LOG.info("onLeaderElected callback executed 
successfully");
+                            }
+                        });
+            }));
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -393,9 +409,9 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
      * physical topology during the election. Newly appeared nodes will be 
added automatically after the new leader broadcasts the current
      * cluster state.
      */
-    private CompletableFuture<CmgRaftService> 
updateLogicalTopology(CmgRaftService service) {
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
         return service.logicalTopology()
-                .thenCompose(logicalTopology -> {
+                .thenCompose(logicalTopology -> inBusyLock(busyLock, () -> {
                     Set<String> physicalTopologyIds = 
clusterService.topologyService().allMembers()
                             .stream()
                             .map(ClusterNode::id)
@@ -407,8 +423,7 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
 
                     // TODO: IGNITE-18681 - respect removal timeout.
                     return nodesToRemove.isEmpty() ? nullCompletedFuture() : 
service.removeFromCluster(nodesToRemove);
-                })
-                .thenApply(v -> service);
+                }));
     }
 
     private void handleCancelInit(CancelInitMessage msg) {
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java
index e8a19fdf6e..f1d94b290f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.cluster.management.raft;
 
 import java.nio.file.Path;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -69,14 +69,14 @@ public interface ClusterStateStorage extends 
IgniteComponent {
     void removeAll(Collection<byte[]> keys);
 
     /**
-     * Creates a cursor over a range of keys, starting with the given prefix.
+     * Creates a list containing a range of keys, starting with the given 
prefix.
      *
      * @param prefix Key prefix.
      * @param entryTransformer Entry transformation function.
      * @param <T> Type of converted entry.
-     * @return Cursor over a range of existing keys.
+     * @return List containing a range of existing keys.
      */
-    <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> 
entryTransformer);
+    <T> List<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> 
entryTransformer);
 
     /**
      * Creates a snapshot of the storage's current state in the specified 
directory.
@@ -92,9 +92,4 @@ public interface ClusterStateStorage extends IgniteComponent {
      * @param snapshotPath Path to the snapshot's directory.
      */
     void restoreSnapshot(Path snapshotPath);
-
-    /**
-     * Removes all data from the storage and frees all resources.
-     */
-    void destroy();
 }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index dbc24580bc..58495da214 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -25,6 +25,7 @@ import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
@@ -105,7 +106,7 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
     }
 
     private HashSet<LogicalNode> getValidatedNodes() {
-        Set<LogicalNode> validatedNodes = storage.getValidatedNodes();
+        List<LogicalNode> validatedNodes = storage.getValidatedNodes();
         Set<LogicalNode> logicalTopologyNodes = 
logicalTopology.getLogicalTopology().nodes();
 
         var result = new HashSet<LogicalNode>(capacity(validatedNodes.size() + 
logicalTopologyNodes.size()));
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
index 99c067f5e9..3543d6361c 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
@@ -18,17 +18,15 @@
 package org.apache.ignite.internal.cluster.management.raft;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.Set;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -103,12 +101,8 @@ class RaftStorageManager {
     /**
      * Returns a collection of nodes that passed the validation but have not 
yet joined the logical topology.
      */
-    Set<LogicalNode> getValidatedNodes() {
-        Cursor<LogicalNode> cursor = 
storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) -> fromBytes(v));
-
-        try (cursor) {
-            return cursor.stream().collect(toSet());
-        }
+    List<LogicalNode> getValidatedNodes() {
+        return storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) -> 
fromBytes(v));
     }
 
     /**
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
index 01fd438a13..3bafd5c00e 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
@@ -20,8 +20,11 @@ package org.apache.ignite.internal.cluster.management.raft;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -29,15 +32,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.Options;
@@ -64,6 +67,8 @@ public class RocksDbClusterStateStorage implements 
ClusterStateStorage {
     /** RockDB options. */
     private final Options options = new Options().setCreateIfMissing(true);
 
+    private final WriteOptions defaultWriteOptions = new 
WriteOptions().setDisableWAL(true);
+
     /** RocksDb instance. */
     @Nullable
     private volatile RocksDB db;
@@ -72,6 +77,11 @@ public class RocksDbClusterStateStorage implements 
ClusterStateStorage {
 
     private final Object snapshotRestoreLock = new Object();
 
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     /**
      * Creates a new instance.
      *
@@ -87,18 +97,18 @@ public class RocksDbClusterStateStorage implements 
ClusterStateStorage {
 
     @Override
     public CompletableFuture<Void> start() {
-        try {
-            // Delete existing data, relying on log playback.
-            RocksDB.destroyDB(dbPath.toString(), options);
+        return inBusyLockAsync(busyLock, () -> {
+            try {
+                // Delete existing data, relying on log playback.
+                RocksDB.destroyDB(dbPath.toString(), options);
 
-            init();
-        } catch (RocksDBException e) {
-            return failedFuture(new CmgStorageException("Failed to start the 
storage", e));
-        } catch (CmgStorageException e) {
-            return failedFuture(e);
-        }
+                init();
 
-        return nullCompletedFuture();
+                return nullCompletedFuture();
+            } catch (RocksDBException e) {
+                return failedFuture(new CmgStorageException("Failed to start 
the storage", e));
+            }
+        });
     }
 
     private void init() {
@@ -117,133 +127,132 @@ public class RocksDbClusterStateStorage implements 
ClusterStateStorage {
 
     @Override
     public byte @Nullable [] get(byte[] key) {
-        try {
-            return db.get(key);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to get data from Rocks DB", 
e);
-        }
+        return inBusyLock(busyLock, () -> {
+            try {
+                return db.get(key);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to get data from Rocks 
DB", e);
+            }
+        });
     }
 
     @Override
     public void put(byte[] key, byte[] value) {
-        try {
-            db.put(key, value);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to put data into Rocks DB", 
e);
-        }
+        inBusyLock(busyLock, () -> {
+            try {
+                db.put(defaultWriteOptions, key, value);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to put data into Rocks 
DB", e);
+            }
+        });
     }
 
     @Override
     public void replaceAll(byte[] prefix, byte[] key, byte[] value) {
-        try (
-                var batch = new WriteBatch();
-                var options = new WriteOptions();
-        ) {
-            byte[] endKey = RocksUtils.incrementPrefix(prefix);
+        inBusyLock(busyLock, () -> {
+            try (var batch = new WriteBatch()) {
+                byte[] endKey = RocksUtils.incrementPrefix(prefix);
 
-            assert endKey != null : Arrays.toString(prefix);
+                assert endKey != null : Arrays.toString(prefix);
 
-            batch.deleteRange(prefix, endKey);
+                batch.deleteRange(prefix, endKey);
 
-            batch.put(key, value);
+                batch.put(key, value);
 
-            db.write(options, batch);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to replace data in Rocks 
DB", e);
-        }
+                db.write(defaultWriteOptions, batch);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to replace data in Rocks 
DB", e);
+            }
+        });
     }
 
     @Override
     public void remove(byte[] key) {
-        try {
-            db.delete(key);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to remove data from Rocks 
DB", e);
-        }
+        inBusyLock(busyLock, () -> {
+            try {
+                db.delete(defaultWriteOptions, key);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to remove data from 
Rocks DB", e);
+            }
+        });
     }
 
     @Override
     public void removeAll(Collection<byte[]> keys) {
-        try (
-                var batch = new WriteBatch();
-                var options = new WriteOptions();
-        ) {
-            for (byte[] key : keys) {
-                batch.delete(key);
+        inBusyLock(busyLock, () -> {
+            try (var batch = new WriteBatch()) {
+                for (byte[] key : keys) {
+                    batch.delete(key);
+                }
+
+                db.write(defaultWriteOptions, batch);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to remove data from 
Rocks DB", e);
             }
-
-            db.write(options, batch);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to remove data from Rocks 
DB", e);
-        }
+        });
     }
 
     @Override
-    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], 
byte[], T> entryTransformer) {
-        byte[] upperBound = RocksUtils.incrementPrefix(prefix);
-
-        Slice upperBoundSlice = upperBound == null ? null : new 
Slice(upperBound);
-
-        ReadOptions readOptions = new 
ReadOptions().setIterateUpperBound(upperBoundSlice);
-
-        RocksIterator it = db.newIterator(readOptions);
-
-        it.seek(prefix);
-
-        return new RocksIteratorAdapter<>(it) {
-            @Override
-            protected T decodeEntry(byte[] key, byte[] value) {
-                return entryTransformer.apply(key, value);
-            }
-
-            @Override
-            public void close() {
-                super.close();
-
-                RocksUtils.closeAll(readOptions, upperBoundSlice);
+    public <T> List<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], 
T> entryTransformer) {
+        return inBusyLock(busyLock, () -> {
+            byte[] upperBound = RocksUtils.incrementPrefix(prefix);
+
+            try (
+                    Slice upperBoundSlice = upperBound == null ? null : new 
Slice(upperBound);
+                    ReadOptions readOptions = new 
ReadOptions().setIterateUpperBound(upperBoundSlice);
+                    RocksIterator it = db.newIterator(readOptions)
+            ) {
+                it.seek(prefix);
+
+                var result = new ArrayList<T>();
+
+                try {
+                    RocksUtils.forEach(it, (key, value) -> 
result.add(entryTransformer.apply(key, value)));
+                } catch (RocksDBException e) {
+                    throw new CmgStorageException("Unable to get data by 
prefix", e);
+                }
+
+                return result;
             }
-        };
+        });
     }
 
     @Override
     public CompletableFuture<Void> snapshot(Path snapshotPath) {
-        return snapshotManager.createSnapshot(snapshotPath);
+        return inBusyLockAsync(busyLock, () -> 
snapshotManager.createSnapshot(snapshotPath));
     }
 
     @Override
     public void restoreSnapshot(Path snapshotPath) {
-        synchronized (snapshotRestoreLock) {
-            destroyDb();
+        inBusyLock(busyLock, () -> {
+            synchronized (snapshotRestoreLock) {
+                db.close();
 
-            init();
+                db = null;
 
-            snapshotManager.restoreSnapshot(snapshotPath);
-        }
-    }
+                try {
+                    RocksDB.destroyDB(dbPath.toString(), options);
+                } catch (RocksDBException e) {
+                    throw new CmgStorageException("Unable to stop the RocksDB 
instance", e);
+                }
 
-    @Override
-    public void destroy() {
-        IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, 
TimeUnit.SECONDS);
-
-        destroyDb();
+                init();
 
-        options.close();
+                snapshotManager.restoreSnapshot(snapshotPath);
+            }
+        });
     }
 
     @Override
     public void stop() {
-        IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, 
TimeUnit.SECONDS);
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
 
-        RocksUtils.closeAll(db, options);
-    }
+        busyLock.block();
 
-    private void destroyDb() {
-        db.close();
+        IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, 
TimeUnit.SECONDS);
 
-        try {
-            RocksDB.destroyDB(dbPath.toString(), options);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to stop the RocksDB 
instance", e);
-        }
+        RocksUtils.closeAll(db, options, defaultWriteOptions);
     }
 }
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
index 7d28ee8d60..6ac92c358d 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.cluster.management.raft;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,14 +31,13 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
-import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.util.Cursor;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -216,11 +217,7 @@ public abstract class AbstractClusterStateStorageTest 
extends IgniteAbstractTest
                 "key4".getBytes(UTF_8) // does not exist in storage
         ));
 
-        Cursor<String> cursor = storage.getWithPrefix("key".getBytes(UTF_8), 
(k, v) -> new String(v, UTF_8));
-
-        try (cursor) {
-            assertThat(cursor.stream().collect(toList()), contains("value3"));
-        }
+        assertThat(storage.getWithPrefix("key".getBytes(UTF_8), (k, v) -> new 
String(v, UTF_8)), contains("value3"));
     }
 
     /**
@@ -232,11 +229,7 @@ public abstract class AbstractClusterStateStorageTest 
extends IgniteAbstractTest
         storage.put("key2".getBytes(UTF_8), "value2".getBytes(UTF_8));
         storage.put("foo".getBytes(UTF_8), "value3".getBytes(UTF_8));
 
-        Cursor<String> cursor = storage.getWithPrefix("ke".getBytes(UTF_8), 
(k, v) -> new String(v, UTF_8));
-
-        try (cursor) {
-            assertThat(cursor.stream().collect(toList()), 
containsInAnyOrder("value1", "value2"));
-        }
+        assertThat(storage.getWithPrefix("ke".getBytes(UTF_8), (k, v) -> new 
String(v, UTF_8)), containsInAnyOrder("value1", "value2"));
     }
 
     /**
@@ -250,11 +243,7 @@ public abstract class AbstractClusterStateStorageTest 
extends IgniteAbstractTest
         storage.put(key1, "value1".getBytes(UTF_8));
         storage.put(key2, "value2".getBytes(UTF_8));
 
-        Cursor<String> cursor = storage.getWithPrefix(key1, (k, v) -> new 
String(v, UTF_8));
-
-        try (cursor) {
-            assertThat(cursor.stream().collect(toList()), 
containsInAnyOrder("value1"));
-        }
+        assertThat(storage.getWithPrefix(key1, (k, v) -> new String(v, 
UTF_8)), containsInAnyOrder("value1"));
     }
 
     /**
@@ -265,44 +254,14 @@ public abstract class AbstractClusterStateStorageTest 
extends IgniteAbstractTest
         storage.put("key1".getBytes(UTF_8), "value1".getBytes(UTF_8));
         storage.put("key2".getBytes(UTF_8), "value2".getBytes(UTF_8));
 
-        Cursor<String> cursor = storage.getWithPrefix("foo".getBytes(UTF_8), 
(k, v) -> new String(v, UTF_8));
-
-        try (cursor) {
-            assertThat(cursor.stream().collect(toList()), is(empty()));
-        }
-    }
-
-    /**
-     * Tests the {@link ClusterStateStorage#destroy()} method.
-     */
-    @Test
-    void testDestroy(TestInfo testInfo) {
-        byte[] key = "key".getBytes(UTF_8);
-
-        byte[] value = "value".getBytes(UTF_8);
-
-        storage.put(key, value);
-
-        assertThat(storage.get(key), is(equalTo(value)));
-
-        storage.destroy();
-
-        storage = createStorage(testNodeName(testInfo, 0));
-
-        storage.start();
-
-        assertThat(storage.get(key), is(nullValue()));
-
-        storage.put(key, value);
-
-        assertThat(storage.get(key), is(equalTo(value)));
+        assertThat(storage.getWithPrefix("foo".getBytes(UTF_8), (k, v) -> new 
String(v, UTF_8)), is(empty()));
     }
 
     /**
      * Tests creating and restoring snapshots.
      */
     @Test
-    void testSnapshot(TestInfo testInfo) throws IOException {
+    void testSnapshot(TestInfo testInfo) throws Exception {
         Path snapshotDir = workDir.resolve("snapshot");
 
         Files.createDirectory(snapshotDir);
@@ -318,9 +277,9 @@ public abstract class AbstractClusterStateStorageTest 
extends IgniteAbstractTest
 
         assertThat(storage.snapshot(snapshotDir), willCompleteSuccessfully());
 
-        storage.destroy();
+        storage.stop();
 
-        storage = createStorage(testNodeName(testInfo, 0));
+        storage = createStorage(testNodeName(testInfo, 1));
 
         storage.start();
 
@@ -371,11 +330,26 @@ public abstract class AbstractClusterStateStorageTest extends IgniteAbstractTest
         assertThat(storage.get(keyAddedAfterSnapshotStart), is(nullValue()));
     }
 
+    @Test
+    void throwsNodeStoppingException() throws Exception {
+        storage.stop();
+
+        assertThrowsWithCause(() -> storage.get(BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.put(BYTE_EMPTY_ARRAY, BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.remove(BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.removeAll(List.of(BYTE_EMPTY_ARRAY)), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.replaceAll(BYTE_EMPTY_ARRAY, BYTE_EMPTY_ARRAY, BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.getWithPrefix(BYTE_EMPTY_ARRAY, (k, v) -> null), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.restoreSnapshot(workDir), NodeStoppingException.class);
+
+        assertThat(storage.snapshot(workDir), willThrow(NodeStoppingException.class));
+    }
+
     private void putKeyValue(int n) {
         storage.put(key(n), ("value" + n).getBytes(UTF_8));
     }
 
-    private byte[] key(int n) {
+    private static byte[] key(int n) {
         return ("key" + n).getBytes(UTF_8);
     }
 }
diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
index 2463e761af..2fbd9647dd 100644
--- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
+++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
@@ -17,9 +17,10 @@
 
 package org.apache.ignite.internal.cluster.management.raft;
 
-import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
 
 import java.io.ObjectInputStream;
@@ -33,12 +34,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -51,152 +53,131 @@ public class TestClusterStateStorage implements ClusterStateStorage {
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private volatile boolean isStarted = false;
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
 
     @Override
     public CompletableFuture<Void> start() {
-        isStarted = true;
-
         return nullCompletedFuture();
     }
 
     @Override
     public byte @Nullable [] get(byte[] key) {
-        lock.readLock().lock();
-
-        try {
-            return map.get(new ByteArray(key));
-        } finally {
-            lock.readLock().unlock();
-        }
+        return inBusyReadLock(() -> map.get(new ByteArray(key)));
     }
 
     @Override
     public void put(byte[] key, byte[] value) {
-        lock.writeLock().lock();
-
-        try {
-            map.put(new ByteArray(key), value);
-        } finally {
-            lock.writeLock().unlock();
-        }
+        inBusyWriteLock(() -> map.put(new ByteArray(key), value));
     }
 
     @Override
     public void replaceAll(byte[] prefix, byte[] key, byte[] value) {
-        lock.writeLock().lock();
-
-        try {
+        inBusyWriteLock(() -> {
             map.entrySet().removeIf(e -> startsWith(e.getKey().bytes(), prefix));
 
             map.put(new ByteArray(key), value);
-        } finally {
-            lock.writeLock().unlock();
-        }
+        });
     }
 
     @Override
     public void remove(byte[] key) {
-        lock.writeLock().lock();
-
-        try {
-            map.remove(new ByteArray(key));
-        } finally {
-            lock.writeLock().unlock();
-        }
+        inBusyWriteLock(() -> map.remove(new ByteArray(key)));
     }
 
     @Override
     public void removeAll(Collection<byte[]> keys) {
-        lock.writeLock().lock();
-
-        try {
-            for (byte[] key : keys) {
-                remove(key);
-            }
-        } finally {
-            lock.writeLock().unlock();
-        }
+        inBusyWriteLock(() -> keys.forEach(this::remove));
     }
 
     @Override
-    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> entryTransformer) {
-        lock.readLock().lock();
-
-        try {
+    public <T> List<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> entryTransformer) {
+        return inBusyReadLock(() -> {
             return map.entrySet().stream()
                     .filter(e -> startsWith(e.getKey().bytes(), prefix))
                     .map(e -> entryTransformer.apply(e.getKey().bytes(), e.getValue()))
-                    .collect(collectingAndThen(toList(), data -> Cursor.fromBareIterator(data.iterator())));
-        } finally {
-            lock.readLock().unlock();
-        }
+                    .collect(toList());
+        });
     }
 
     @Override
     public CompletableFuture<Void> snapshot(Path snapshotPath) {
-        List<byte[]> keys;
-        List<byte[]> values;
-
-        lock.readLock().lock();
-
-        try {
-            keys = new ArrayList<>(map.size());
-            values = new ArrayList<>(map.size());
+        return inBusyLockAsync(busyLock, () -> {
+            lock.readLock().lock();
 
-            map.forEach((k, v) -> {
-                keys.add(k.bytes());
-                values.add(v);
-            });
-        } finally {
-            lock.readLock().unlock();
-        }
-
-        return CompletableFuture.runAsync(() -> {
-            try (var out = new ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
-                out.writeObject(keys);
-                out.writeObject(values);
-            } catch (Exception e) {
-                throw new IgniteInternalException(e);
+            try {
+                List<byte[]> keys = new ArrayList<>(map.size());
+                List<byte[]> values = new ArrayList<>(map.size());
+
+                map.forEach((k, v) -> {
+                    keys.add(k.bytes());
+                    values.add(v);
+                });
+
+                return CompletableFuture.runAsync(() -> {
+                    try (var out = new ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
+                        out.writeObject(keys);
+                        out.writeObject(values);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } finally {
+                lock.readLock().unlock();
             }
         });
     }
 
     @Override
     public void restoreSnapshot(Path snapshotPath) {
-        try (var in = new ObjectInputStream(Files.newInputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
-            var keys = (List<byte[]>) in.readObject();
-            var values = (List<byte[]>) in.readObject();
-
-            lock.writeLock().lock();
+        inBusyWriteLock(() -> {
+            try (var in = new ObjectInputStream(Files.newInputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
+                var keys = (List<byte[]>) in.readObject();
+                var values = (List<byte[]>) in.readObject();
 
-            try {
                 map.clear();
 
                 for (int i = 0; i < keys.size(); i++) {
                     map.put(new ByteArray(keys.get(i)), values.get(i));
                 }
-            } finally {
-                lock.writeLock().unlock();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-        } catch (Exception e) {
-            throw new IgniteInternalException(e);
-        }
+        });
     }
 
     @Override
-    public void destroy() {
-        lock.writeLock().lock();
-
-        try {
-            map.clear();
-        } finally {
-            lock.writeLock().unlock();
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
         }
+
+        busyLock.block();
     }
 
-    @Override
-    public void stop() {
-        isStarted = false;
+    private <T> T inBusyReadLock(Supplier<T> action) {
+        return inBusyLock(busyLock, () -> {
+            lock.readLock().lock();
+
+            try {
+                return action.get();
+            } finally {
+                lock.readLock().unlock();
+            }
+        });
+    }
+
+    private void inBusyWriteLock(Runnable action) {
+        inBusyLock(busyLock, () -> {
+            lock.writeLock().lock();
+
+            try {
+                action.run();
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
     }
 }


Reply via email to