This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 71cd440c99 IGNITE-24113 Use separate persistent log storages per Raft
node in ItNodeTest (#4969)
71cd440c99 is described below
commit 71cd440c99b4e5d59e1bea2e90317ec4cb11e99b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Dec 25 14:05:37 2024 +0400
IGNITE-24113 Use separate persistent log storages per Raft node in
ItNodeTest (#4969)
---
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 21 +++-----
.../jraft/core/PersistentLogStorageFactories.java | 58 ++++++++++++++++++++++
.../apache/ignite/raft/jraft/core/TestCluster.java | 13 ++---
3 files changed, 70 insertions(+), 22 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index a70b0469cd..e0e0cee115 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -214,7 +214,8 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
private final List<FixedThreadsExecutorGroup> appendEntriesExecutors = new
ArrayList<>();
- private @Nullable DefaultLogStorageFactory persistentLogStorageFactory;
+ private PersistentLogStorageFactories persistentLogStorageFactories;
+
/** Test info. */
private TestInfo testInfo;
@@ -242,6 +243,8 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
testStartMs = Utils.monotonicMs();
dumpThread.interrupt(); // reset dump timeout
+
+ persistentLogStorageFactories = new
PersistentLogStorageFactories(dataPath);
}
@AfterEach
@@ -267,9 +270,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
TestUtils.assertAllJraftThreadsStopped();
- if (persistentLogStorageFactory != null) {
- assertThat(persistentLogStorageFactory.stopAsync(),
willCompleteSuccessfully());
- }
+ persistentLogStorageFactories.shutdown();
log.info(">>>>>>>>>>>>>>> End test method: " +
testInfo.getDisplayName() + ", cost:"
+ (Utils.monotonicMs() - testStartMs) + " ms.");
@@ -4097,9 +4098,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
testInfo
);
- persistentLogStorageFactory = startPersistentLogStorageFactory();
-
- cluster.setRaftServiceFactory(new
IgniteJraftServiceFactory(persistentLogStorageFactory));
+ cluster.setRaftServiceFactories(peerId -> new
IgniteJraftServiceFactory(persistentLogStorageFactories.factoryFor(peerId)));
for (TestPeer peer : peers) {
assertTrue(cluster.start(peer));
@@ -4161,9 +4160,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
testInfo
);
- persistentLogStorageFactory = startPersistentLogStorageFactory();
-
- cluster.setRaftServiceFactory(new
IgniteJraftServiceFactory(persistentLogStorageFactory));
+ cluster.setRaftServiceFactories(peerId -> new
IgniteJraftServiceFactory(persistentLogStorageFactories.factoryFor(peerId)));
for (TestPeer peer : peers) {
assertTrue(cluster.start(peer));
@@ -4201,8 +4198,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
@Test
public void
applicationOfLongBatchInStateMachineDoesNotPreventFastShutdown() throws
Exception {
- persistentLogStorageFactory = startPersistentLogStorageFactory();
-
CompletableFuture<Void> allowExecutionFuture = new
CompletableFuture<>();
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 2);
@@ -4211,7 +4206,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
// start being executed as a single long batch.
TestCluster cluster = createClusterOf(peers);
- cluster.setRaftServiceFactory(new
IgniteJraftServiceFactory(persistentLogStorageFactory));
+ cluster.setRaftServiceFactories(peerId -> new
IgniteJraftServiceFactory(persistentLogStorageFactories.factoryFor(peerId)));
cluster.setStateMachineFactory(peerId -> new MockStateMachine(peerId) {
@Override
protected void executeCommand(Iterator iterator) {
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/PersistentLogStorageFactories.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/PersistentLogStorageFactories.java
new file mode 100644
index 0000000000..10aca8dd1d
--- /dev/null
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/PersistentLogStorageFactories.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.core;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+
+class PersistentLogStorageFactories {
+ private final String dataPath;
+
+ private final Map<PeerId, LogStorageFactory> factories = new
ConcurrentHashMap<>();
+
+ PersistentLogStorageFactories(String dataPath) {
+ this.dataPath = dataPath;
+ }
+
+ LogStorageFactory factoryFor(PeerId peerId) {
+ return factories.computeIfAbsent(peerId,
this::startPersistentLogStorageFactory);
+ }
+
+ private DefaultLogStorageFactory startPersistentLogStorageFactory(PeerId
peerId) {
+ Path path =
Path.of(dataPath).resolve("logs").resolve(peerId.getConsistentId() + "-" +
peerId.getIdx());
+ DefaultLogStorageFactory persistentLogStorageFactory = new
DefaultLogStorageFactory(path);
+
+ assertThat(persistentLogStorageFactory.startAsync(new
ComponentContext()), willCompleteSuccessfully());
+
+ return persistentLogStorageFactory;
+ }
+
+ void shutdown() {
+ for (LogStorageFactory factory : factories.values()) {
+ assertThat(factory.stopAsync(), willCompleteSuccessfully());
+ }
+ }
+}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 83d32396ea..8f42635714 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -61,7 +61,6 @@ import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
-import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -105,18 +104,14 @@ public class TestCluster {
/** Test info. */
private final TestInfo testInfo;
- private JRaftServiceFactory raftServiceFactory = new
TestJRaftServiceFactory();
+ private Function<PeerId, JRaftServiceFactory> raftServiceFactories =
peerId -> new TestJRaftServiceFactory();
private LinkedHashSet<TestPeer> learners;
private JraftGroupEventsListener raftGrpEvtsLsnr;
- public JRaftServiceFactory getRaftServiceFactory() {
- return this.raftServiceFactory;
- }
-
- public void setRaftServiceFactory(JRaftServiceFactory raftServiceFactory) {
- this.raftServiceFactory = raftServiceFactory;
+ public void setRaftServiceFactories(Function<PeerId, JRaftServiceFactory>
raftServiceFactories) {
+ this.raftServiceFactories = raftServiceFactories;
}
public LinkedHashSet<PeerId> getLearners() {
@@ -222,7 +217,7 @@ public class TestCluster {
nodeOptions.setEnableMetrics(enableMetrics);
nodeOptions.setSnapshotThrottle(snapshotThrottle);
nodeOptions.setSnapshotIntervalSecs(snapshotIntervalSecs);
- nodeOptions.setServiceFactory(this.raftServiceFactory);
+
nodeOptions.setServiceFactory(this.raftServiceFactories.apply(peer.getPeerId()));
if (clock != null) {
nodeOptions.setClock(clock);
}