This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4871350087 IGNITE-22105 Add busy lock to RocksDbClusterStateStorage (#3659)
4871350087 is described below
commit 48713500874a9b8115f1bece6607abc514cf692a
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 25 16:58:27 2024 +0300
IGNITE-22105 Add busy lock to RocksDbClusterStateStorage (#3659)
---
.../management/ClusterManagementGroupManager.java | 93 ++++++----
.../management/raft/ClusterStateStorage.java | 13 +-
.../management/raft/CmgRaftGroupListener.java | 3 +-
.../management/raft/RaftStorageManager.java | 12 +-
.../raft/RocksDbClusterStateStorage.java | 201 +++++++++++----------
.../raft/AbstractClusterStateStorageTest.java | 80 +++-----
.../management/raft/TestClusterStateStorage.java | 167 ++++++++---------
7 files changed, 269 insertions(+), 300 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 0a5fae1cca..756d27195d 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -24,7 +24,9 @@ import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static
org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.Collection;
import java.util.List;
@@ -350,42 +352,56 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
* </ol>
*/
private void onElectedAsLeader(long term) {
- LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
-
- // The cluster state is broadcast via the messaging service; hence,
the future must be completed here on the leader node.
- // TODO: This needs to be reworked following the implementation of
IGNITE-18275.
- raftServiceAfterJoin()
- .thenCompose(CmgRaftService::readClusterState)
- .thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
-
- raftServiceAfterJoin()
- .thenCompose(this::updateLogicalTopology)
- .thenCompose(service ->
service.updateLearners(term).thenApply(unused -> service))
- .thenAccept(service -> {
- // Register a listener to send ClusterState messages to
new nodes.
- TopologyService topologyService =
clusterService.topologyService();
-
- // TODO: remove listeners if leadership is lost, see
https://issues.apache.org/jira/browse/IGNITE-16842
-
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
-
- // Send the ClusterStateMessage to all members of the
physical topology. We do not wait for the send operation
- // because being unable to send ClusterState messages
should not fail the CMG service startup.
- // TODO: IGNITE-18275 - use RAFT replication instead of
message sending
- ClusterNode thisNode = topologyService.localMember();
-
- Collection<ClusterNode> otherNodes =
topologyService.allMembers().stream()
- .filter(node -> !thisNode.equals(node))
- .collect(toList());
+ if (!busyLock.enterBusy()) {
+ LOG.info("Skipping onLeaderElected callback, because the node is
stopping");
- sendClusterState(service, otherNodes);
- })
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.warn("Error when executing onLeaderElected
callback", e);
- } else {
- LOG.info("onLeaderElected callback executed
successfully");
- }
- });
+ return;
+ }
+
+ try {
+ LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
+
+ // The cluster state is broadcast via the messaging service;
hence, the future must be completed here on the leader node.
+ // TODO: This needs to be reworked following the implementation of
IGNITE-18275.
+ raftServiceAfterJoin().thenAccept(service -> inBusyLock(busyLock,
() -> {
+ service.readClusterState()
+ .thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
+
+ updateLogicalTopology(service)
+ .thenCompose(v -> inBusyLock(busyLock, () ->
service.updateLearners(term)))
+ .thenAccept(v -> inBusyLock(busyLock, () -> {
+ // Register a listener to send ClusterState
messages to new nodes.
+ TopologyService topologyService =
clusterService.topologyService();
+
+ // TODO: remove listeners if leadership is lost,
see https://issues.apache.org/jira/browse/IGNITE-16842
+
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+ // Send the ClusterStateMessage to all members of
the physical topology. We do not wait for the send operation
+ // because being unable to send ClusterState
messages should not fail the CMG service startup.
+ // TODO: IGNITE-18275 - use RAFT replication
instead of message sending
+ ClusterNode thisNode =
topologyService.localMember();
+
+ Collection<ClusterNode> otherNodes =
topologyService.allMembers().stream()
+ .filter(node -> !thisNode.equals(node))
+ .collect(toList());
+
+ sendClusterState(service, otherNodes);
+ }))
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ if (unwrapCause(e) instanceof
NodeStoppingException) {
+ LOG.info("Unable to execute
onLeaderElected callback, because the node is stopping", e);
+ } else {
+ LOG.error("Error when executing
onLeaderElected callback", e);
+ }
+ } else {
+ LOG.info("onLeaderElected callback executed
successfully");
+ }
+ });
+ }));
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -393,9 +409,9 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
* physical topology during the election. Newly appeared nodes will be
added automatically after the new leader broadcasts the current
* cluster state.
*/
- private CompletableFuture<CmgRaftService>
updateLogicalTopology(CmgRaftService service) {
+ private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
return service.logicalTopology()
- .thenCompose(logicalTopology -> {
+ .thenCompose(logicalTopology -> inBusyLock(busyLock, () -> {
Set<String> physicalTopologyIds =
clusterService.topologyService().allMembers()
.stream()
.map(ClusterNode::id)
@@ -407,8 +423,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
// TODO: IGNITE-18681 - respect removal timeout.
return nodesToRemove.isEmpty() ? nullCompletedFuture() :
service.removeFromCluster(nodesToRemove);
- })
- .thenApply(v -> service);
+ }));
}
private void handleCancelInit(CancelInitMessage msg) {
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java
index e8a19fdf6e..f1d94b290f 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorage.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.cluster.management.raft;
import java.nio.file.Path;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
/**
@@ -69,14 +69,14 @@ public interface ClusterStateStorage extends
IgniteComponent {
void removeAll(Collection<byte[]> keys);
/**
- * Creates a cursor over a range of keys, starting with the given prefix.
+ * Creates a list containing a range of keys, starting with the given
prefix.
*
* @param prefix Key prefix.
* @param entryTransformer Entry transformation function.
* @param <T> Type of converted entry.
- * @return Cursor over a range of existing keys.
+ * @return List containing a range of existing keys.
*/
- <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T>
entryTransformer);
+ <T> List<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T>
entryTransformer);
/**
* Creates a snapshot of the storage's current state in the specified
directory.
@@ -92,9 +92,4 @@ public interface ClusterStateStorage extends IgniteComponent {
* @param snapshotPath Path to the snapshot's directory.
*/
void restoreSnapshot(Path snapshotPath);
-
- /**
- * Removes all data from the storage and frees all resources.
- */
- void destroy();
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index dbc24580bc..58495da214 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -25,6 +25,7 @@ import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
@@ -105,7 +106,7 @@ public class CmgRaftGroupListener implements
RaftGroupListener {
}
private HashSet<LogicalNode> getValidatedNodes() {
- Set<LogicalNode> validatedNodes = storage.getValidatedNodes();
+ List<LogicalNode> validatedNodes = storage.getValidatedNodes();
Set<LogicalNode> logicalTopologyNodes =
logicalTopology.getLogicalTopology().nodes();
var result = new HashSet<LogicalNode>(capacity(validatedNodes.size() +
logicalTopologyNodes.size()));
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
index 99c067f5e9..3543d6361c 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
@@ -18,17 +18,15 @@
package org.apache.ignite.internal.cluster.management.raft;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.util.Set;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
/**
@@ -103,12 +101,8 @@ class RaftStorageManager {
/**
* Returns a collection of nodes that passed the validation but have not
yet joined the logical topology.
*/
- Set<LogicalNode> getValidatedNodes() {
- Cursor<LogicalNode> cursor =
storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) -> fromBytes(v));
-
- try (cursor) {
- return cursor.stream().collect(toSet());
- }
+ List<LogicalNode> getValidatedNodes() {
+ return storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) ->
fromBytes(v));
}
/**
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
index 01fd438a13..3bafd5c00e 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
@@ -20,8 +20,11 @@ package org.apache.ignite.internal.cluster.management.raft;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -29,15 +32,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.Options;
@@ -64,6 +67,8 @@ public class RocksDbClusterStateStorage implements
ClusterStateStorage {
/** RockDB options. */
private final Options options = new Options().setCreateIfMissing(true);
+ private final WriteOptions defaultWriteOptions = new
WriteOptions().setDisableWAL(true);
+
/** RocksDb instance. */
@Nullable
private volatile RocksDB db;
@@ -72,6 +77,11 @@ public class RocksDbClusterStateStorage implements
ClusterStateStorage {
private final Object snapshotRestoreLock = new Object();
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping of the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* Creates a new instance.
*
@@ -87,18 +97,18 @@ public class RocksDbClusterStateStorage implements
ClusterStateStorage {
@Override
public CompletableFuture<Void> start() {
- try {
- // Delete existing data, relying on log playback.
- RocksDB.destroyDB(dbPath.toString(), options);
+ return inBusyLockAsync(busyLock, () -> {
+ try {
+ // Delete existing data, relying on log playback.
+ RocksDB.destroyDB(dbPath.toString(), options);
- init();
- } catch (RocksDBException e) {
- return failedFuture(new CmgStorageException("Failed to start the
storage", e));
- } catch (CmgStorageException e) {
- return failedFuture(e);
- }
+ init();
- return nullCompletedFuture();
+ return nullCompletedFuture();
+ } catch (RocksDBException e) {
+ return failedFuture(new CmgStorageException("Failed to start
the storage", e));
+ }
+ });
}
private void init() {
@@ -117,133 +127,132 @@ public class RocksDbClusterStateStorage implements
ClusterStateStorage {
@Override
public byte @Nullable [] get(byte[] key) {
- try {
- return db.get(key);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to get data from Rocks DB",
e);
- }
+ return inBusyLock(busyLock, () -> {
+ try {
+ return db.get(key);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to get data from Rocks
DB", e);
+ }
+ });
}
@Override
public void put(byte[] key, byte[] value) {
- try {
- db.put(key, value);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to put data into Rocks DB",
e);
- }
+ inBusyLock(busyLock, () -> {
+ try {
+ db.put(defaultWriteOptions, key, value);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to put data into Rocks
DB", e);
+ }
+ });
}
@Override
public void replaceAll(byte[] prefix, byte[] key, byte[] value) {
- try (
- var batch = new WriteBatch();
- var options = new WriteOptions();
- ) {
- byte[] endKey = RocksUtils.incrementPrefix(prefix);
+ inBusyLock(busyLock, () -> {
+ try (var batch = new WriteBatch()) {
+ byte[] endKey = RocksUtils.incrementPrefix(prefix);
- assert endKey != null : Arrays.toString(prefix);
+ assert endKey != null : Arrays.toString(prefix);
- batch.deleteRange(prefix, endKey);
+ batch.deleteRange(prefix, endKey);
- batch.put(key, value);
+ batch.put(key, value);
- db.write(options, batch);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to replace data in Rocks
DB", e);
- }
+ db.write(defaultWriteOptions, batch);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to replace data in Rocks
DB", e);
+ }
+ });
}
@Override
public void remove(byte[] key) {
- try {
- db.delete(key);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to remove data from Rocks
DB", e);
- }
+ inBusyLock(busyLock, () -> {
+ try {
+ db.delete(defaultWriteOptions, key);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to remove data from
Rocks DB", e);
+ }
+ });
}
@Override
public void removeAll(Collection<byte[]> keys) {
- try (
- var batch = new WriteBatch();
- var options = new WriteOptions();
- ) {
- for (byte[] key : keys) {
- batch.delete(key);
+ inBusyLock(busyLock, () -> {
+ try (var batch = new WriteBatch()) {
+ for (byte[] key : keys) {
+ batch.delete(key);
+ }
+
+ db.write(defaultWriteOptions, batch);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to remove data from
Rocks DB", e);
}
-
- db.write(options, batch);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to remove data from Rocks
DB", e);
- }
+ });
}
@Override
- public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[],
byte[], T> entryTransformer) {
- byte[] upperBound = RocksUtils.incrementPrefix(prefix);
-
- Slice upperBoundSlice = upperBound == null ? null : new
Slice(upperBound);
-
- ReadOptions readOptions = new
ReadOptions().setIterateUpperBound(upperBoundSlice);
-
- RocksIterator it = db.newIterator(readOptions);
-
- it.seek(prefix);
-
- return new RocksIteratorAdapter<>(it) {
- @Override
- protected T decodeEntry(byte[] key, byte[] value) {
- return entryTransformer.apply(key, value);
- }
-
- @Override
- public void close() {
- super.close();
-
- RocksUtils.closeAll(readOptions, upperBoundSlice);
+ public <T> List<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[],
T> entryTransformer) {
+ return inBusyLock(busyLock, () -> {
+ byte[] upperBound = RocksUtils.incrementPrefix(prefix);
+
+ try (
+ Slice upperBoundSlice = upperBound == null ? null : new
Slice(upperBound);
+ ReadOptions readOptions = new
ReadOptions().setIterateUpperBound(upperBoundSlice);
+ RocksIterator it = db.newIterator(readOptions)
+ ) {
+ it.seek(prefix);
+
+ var result = new ArrayList<T>();
+
+ try {
+ RocksUtils.forEach(it, (key, value) ->
result.add(entryTransformer.apply(key, value)));
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to get data by
prefix", e);
+ }
+
+ return result;
}
- };
+ });
}
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
- return snapshotManager.createSnapshot(snapshotPath);
+ return inBusyLockAsync(busyLock, () ->
snapshotManager.createSnapshot(snapshotPath));
}
@Override
public void restoreSnapshot(Path snapshotPath) {
- synchronized (snapshotRestoreLock) {
- destroyDb();
+ inBusyLock(busyLock, () -> {
+ synchronized (snapshotRestoreLock) {
+ db.close();
- init();
+ db = null;
- snapshotManager.restoreSnapshot(snapshotPath);
- }
- }
+ try {
+ RocksDB.destroyDB(dbPath.toString(), options);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to stop the RocksDB
instance", e);
+ }
- @Override
- public void destroy() {
- IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10,
TimeUnit.SECONDS);
-
- destroyDb();
+ init();
- options.close();
+ snapshotManager.restoreSnapshot(snapshotPath);
+ }
+ });
}
@Override
public void stop() {
- IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10,
TimeUnit.SECONDS);
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
- RocksUtils.closeAll(db, options);
- }
+ busyLock.block();
- private void destroyDb() {
- db.close();
+ IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10,
TimeUnit.SECONDS);
- try {
- RocksDB.destroyDB(dbPath.toString(), options);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to stop the RocksDB
instance", e);
- }
+ RocksUtils.closeAll(db, options, defaultWriteOptions);
}
}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
index 7d28ee8d60..6ac92c358d 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.cluster.management.raft;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,14 +31,13 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
-import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.util.Cursor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -216,11 +217,7 @@ public abstract class AbstractClusterStateStorageTest
extends IgniteAbstractTest
"key4".getBytes(UTF_8) // does not exist in storage
));
- Cursor<String> cursor = storage.getWithPrefix("key".getBytes(UTF_8),
(k, v) -> new String(v, UTF_8));
-
- try (cursor) {
- assertThat(cursor.stream().collect(toList()), contains("value3"));
- }
+ assertThat(storage.getWithPrefix("key".getBytes(UTF_8), (k, v) -> new
String(v, UTF_8)), contains("value3"));
}
/**
@@ -232,11 +229,7 @@ public abstract class AbstractClusterStateStorageTest
extends IgniteAbstractTest
storage.put("key2".getBytes(UTF_8), "value2".getBytes(UTF_8));
storage.put("foo".getBytes(UTF_8), "value3".getBytes(UTF_8));
- Cursor<String> cursor = storage.getWithPrefix("ke".getBytes(UTF_8),
(k, v) -> new String(v, UTF_8));
-
- try (cursor) {
- assertThat(cursor.stream().collect(toList()),
containsInAnyOrder("value1", "value2"));
- }
+ assertThat(storage.getWithPrefix("ke".getBytes(UTF_8), (k, v) -> new
String(v, UTF_8)), containsInAnyOrder("value1", "value2"));
}
/**
@@ -250,11 +243,7 @@ public abstract class AbstractClusterStateStorageTest
extends IgniteAbstractTest
storage.put(key1, "value1".getBytes(UTF_8));
storage.put(key2, "value2".getBytes(UTF_8));
- Cursor<String> cursor = storage.getWithPrefix(key1, (k, v) -> new
String(v, UTF_8));
-
- try (cursor) {
- assertThat(cursor.stream().collect(toList()),
containsInAnyOrder("value1"));
- }
+ assertThat(storage.getWithPrefix(key1, (k, v) -> new String(v,
UTF_8)), containsInAnyOrder("value1"));
}
/**
@@ -265,44 +254,14 @@ public abstract class AbstractClusterStateStorageTest
extends IgniteAbstractTest
storage.put("key1".getBytes(UTF_8), "value1".getBytes(UTF_8));
storage.put("key2".getBytes(UTF_8), "value2".getBytes(UTF_8));
- Cursor<String> cursor = storage.getWithPrefix("foo".getBytes(UTF_8),
(k, v) -> new String(v, UTF_8));
-
- try (cursor) {
- assertThat(cursor.stream().collect(toList()), is(empty()));
- }
- }
-
- /**
- * Tests the {@link ClusterStateStorage#destroy()} method.
- */
- @Test
- void testDestroy(TestInfo testInfo) {
- byte[] key = "key".getBytes(UTF_8);
-
- byte[] value = "value".getBytes(UTF_8);
-
- storage.put(key, value);
-
- assertThat(storage.get(key), is(equalTo(value)));
-
- storage.destroy();
-
- storage = createStorage(testNodeName(testInfo, 0));
-
- storage.start();
-
- assertThat(storage.get(key), is(nullValue()));
-
- storage.put(key, value);
-
- assertThat(storage.get(key), is(equalTo(value)));
+ assertThat(storage.getWithPrefix("foo".getBytes(UTF_8), (k, v) -> new
String(v, UTF_8)), is(empty()));
}
/**
* Tests creating and restoring snapshots.
*/
@Test
- void testSnapshot(TestInfo testInfo) throws IOException {
+ void testSnapshot(TestInfo testInfo) throws Exception {
Path snapshotDir = workDir.resolve("snapshot");
Files.createDirectory(snapshotDir);
@@ -318,9 +277,9 @@ public abstract class AbstractClusterStateStorageTest
extends IgniteAbstractTest
assertThat(storage.snapshot(snapshotDir), willCompleteSuccessfully());
- storage.destroy();
+ storage.stop();
- storage = createStorage(testNodeName(testInfo, 0));
+ storage = createStorage(testNodeName(testInfo, 1));
storage.start();
@@ -371,11 +330,26 @@ public abstract class AbstractClusterStateStorageTest
extends IgniteAbstractTest
assertThat(storage.get(keyAddedAfterSnapshotStart), is(nullValue()));
}
+ @Test
+ void throwsNodeStoppingException() throws Exception {
+ storage.stop();
+
+ assertThrowsWithCause(() -> storage.get(BYTE_EMPTY_ARRAY),
NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.put(BYTE_EMPTY_ARRAY,
BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.remove(BYTE_EMPTY_ARRAY),
NodeStoppingException.class);
+ assertThrowsWithCause(() ->
storage.removeAll(List.of(BYTE_EMPTY_ARRAY)), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.replaceAll(BYTE_EMPTY_ARRAY,
BYTE_EMPTY_ARRAY, BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.getWithPrefix(BYTE_EMPTY_ARRAY,
(k, v) -> null), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.restoreSnapshot(workDir),
NodeStoppingException.class);
+
+ assertThat(storage.snapshot(workDir),
willThrow(NodeStoppingException.class));
+ }
+
private void putKeyValue(int n) {
storage.put(key(n), ("value" + n).getBytes(UTF_8));
}
- private byte[] key(int n) {
+ private static byte[] key(int n) {
return ("key" + n).getBytes(UTF_8);
}
}
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
index 2463e761af..2fbd9647dd 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
@@ -17,9 +17,10 @@
package org.apache.ignite.internal.cluster.management.raft;
-import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import java.io.ObjectInputStream;
@@ -33,12 +34,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
+import java.util.function.Supplier;
import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
/**
@@ -51,152 +53,131 @@ public class TestClusterStateStorage implements
ClusterStateStorage {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private volatile boolean isStarted = false;
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping of the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
@Override
public CompletableFuture<Void> start() {
- isStarted = true;
-
return nullCompletedFuture();
}
@Override
public byte @Nullable [] get(byte[] key) {
- lock.readLock().lock();
-
- try {
- return map.get(new ByteArray(key));
- } finally {
- lock.readLock().unlock();
- }
+ return inBusyReadLock(() -> map.get(new ByteArray(key)));
}
@Override
public void put(byte[] key, byte[] value) {
- lock.writeLock().lock();
-
- try {
- map.put(new ByteArray(key), value);
- } finally {
- lock.writeLock().unlock();
- }
+ inBusyWriteLock(() -> map.put(new ByteArray(key), value));
}
@Override
public void replaceAll(byte[] prefix, byte[] key, byte[] value) {
- lock.writeLock().lock();
-
- try {
+ inBusyWriteLock(() -> {
map.entrySet().removeIf(e -> startsWith(e.getKey().bytes(),
prefix));
map.put(new ByteArray(key), value);
- } finally {
- lock.writeLock().unlock();
- }
+ });
}
@Override
public void remove(byte[] key) {
- lock.writeLock().lock();
-
- try {
- map.remove(new ByteArray(key));
- } finally {
- lock.writeLock().unlock();
- }
+ inBusyWriteLock(() -> map.remove(new ByteArray(key)));
}
@Override
public void removeAll(Collection<byte[]> keys) {
- lock.writeLock().lock();
-
- try {
- for (byte[] key : keys) {
- remove(key);
- }
- } finally {
- lock.writeLock().unlock();
- }
+ inBusyWriteLock(() -> keys.forEach(this::remove));
}
@Override
- public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[],
byte[], T> entryTransformer) {
- lock.readLock().lock();
-
- try {
+ public <T> List<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[],
T> entryTransformer) {
+ return inBusyReadLock(() -> {
return map.entrySet().stream()
.filter(e -> startsWith(e.getKey().bytes(), prefix))
.map(e -> entryTransformer.apply(e.getKey().bytes(),
e.getValue()))
- .collect(collectingAndThen(toList(), data ->
Cursor.fromBareIterator(data.iterator())));
- } finally {
- lock.readLock().unlock();
- }
+ .collect(toList());
+ });
}
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
- List<byte[]> keys;
- List<byte[]> values;
-
- lock.readLock().lock();
-
- try {
- keys = new ArrayList<>(map.size());
- values = new ArrayList<>(map.size());
+ return inBusyLockAsync(busyLock, () -> {
+ lock.readLock().lock();
- map.forEach((k, v) -> {
- keys.add(k.bytes());
- values.add(v);
- });
- } finally {
- lock.readLock().unlock();
- }
-
- return CompletableFuture.runAsync(() -> {
- try (var out = new
ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE))))
{
- out.writeObject(keys);
- out.writeObject(values);
- } catch (Exception e) {
- throw new IgniteInternalException(e);
+ try {
+ List<byte[]> keys = new ArrayList<>(map.size());
+ List<byte[]> values = new ArrayList<>(map.size());
+
+ map.forEach((k, v) -> {
+ keys.add(k.bytes());
+ values.add(v);
+ });
+
+ return CompletableFuture.runAsync(() -> {
+ try (var out = new
ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE))))
{
+ out.writeObject(keys);
+ out.writeObject(values);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ lock.readLock().unlock();
}
});
}
@Override
public void restoreSnapshot(Path snapshotPath) {
- try (var in = new
ObjectInputStream(Files.newInputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
- var keys = (List<byte[]>) in.readObject();
- var values = (List<byte[]>) in.readObject();
-
- lock.writeLock().lock();
+ inBusyWriteLock(() -> {
+ try (var in = new
ObjectInputStream(Files.newInputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
+ var keys = (List<byte[]>) in.readObject();
+ var values = (List<byte[]>) in.readObject();
- try {
map.clear();
for (int i = 0; i < keys.size(); i++) {
map.put(new ByteArray(keys.get(i)), values.get(i));
}
- } finally {
- lock.writeLock().unlock();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- throw new IgniteInternalException(e);
- }
+ });
}
@Override
- public void destroy() {
- lock.writeLock().lock();
-
- try {
- map.clear();
- } finally {
- lock.writeLock().unlock();
+ public void stop() {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
}
+
+ busyLock.block();
}
- @Override
- public void stop() {
- isStarted = false;
+ private <T> T inBusyReadLock(Supplier<T> action) {
+ return inBusyLock(busyLock, () -> {
+ lock.readLock().lock();
+
+ try {
+ return action.get();
+ } finally {
+ lock.readLock().unlock();
+ }
+ });
+ }
+
+ private void inBusyWriteLock(Runnable action) {
+ inBusyLock(busyLock, () -> {
+ lock.writeLock().lock();
+
+ try {
+ action.run();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ });
}
}