This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ab3ea58 IGNITE-15222 Added test that range command works when leader
is changed. Fixes #458
ab3ea58 is described below
commit ab3ea5870a7fceb103f49dc39af927a0c0022654
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri Dec 10 20:52:39 2021 +0300
IGNITE-15222 Added test that range command works when leader is changed.
Fixes #458
Signed-off-by: Slava Koptilin <[email protected]>
---
modules/metastorage-client/pom.xml | 7 +
.../client/ItMetaStorageRaftGroupTest.java | 479 +++++++++++++++++++++
.../client/ItMetaStorageServiceTest.java | 28 +-
3 files changed, 487 insertions(+), 27 deletions(-)
diff --git a/modules/metastorage-client/pom.xml
b/modules/metastorage-client/pom.xml
index b2008f9..f311dd6 100644
--- a/modules/metastorage-client/pom.xml
+++ b/modules/metastorage-client/pom.xml
@@ -107,5 +107,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <!-- Logging in tests -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
new file mode 100644
index 0000000..a5f2375
--- /dev/null
+++
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.client;
+
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
+import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Meta storage raft group tests: verify meta storage client behavior when the raft group leader changes.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(MockitoExtension.class)
+public class ItMetaStorageRaftGroupTest {
+    /** The logger of this test class. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(ItMetaStorageRaftGroupTest.class);
+
+ /** Base network port. */
+ private static final int NODE_PORT_BASE = 20_000;
+
+ /** Nodes. */
+ private static final int NODES = 3;
+
+ /** Meta Storage raft group name. */
+ private static final String METASTORAGE_RAFT_GROUP_NAME =
"METASTORAGE_RAFT_GROUP";
+
+ /** Factory. */
+ private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
+
+ /** Network factory. */
+ private static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY =
new TestScaleCubeClusterServiceFactory();
+
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY =
new MessageSerializationRegistryImpl();
+
+ /** Expected server result entry. */
+ private static final org.apache.ignite.internal.metastorage.server.Entry
EXPECTED_SRV_RESULT_ENTRY1 =
+ new org.apache.ignite.internal.metastorage.server.Entry(
+ new byte[] {1},
+ new byte[] {2},
+ 10,
+ 2
+ );
+
+ /** Expected server result entry. */
+ private static final org.apache.ignite.internal.metastorage.server.Entry
EXPECTED_SRV_RESULT_ENTRY2 =
+ new org.apache.ignite.internal.metastorage.server.Entry(
+ new byte[] {3},
+ new byte[] {4},
+ 11,
+ 3
+ );
+
+ /** Expected client result entry. */
+ private static final EntryImpl EXPECTED_RESULT_ENTRY1 =
+ new EntryImpl(
+ new ByteArray(new byte[] {1}),
+ new byte[] {2},
+ 10,
+ 2
+ );
+
+ /** Expected client result entry. */
+ private static final EntryImpl EXPECTED_RESULT_ENTRY2 =
+ new EntryImpl(
+ new ByteArray(new byte[] {3}),
+ new byte[] {4},
+ 11,
+ 3
+ );
+
+ /** Cluster. */
+ private final ArrayList<ClusterService> cluster = new ArrayList<>();
+
+ /** First meta storage raft server. */
+ private RaftServer metaStorageRaftSrv1;
+
+ /** Second meta storage raft server. */
+ private RaftServer metaStorageRaftSrv2;
+
+ /** Third meta storage raft server. */
+ private RaftServer metaStorageRaftSrv3;
+
+ /** First meta storage raft group service. */
+ private RaftGroupService metaStorageRaftGrpSvc1;
+
+ /** Second meta storage raft group service. */
+ private RaftGroupService metaStorageRaftGrpSvc2;
+
+ /** Third meta storage raft group service. */
+ private RaftGroupService metaStorageRaftGrpSvc3;
+
+ /** Mock Metastorage storage. */
+ @Mock
+ private KeyValueStorage mockStorage;
+
+ @WorkDirectory
+ private Path dataPath;
+
+ /** Executor for raft group services. */
+ private ScheduledExecutorService executor;
+
+    /**
+     * Starts one cluster service per local address found for the test port range, waits for the topology of
+     * {@code NODES} nodes on each of them and creates the executor for raft group services.
+     *
+     * @param testInfo Test info that is passed to the cluster service factory method.
+     */
+    @BeforeEach
+    public void beforeTest(TestInfo testInfo) {
+        List<NetworkAddress> addrs = findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + NODES);
+
+        var finder = new StaticNodeFinder(addrs);
+
+        // Create, start and register the services one by one, in address order.
+        for (NetworkAddress addr : addrs) {
+            ClusterService svc = ClusterServiceTestUtils.clusterService(
+                    testInfo,
+                    addr.port(),
+                    finder,
+                    SERIALIZATION_REGISTRY,
+                    NETWORK_FACTORY
+            );
+
+            svc.start();
+
+            cluster.add(svc);
+        }
+
+        cluster.forEach(node -> assertTrue(waitForTopology(node, NODES, 1000)));
+
+        LOG.info("Cluster started.");
+
+        executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME));
+    }
+
+    /**
+     * Shuts down the raft servers and raft group services (third to first), then the executor and all cluster nodes.
+     *
+     * <p>The executor and the cluster nodes are released even if stopping a raft server fails, and components that
+     * were never created (for example, because the test setup failed half-way) are skipped.
+     *
+     * @throws Exception If failed to shutdown raft server.
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        try {
+            stopRaftNode(metaStorageRaftSrv3, metaStorageRaftGrpSvc3);
+            stopRaftNode(metaStorageRaftSrv2, metaStorageRaftGrpSvc2);
+            stopRaftNode(metaStorageRaftSrv1, metaStorageRaftGrpSvc1);
+        } finally {
+            // The executor is created at the very end of beforeTest, so it may be missing if the setup failed.
+            if (executor != null) {
+                IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+            }
+
+            for (ClusterService node : cluster) {
+                node.stop();
+            }
+        }
+    }
+
+    /**
+     * Stops the meta storage raft group and the raft server, then shuts down the corresponding raft group service.
+     *
+     * @param srv Raft server, {@code null} if it has not been created.
+     * @param grpSvc Raft group service, {@code null} if it has not been started.
+     * @throws Exception If failed to stop the raft server.
+     */
+    private static void stopRaftNode(RaftServer srv, RaftGroupService grpSvc) throws Exception {
+        if (srv != null) {
+            srv.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
+            srv.stop();
+        }
+
+        // The service is assigned after the server, so it can be null even when the server is not.
+        if (grpSvc != null) {
+            grpSvc.shutdown();
+        }
+    }
+
+
+ /**
+ * Tests that {@link MetaStorageService#range(ByteArray, ByteArray,
long)}} next command works correctly
+ * after leader changing.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRangeNextWorksCorrectlyAfterLeaderChange() throws
Exception {
+ final AtomicInteger replicatorStartedCounter = new AtomicInteger(0);
+
+ final AtomicInteger replicatorStoppedCounter = new AtomicInteger(0);
+
+ when(mockStorage.range(EXPECTED_RESULT_ENTRY1.key().bytes(), new
byte[]{4})).thenAnswer(invocation -> {
+ List<org.apache.ignite.internal.metastorage.server.Entry> entries
= new ArrayList<>(
+ List.of(EXPECTED_SRV_RESULT_ENTRY1,
EXPECTED_SRV_RESULT_ENTRY2));
+
+ return new
Cursor<org.apache.ignite.internal.metastorage.server.Entry>() {
+ private final
Iterator<org.apache.ignite.internal.metastorage.server.Entry> it =
entries.iterator();
+
+ @Override
+ public void close() {
+ }
+
+ @NotNull
+ @Override
+ public
Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
+ return it;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public org.apache.ignite.internal.metastorage.server.Entry
next() {
+ return it.next();
+ }
+ };
+ });
+
+ Map<RaftServer, RaftGroupService> raftServersRaftGroups =
prepareJraftMetaStorages(
+ replicatorStartedCounter,
+ replicatorStoppedCounter);
+
+ Set<RaftServer> raftServers = raftServersRaftGroups.keySet();
+
+ NetworkAddress oldLeader =
raftServersRaftGroups.get(metaStorageRaftSrv1).leader().address();
+
+ Optional<RaftServer> oldLeaderServer = raftServers.stream()
+ .filter(s ->
s.clusterService().topologyService().localMember().address().equals(oldLeader)).findFirst();
+
+ //Server that will be alive after we stop leader.
+ Optional<RaftServer> liveServer = raftServers.stream()
+ .filter(s ->
!s.clusterService().topologyService().localMember().address().equals(oldLeader)).findFirst();
+
+ if (oldLeaderServer.isEmpty() || liveServer.isEmpty()) {
+ fail();
+ }
+
+ MetaStorageService metaStorageSvc = new
MetaStorageServiceImpl(raftServersRaftGroups.get(liveServer.get()),
"some_node");
+
+ Cursor<Entry> cursor =
metaStorageSvc.range(EXPECTED_RESULT_ENTRY1.key(), new ByteArray(new byte[]
{4}));
+
+ assertTrue(TestUtils.waitForCondition(
+ () -> replicatorStartedCounter.get() == 2, 5_000),
replicatorStartedCounter.get() + "");
+
+ assertTrue(cursor.hasNext());
+
+ assertEquals(EXPECTED_RESULT_ENTRY1, (cursor.iterator().next()));
+
+ // ensure that leader has not been changed
+ assertTrue(TestUtils.waitForCondition(
+ () -> replicatorStartedCounter.get() == 2, 5_000),
replicatorStartedCounter.get() + "");
+
+ //stop leader
+ oldLeaderServer.get().stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
+ oldLeaderServer.get().stop();
+ cluster.stream().filter(c ->
c.topologyService().localMember().address().equals(oldLeader)).findFirst().get().stop();
+
+ raftServersRaftGroups.get(liveServer.get()).refreshLeader().get();
+
+ assertNotSame(oldLeader,
raftServersRaftGroups.get(liveServer.get()).leader().address());
+
+ // ensure that leader has been changed only once
+ assertTrue(TestUtils.waitForCondition(
+ () -> replicatorStartedCounter.get() == 4, 5_000),
replicatorStartedCounter.get() + "");
+ assertTrue(TestUtils.waitForCondition(
+ () -> replicatorStoppedCounter.get() == 2, 5_000),
replicatorStoppedCounter.get() + "");
+
+ assertTrue(cursor.hasNext());
+ assertEquals(EXPECTED_RESULT_ENTRY2, (cursor.iterator().next()));
+ }
+
+    /**
+     * Starts a raft server with the meta storage raft group on the first three cluster nodes, starts a raft group
+     * service on each of these nodes and waits until all three services report the same leader.
+     *
+     * @param replicatorStartedCounter Counter that is incremented when a replicator is created.
+     * @param replicatorStoppedCounter Counter that is incremented when a replicator is destroyed.
+     * @return Started raft servers mapped to the raft group services started on the same cluster node.
+     * @throws InterruptedException If interrupted while waiting for a raft group service to start.
+     * @throws ExecutionException If a raft group service failed to start.
+     */
+    private Map<RaftServer, RaftGroupService> prepareJraftMetaStorages(
+            AtomicInteger replicatorStartedCounter,
+            AtomicInteger replicatorStoppedCounter
+    ) throws InterruptedException, ExecutionException {
+        // Nodes 0, 1 and 2 are accessed below, so fail with a clear message if some of them are missing.
+        assertTrue(cluster.size() >= 3, "Unexpected cluster size: " + cluster.size());
+
+        List<Peer> peers = new ArrayList<>();
+
+        cluster.forEach(c -> peers.add(new Peer(c.topologyService().localMember().address())));
+
+        NodeOptions opt1 = nodeOptions(replicatorStartedCounter, replicatorStoppedCounter);
+        NodeOptions opt2 = nodeOptions(replicatorStartedCounter, replicatorStoppedCounter);
+        NodeOptions opt3 = nodeOptions(replicatorStartedCounter, replicatorStoppedCounter);
+
+        metaStorageRaftSrv1 = new JraftServerImpl(cluster.get(0), dataPath, opt1);
+        metaStorageRaftSrv2 = new JraftServerImpl(cluster.get(1), dataPath, opt2);
+        metaStorageRaftSrv3 = new JraftServerImpl(cluster.get(2), dataPath, opt3);
+
+        metaStorageRaftSrv1.start();
+        metaStorageRaftSrv2.start();
+        metaStorageRaftSrv3.start();
+
+        metaStorageRaftSrv1.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+        metaStorageRaftSrv2.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+        metaStorageRaftSrv3.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+
+        metaStorageRaftGrpSvc1 = startRaftGroupService(cluster.get(0), peers);
+        metaStorageRaftGrpSvc2 = startRaftGroupService(cluster.get(1), peers);
+        metaStorageRaftGrpSvc3 = startRaftGroupService(cluster.get(2), peers);
+
+        assertTrue(TestUtils
+                        .waitForCondition(
+                                () -> sameLeaders(metaStorageRaftGrpSvc1, metaStorageRaftGrpSvc2, metaStorageRaftGrpSvc3), 10_000),
+                "Leaders: " + metaStorageRaftGrpSvc1.leader() + " " + metaStorageRaftGrpSvc2.leader() + " " + metaStorageRaftGrpSvc3
+                        .leader());
+
+        Map<RaftServer, RaftGroupService> raftServersRaftGroups = new HashMap<>();
+
+        raftServersRaftGroups.put(metaStorageRaftSrv1, metaStorageRaftGrpSvc1);
+        raftServersRaftGroups.put(metaStorageRaftSrv2, metaStorageRaftGrpSvc2);
+        raftServersRaftGroups.put(metaStorageRaftSrv3, metaStorageRaftGrpSvc3);
+
+        return raftServersRaftGroups;
+    }
+
+    /**
+     * Creates node options with a single replicator state listener that updates the given counters.
+     *
+     * @param replicatorStartedCounter Counter that is incremented when a replicator is created.
+     * @param replicatorStoppedCounter Counter that is incremented when a replicator is destroyed.
+     * @return Node options.
+     */
+    private static NodeOptions nodeOptions(AtomicInteger replicatorStartedCounter, AtomicInteger replicatorStoppedCounter) {
+        NodeOptions opts = new NodeOptions();
+
+        opts.setReplicationStateListeners(
+                List.of(new UserReplicatorStateListener(replicatorStartedCounter, replicatorStoppedCounter)));
+
+        return opts;
+    }
+
+    /**
+     * Starts a raft group service for the meta storage raft group on the given cluster node and waits for it.
+     *
+     * @param node Cluster node the service is started on.
+     * @param peers Peers of the raft group.
+     * @return Started raft group service.
+     * @throws InterruptedException If interrupted while waiting for the service to start.
+     * @throws ExecutionException If the service failed to start.
+     */
+    private RaftGroupService startRaftGroupService(ClusterService node, List<Peer> peers)
+            throws InterruptedException, ExecutionException {
+        return RaftGroupServiceImpl.start(
+                METASTORAGE_RAFT_GROUP_NAME,
+                node,
+                FACTORY,
+                10_000,
+                peers,
+                true,
+                200,
+                executor
+        ).get();
+    }
+
+    /**
+     * Checks if all raft groups have the same leader.
+     *
+     * <p>The leader refresh of every group is requested first and then awaited, so the leaders are compared only
+     * after all refresh requests have completed.
+     *
+     * @param group1 Raft group 1
+     * @param group2 Raft group 2
+     * @param group3 Raft group 3
+     * @return {@code true} if all raft groups have the same leader, {@code false} if the leaders differ or a leader
+     *      refresh has failed or has been interrupted.
+     */
+    private boolean sameLeaders(RaftGroupService group1, RaftGroupService group2, RaftGroupService group3) {
+        // Request all refreshes before waiting for any of them, so that they still run concurrently.
+        var refresh1 = group1.refreshLeader();
+        var refresh2 = group2.refreshLeader();
+        var refresh3 = group3.refreshLeader();
+
+        try {
+            refresh1.get();
+            refresh2.get();
+            refresh3.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            return false;
+        } catch (ExecutionException e) {
+            // A failed refresh means the leaders cannot be compared reliably yet; the caller is expected to retry.
+            return false;
+        }
+
+        return Objects.equals(group1.leader(), group2.leader()) && Objects.equals(group2.leader(), group3.leader());
+    }
+
+    /**
+     * Replicator state listener that counts created and destroyed replicators and logs every state change.
+     */
+    static class UserReplicatorStateListener implements Replicator.ReplicatorStateListener {
+        /** Number of created replicators. */
+        private final AtomicInteger startedCnt;
+
+        /** Number of destroyed replicators. */
+        private final AtomicInteger stoppedCnt;
+
+        /**
+         * Creates a listener that updates the given counters.
+         *
+         * @param replicatorStartedCounter Replicator started counter.
+         * @param replicatorStoppedCounter Replicator stopped counter.
+         */
+        UserReplicatorStateListener(AtomicInteger replicatorStartedCounter, AtomicInteger replicatorStoppedCounter) {
+            startedCnt = replicatorStartedCounter;
+            stoppedCnt = replicatorStoppedCounter;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onCreated(PeerId peer) {
+            // The logged value is the counter state right after this increment.
+            LOG.info("Replicator has been created {} {}", peer, startedCnt.incrementAndGet());
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onError(PeerId peer, Status status) {
+            LOG.info("Replicator has errors {} {}", peer, status);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onDestroyed(PeerId peer) {
+            // The logged value is the counter state right after this increment.
+            LOG.info("Replicator has been destroyed {} {}", peer, stoppedCnt.incrementAndGet());
+        }
+    }
+}
diff --git
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index 4ab19ec..61a236f 100644
---
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.client;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -857,33 +858,6 @@ public class ItMetaStorageServiceTest {
}
/**
- * Wait for topology.
- *
- * @param cluster The cluster.
- * @param exp Expected count.
- * @param timeout The timeout in millis.
- * @return {@code True} if topology size is equal to expected.
- */
- @SuppressWarnings("SameParameterValue")
- private static boolean waitForTopology(ClusterService cluster, int exp,
int timeout) {
- long stop = System.currentTimeMillis() + timeout;
-
- while (System.currentTimeMillis() < stop) {
- if (cluster.topologyService().allMembers().size() >= exp) {
- return true;
- }
-
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- return false;
- }
- }
-
- return false;
- }
-
- /**
* Prepares meta storage by instantiating corresponding raft server with
{@link MetaStorageListener} and {@link
* MetaStorageServiceImpl}.
*