This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ee26ae637c IGNITE-18640 Implement placement driver best-effort single actor selector and fail-over (#1692)
ee26ae637c is described below
commit ee26ae637c7b200c993b9a7dee36f2dbebdda4df
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Feb 21 15:56:43 2023 +0200
IGNITE-18640 Implement placement driver best-effort single actor selector and fail-over (#1692)
---
.../internal/placementdriver/ActiveActorTest.java | 88 +++++++++
.../client/TopologyAwareRaftGroupServiceTest.java | 212 +++++++++++++++++----
.../placementdriver/PlacementDriverManager.java | 142 +++++++++++++-
.../raft/client/TopologyAwareRaftGroupService.java | 72 ++++---
.../java/org/apache/ignite/internal/raft/Loza.java | 39 +++-
.../raft/server/impl/RaftServiceEventListener.java | 17 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 33 +++-
7 files changed, 526 insertions(+), 77 deletions(-)
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
new file mode 100644
index 0000000000..4eb18b7e85
--- /dev/null
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest;
+import org.apache.ignite.network.ClusterService;
+import org.junit.jupiter.api.AfterEach;
+
+/**
+ * Placement driver active actor test.
+ */
+public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
+ private Map<String, PlacementDriverManager> placementDriverManagers = new
HashMap<>();
+
+ @AfterEach
+ @Override
+ protected void afterTest() throws Exception {
+ List<AutoCloseable> closeables =
placementDriverManagers.values().stream().map(p -> (AutoCloseable)
p::stop).collect(toList());
+
+ closeAll(closeables);
+
+ placementDriverManagers.clear();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void afterNodeStart(String nodeName, ClusterService
clusterService, Set<String> placementDriverNodesNames) {
+ PlacementDriverManager placementDriverManager = new
PlacementDriverManager(
+ TestReplicationGroup.GROUP_ID,
+ clusterService,
+ raftConfiguration,
+ () -> completedFuture(placementDriverNodesNames),
+ new LogicalTopologyServiceTestImpl(clusterService),
+ executor
+ );
+
+ placementDriverManager.start();
+
+ placementDriverManagers.put(nodeName, placementDriverManager);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected boolean afterInitCheckCondition(String leaderName) {
+ return checkSingleActiveActor(leaderName);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected boolean afterLeaderChangeCheckCondition(String leaderName) {
+ return checkSingleActiveActor(leaderName);
+ }
+
+ private boolean checkSingleActiveActor(String leaderName) {
+ for (Map.Entry<String, PlacementDriverManager> e :
placementDriverManagers.entrySet()) {
+ if (e.getValue().isActiveActor() != e.getKey().equals(leaderName))
{
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
index caef80fffe..14cc2537bd 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.raft.client;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest.TestReplicationGroup.GROUP_ID;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -31,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -53,7 +56,6 @@ import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -62,6 +64,7 @@ import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -78,10 +81,10 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
private static final int PORT_BASE = 1234;
@InjectConfiguration
- private RaftConfiguration raftConfiguration;
+ protected RaftConfiguration raftConfiguration;
/** RPC executor. */
- private ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client",
log));
+ protected ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client",
log));
@Test
public void testOneNodeReplicationGroup(TestInfo testInfo) throws
Exception {
@@ -99,15 +102,15 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
CompletableFuture<ClusterNode> leaderFut = new CompletableFuture<>();
- raftClient.subscribeLeader(node -> {
- leaderFut.complete(node);
- });
+ raftClient.subscribeLeader((node, term) -> leaderFut.complete(node));
ClusterNode leader = leaderFut.get(10, TimeUnit.SECONDS);
assertNotNull(leader);
assertEquals(PORT_BASE, leader.address().port());
+ afterInitCheckConditionWithWait(leader.name());
+
stopCluster(clusterServices, raftServers, raftClient, 2);
}
@@ -115,23 +118,45 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws
Exception {
var clusterServices = new HashMap<NetworkAddress, ClusterService>();
var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+ Predicate<NetworkAddress> isServerAddress = addr -> addr.port() <
PORT_BASE + 3;
TopologyAwareRaftGroupService raftClient = startCluster(
testInfo,
clusterServices,
raftServers,
- addr -> addr.port() < PORT_BASE + 3,
+ isServerAddress,
4,
PORT_BASE + 3
);
+ raftClient.refreshLeader().get();
+
+ TopologyAwareRaftGroupService raftClientNoInitialNotify =
startTopologyAwareClient(
+ clusterServices.entrySet().iterator().next().getValue(),
+ clusterServices,
+ isServerAddress,
+ 4,
+ false
+ );
+
AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+ AtomicReference<ClusterNode> leaderRefNoInitialNotify = new
AtomicReference<>();
+ AtomicInteger callsCount = new AtomicInteger();
+
+ raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
- raftClient.subscribeLeader(node -> {
- leaderRef.set(node);
- });
+ for (int i = 0; i < 2; i++) {
+ raftClientNoInitialNotify.unsubscribeLeader();
- assertTrue(IgniteTestUtils.waitForCondition(() -> leaderRef.get() !=
null, 10_000));
+ raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+ callsCount.incrementAndGet();
+ leaderRefNoInitialNotify.set(node);
+ });
+ }
+
+ assertTrue(callsCount.get() <= 1);
+
+ assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
ClusterNode leader = leaderRef.get();
@@ -139,16 +164,21 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
log.info("Leader: " + leader);
+ afterInitCheckConditionWithWait(leader.name());
+
var raftServiceToStop = raftServers.remove(new
NetworkAddress("localhost", leader.address().port()));
raftServiceToStop.stopRaftNodes(GROUP_ID);
raftServiceToStop.stop();
clusterServices.remove(new NetworkAddress("localhost",
leader.address().port())).stop();
- assertTrue(IgniteTestUtils.waitForCondition(() ->
!leader.equals(leaderRef.get()), 10_000));
+ assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()),
10_000));
+ assertTrue(waitForCondition(() ->
!leader.equals(leaderRefNoInitialNotify.get()), 1000));
log.info("New Leader: " + leaderRef.get());
+ afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
raftClient.refreshLeader().get();
assertEquals(raftClient.leader().consistentId(),
leaderRef.get().name());
@@ -160,23 +190,45 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
public void testChangeLeaderForce(TestInfo testInfo) throws Exception {
var clusterServices = new HashMap<NetworkAddress, ClusterService>();
var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+ Predicate<NetworkAddress> isServerAddress = addr -> addr.port() <
PORT_BASE + 3;
TopologyAwareRaftGroupService raftClient = startCluster(
testInfo,
clusterServices,
raftServers,
- addr -> addr.port() < PORT_BASE + 3,
+ isServerAddress,
4,
PORT_BASE + 3
);
+ raftClient.refreshLeader().get();
+
+ TopologyAwareRaftGroupService raftClientNoInitialNotify =
startTopologyAwareClient(
+ clusterServices.entrySet().iterator().next().getValue(),
+ clusterServices,
+ isServerAddress,
+ 4,
+ false
+ );
+
AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+ AtomicReference<ClusterNode> leaderRefNoInitialNotify = new
AtomicReference<>();
+ AtomicInteger callsCount = new AtomicInteger();
+
+ raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
+
+ for (int i = 0; i < 2; i++) {
+ raftClientNoInitialNotify.unsubscribeLeader();
+
+ raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+ callsCount.incrementAndGet();
+ leaderRefNoInitialNotify.set(node);
+ });
+ }
- raftClient.subscribeLeader(node -> {
- leaderRef.set(node);
- });
+ assertTrue(callsCount.get() <= 1);
- assertTrue(IgniteTestUtils.waitForCondition(() -> leaderRef.get() !=
null, 10_000));
+ assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
ClusterNode leader = leaderRef.get();
@@ -184,16 +236,25 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
log.info("Leader: " + leader);
+ afterInitCheckConditionWithWait(leader.name());
+
Peer newLeaderPeer = raftClient.peers().stream().filter(peer ->
!leader.name().equals(peer.consistentId())).findAny().get();
log.info("Peer to transfer leader: " + newLeaderPeer);
raftClient.transferLeadership(newLeaderPeer).get();
- assertTrue(IgniteTestUtils.waitForCondition(() ->
newLeaderPeer.consistentId().equals(leaderRef.get().name()), 10_000));
+ String leaderId = newLeaderPeer.consistentId();
+
+ assertTrue(waitForCondition(() ->
leaderId.equals(leaderRef.get().name()), 10_000));
+ assertTrue(waitForCondition(
+ () -> leaderRefNoInitialNotify.get() != null &&
leaderId.equals(leaderRefNoInitialNotify.get().name()), 1000)
+ );
log.info("New Leader: " + leaderRef.get());
+ afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
raftClient.refreshLeader().get();
assertEquals(raftClient.leader().consistentId(),
leaderRef.get().name());
@@ -266,14 +327,13 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
clusterServices.put(addr, cluster);
}
+ PeersAndLearners peersAndLearners = peersAndLearners(clusterServices,
isServerAddress, nodes);
+
+ Set<String> placementDriverNodesNames =
peersAndLearners.peers().stream().map(Peer::consistentId).collect(toSet());
+
for (NetworkAddress addr : addresses) {
var cluster = clusterServices.get(addr);
- PeersAndLearners peersAndLearners =
PeersAndLearners.fromConsistentIds(
- addresses.stream().filter(isServerAddress)
- .map(netAddr ->
clusterServices.get(netAddr).topologyService().localMember().name()).collect(
- Collectors.toSet()));
-
if (isServerAddress.test(addr)) { //RAFT server node
var localPeer = peersAndLearners.peers().stream()
.filter(peer ->
peer.consistentId().equals(cluster.topologyService().localMember().name())).findAny().get();
@@ -289,24 +349,49 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
);
raftServers.put(addr, raftServer);
+
+ afterNodeStart(localPeer.consistentId(), cluster,
placementDriverNodesNames);
}
if (addr.port() == clientPort) {
- raftClient = (TopologyAwareRaftGroupService)
TopologyAwareRaftGroupService.start(
- GROUP_ID,
- cluster,
- FACTORY,
- raftConfiguration,
- peersAndLearners,
- true,
- executor,
- new LogicalTopologyServiceTestImpl(cluster)
- ).join();
+ raftClient = startTopologyAwareClient(cluster,
clusterServices, isServerAddress, nodes, true);
}
}
+
return raftClient;
}
+ private TopologyAwareRaftGroupService startTopologyAwareClient(
+ ClusterService localClusterService,
+ HashMap<NetworkAddress, ClusterService> clusterServices,
+ Predicate<NetworkAddress> isServerAddress,
+ int nodes,
+ boolean notifyOnSubscription
+ ) {
+ return (TopologyAwareRaftGroupService)
TopologyAwareRaftGroupService.start(
+ GROUP_ID,
+ localClusterService,
+ FACTORY,
+ raftConfiguration,
+ peersAndLearners(clusterServices, isServerAddress, nodes),
+ true,
+ executor,
+ new LogicalTopologyServiceTestImpl(localClusterService),
+ notifyOnSubscription
+ ).join();
+ }
+
+ private static PeersAndLearners peersAndLearners(
+ HashMap<NetworkAddress, ClusterService> clusterServices,
+ Predicate<NetworkAddress> isServerAddress,
+ int nodes
+ ) {
+ return PeersAndLearners.fromConsistentIds(
+ getNetworkAddresses(nodes).stream().filter(isServerAddress)
+ .map(netAddr ->
clusterServices.get(netAddr).topologyService().localMember().name()).collect(
+ toSet()));
+ }
+
/**
* Generates a node address for each node.
*
@@ -320,6 +405,62 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
return addresses;
}
+ @AfterEach
+ protected void afterTest() throws Exception {
+ // No-op.
+ }
+
+ /**
+ * The method is called after every node of the cluster starts.
+ *
+ * @param nodeName Node name.
+ * @param clusterService Cluster service.
+ * @param placementDriverNodesNames Names of all nodes in raft group.
+ */
+ protected void afterNodeStart(String nodeName, ClusterService
clusterService, Set<String> placementDriverNodesNames) {
+ // No-op.
+ }
+
+ /**
+ * Checks the condition after cluster and raft clients initialization,
waiting for this condition.
+ *
+ * @param leaderName Current leader name.
+ * @throws InterruptedException If failed.
+ */
+ private void afterInitCheckConditionWithWait(String leaderName) throws
InterruptedException {
+ assertTrue(waitForCondition(() -> afterInitCheckCondition(leaderName),
10_000));
+ }
+
+ /**
+ * Checks the condition after cluster and raft clients initialization.
+ *
+ * @param leaderName Current leader name.
+ * @return Condition result.
+ */
+ protected boolean afterInitCheckCondition(String leaderName) {
+ return true;
+ }
+
+ /**
+ * Checks the condition after leader change, waiting for this condition.
+ *
+ * @param leaderName Current leader name.
+ * @throws InterruptedException If failed.
+ */
+ private void afterLeaderChangeCheckConditionWithWait(String leaderName)
throws InterruptedException {
+ assertTrue(waitForCondition(() ->
afterLeaderChangeCheckCondition(leaderName), 10_000));
+ }
+
+ /**
+ * Checks the condition after leader change.
+ *
+ * @param leaderName Current leader name.
+ * @return Condition result.
+ */
+ protected boolean afterLeaderChangeCheckCondition(String leaderName) {
+ return true;
+ }
+
/**
* Replication test group class.
*/
@@ -360,7 +501,10 @@ public class TopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
}
}
- private static class LogicalTopologyServiceTestImpl implements
LogicalTopologyService {
+ /**
+ * Test implementation of {@link LogicalTopologyService}.
+ */
+ protected static class LogicalTopologyServiceTestImpl implements
LogicalTopologyService {
private final ClusterService clusterService;
public LogicalTopologyServiceTestImpl(ClusterService clusterService) {
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index a30a9c040f..cbf07def6e 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -17,10 +17,25 @@
package org.apache.ignite.internal.placementdriver;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.jetbrains.annotations.TestOnly;
/**
* Placement driver manager.
@@ -31,21 +46,100 @@ public class PlacementDriverManager implements
IgniteComponent {
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+ private final RaftMessagesFactory raftMessagesFactory = new
RaftMessagesFactory();
+
/** Prevents double stopping of the component. */
private final AtomicBoolean isStopped = new AtomicBoolean();
+ private final ReplicationGroupId replicationGroupId;
+
+ private final ClusterService clusterService;
+
+ private final Supplier<CompletableFuture<Set<String>>>
placementDriverNodesNamesProvider;
+
+ /**
+ * Raft client future. Can contain null, if this node is not in placement
driver group.
+ */
+ private final CompletableFuture<TopologyAwareRaftGroupService>
raftClientFuture;
+
+ private final ScheduledExecutorService raftClientExecutor;
+
+ private final LogicalTopologyService logicalTopologyService;
+
+ private final RaftConfiguration raftConfiguration;
+
+ private volatile boolean isActiveActor;
+
+ private volatile long lastTermSeen = -1;
+
/**
* The constructor.
*
- * @param metaStorageMgr Meta Storage manager.
+ * @param replicationGroupId Id of placement driver group.
+ * @param clusterService Cluster service.
+ * @param raftConfiguration Raft configuration.
+ * @param placementDriverNodesNamesProvider Provider of the set of
placement driver nodes' names.
+ * @param logicalTopologyService Logical topology service.
+ * @param raftClientExecutor Raft client executor.
*/
- public PlacementDriverManager(MetaStorageManager metaStorageMgr) {
+ public PlacementDriverManager(
+ ReplicationGroupId replicationGroupId,
+ ClusterService clusterService,
+ RaftConfiguration raftConfiguration,
+ Supplier<CompletableFuture<Set<String>>>
placementDriverNodesNamesProvider,
+ LogicalTopologyService logicalTopologyService,
+ ScheduledExecutorService raftClientExecutor
+ ) {
+ this.replicationGroupId = replicationGroupId;
+ this.clusterService = clusterService;
+ this.raftConfiguration = raftConfiguration;
+ this.placementDriverNodesNamesProvider =
placementDriverNodesNamesProvider;
+ this.logicalTopologyService = logicalTopologyService;
+ this.raftClientExecutor = raftClientExecutor;
+
+ raftClientFuture = new CompletableFuture<>();
}
/** {@inheritDoc} */
@Override
public void start() {
+ placementDriverNodesNamesProvider.get()
+ .thenCompose(placementDriverNodes -> {
+ String thisNodeName =
clusterService.topologyService().localMember().name();
+
+ if (placementDriverNodes.contains(thisNodeName)) {
+ return TopologyAwareRaftGroupService.start(
+ replicationGroupId,
+ clusterService,
+ raftMessagesFactory,
+ raftConfiguration,
+
PeersAndLearners.fromConsistentIds(placementDriverNodes),
+ true,
+ raftClientExecutor,
+ logicalTopologyService,
+ true
+ ).thenCompose(client -> {
+ TopologyAwareRaftGroupService
topologyAwareClient = (TopologyAwareRaftGroupService) client;
+ return
topologyAwareClient.subscribeLeader(this::onLeaderChange).thenApply(v ->
topologyAwareClient);
+ });
+ } else {
+ return completedFuture(null);
+ }
+ })
+ .whenComplete((client, ex) -> {
+ if (ex == null) {
+ raftClientFuture.complete(client);
+ } else {
+ raftClientFuture.completeExceptionally(ex);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void beforeNodeStop() {
+ withRaftClientIfPresent(c -> c.unsubscribeLeader().join());
}
/** {@inheritDoc} */
@@ -56,5 +150,47 @@ public class PlacementDriverManager implements
IgniteComponent {
}
busyLock.block();
+
+ withRaftClientIfPresent(TopologyAwareRaftGroupService::shutdown);
+ }
+
+ private void
withRaftClientIfPresent(Consumer<TopologyAwareRaftGroupService> closure) {
+ raftClientFuture.thenAccept(client -> {
+ if (client != null) {
+ closure.accept(client);
+ }
+ });
+ }
+
+ private void onLeaderChange(ClusterNode leader, Long term) {
+ if (term > lastTermSeen) {
+ if (leader.equals(clusterService.topologyService().localMember()))
{
+ takeOverActiveActor();
+ } else {
+ stepDownActiveActor();
+ }
+
+ lastTermSeen = term;
+ }
+ }
+
+ /**
+ * Takes over active actor of placement driver group.
+ */
+ private void takeOverActiveActor() {
+ isActiveActor = true;
+ }
+
+
+ /**
+ * Steps down as active actor.
+ */
+ private void stepDownActiveActor() {
+ isActiveActor = false;
+ }
+
+ @TestOnly
+ boolean isActiveActor() {
+ return isActiveActor;
}
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 47d16c65cf..d77a1f58b5 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -25,7 +25,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
@@ -78,6 +78,11 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
/** RAFT configuration. */
private final RaftConfiguration raftConfiguration;
+ /**
+ * Whether to notify callback after subscription to pass the current
leader and term into it, even if the leader
+ * did not change in that moment (see {@link
#subscribeLeader(BiConsumer)}).
+ */
+ private final boolean notifyOnSubscription;
/**
* The constructor.
@@ -87,6 +92,8 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
* @param executor RPC executor.
* @param raftClient RPC RAFT client.
* @param logicalTopologyService Logical topology.
+ * @param notifyOnSubscription Whether to notify callback after
subscription to pass the current leader and term into it,
+ * even if the leader did not change in that moment (see {@link
#subscribeLeader(BiConsumer)}).
*/
private TopologyAwareRaftGroupService(
ClusterService cluster,
@@ -94,7 +101,8 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
ScheduledExecutorService executor,
RaftConfiguration raftConfiguration,
RaftGroupService raftClient,
- LogicalTopologyService logicalTopologyService
+ LogicalTopologyService logicalTopologyService,
+ boolean notifyOnSubscription
) {
this.clusterService = cluster;
this.factory = factory;
@@ -103,6 +111,7 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
this.raftClient = raftClient;
this.logicalTopologyService = logicalTopologyService;
this.serverEventHandler = new ServerEventHandler();
+ this.notifyOnSubscription = notifyOnSubscription;
cluster.messagingService().addMessageHandler(RaftMessageGroup.class,
(message, senderConsistentId, correlationId) -> {
if (message instanceof LeaderChangeNotification) {
@@ -154,6 +163,8 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
* @param getLeader True to get the group's leader upon service creation.
* @param executor RPC executor.
* @param logicalTopologyService Logical topology service.
+ * @param notifyOnSubscription Whether to notify callback after
subscription to pass the current leader and term into it,
+ * even if the leader did not change in that moment (see {@link
#subscribeLeader(BiConsumer)}).
* @return Future to create a raft client.
*/
public static CompletableFuture<RaftGroupService> start(
@@ -164,11 +175,12 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
PeersAndLearners configuration,
boolean getLeader,
ScheduledExecutorService executor,
- LogicalTopologyService logicalTopologyService
+ LogicalTopologyService logicalTopologyService,
+ boolean notifyOnSubscription
) {
return RaftGroupServiceImpl.start(groupId, cluster, factory,
raftConfiguration, configuration, getLeader, executor)
.thenApply(raftGroupService -> new
TopologyAwareRaftGroupService(cluster, factory, executor, raftConfiguration,
- raftGroupService, logicalTopologyService));
+ raftGroupService, logicalTopologyService,
notifyOnSubscription));
}
/**
@@ -247,8 +259,9 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
* Assigns a closure to call when a leader will is elected.
*
* @param callback Callback closure.
+ * @return Future that is completed when all subscription messages to
peers are sent.
*/
- public void subscribeLeader(Consumer<ClusterNode> callback) {
+ public CompletableFuture<Void> subscribeLeader(BiConsumer<ClusterNode,
Long> callback) {
assert !serverEventHandler.isSubscribed() : "The node already
subscribed";
int peers = peers().size();
@@ -272,38 +285,51 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
}
}
- CompletableFuture.allOf(futs).whenCompleteAsync((unused, throwable) ->
{
- if (throwable != null) {
- throw new IgniteException(Common.UNEXPECTED_ERR, throwable);
- }
-
- refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> {
- if (leaderWithTerm.leader() != null) {
- serverEventHandler.onLeaderElected(
-
clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()),
- leaderWithTerm.term()
- );
+ if (notifyOnSubscription) {
+ return CompletableFuture.allOf(futs).whenCompleteAsync((unused,
throwable) -> {
+ if (throwable != null) {
+ throw new IgniteException(Common.UNEXPECTED_ERR,
throwable);
}
+
+ refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm
-> {
+ if (leaderWithTerm.leader() != null) {
+ serverEventHandler.onLeaderElected(
+
clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()),
+ leaderWithTerm.term()
+ );
+ }
+ }, executor);
}, executor);
- }, executor);
+ } else {
+ return CompletableFuture.allOf(futs);
+ }
}
/**
* Unsubscribe of notification about a leader elected.
+ *
+ * @return Future that is completed when all messages about cancelling
subscription to peers are sent.
*/
- public void unsubscribeLeader() {
+ public CompletableFuture<Void> unsubscribeLeader() {
serverEventHandler.setOnLeaderElectedCallback(null);
- for (Peer peer : peers()) {
+ var peers = peers();
+ var futs = new CompletableFuture[peers.size()];
+
+ for (int i = 0; i < peers.size(); i++) {
+ Peer peer = peers.get(i);
+
ClusterNode node =
clusterService.topologyService().getByConsistentId(peer.consistentId());
if (node != null) {
- sendSubscribeMessage(node,
factory.subscriptionLeaderChangeRequest()
+ futs[i] = sendSubscribeMessage(node,
factory.subscriptionLeaderChangeRequest()
.groupId(groupId())
.subscribe(false)
.build());
}
}
+
+ return CompletableFuture.allOf(futs);
}
@Override
@@ -409,7 +435,7 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
private long term = 0;
/** A leader elected callback. */
- private Consumer<ClusterNode> onLeaderElectedCallback;
+ private BiConsumer<ClusterNode, Long> onLeaderElectedCallback;
/**
* Notifies about a new leader elected, if it did not make before.
@@ -421,7 +447,7 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
if (onLeaderElectedCallback != null && term > this.term) {
this.term = term;
- onLeaderElectedCallback.accept(node);
+ onLeaderElectedCallback.accept(node, term);
}
}
@@ -430,7 +456,7 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
*
* @param onLeaderElectedCallback A callback closure.
*/
- public synchronized void
setOnLeaderElectedCallback(Consumer<ClusterNode> onLeaderElectedCallback) {
+ public synchronized void
setOnLeaderElectedCallback(BiConsumer<ClusterNode, Long>
onLeaderElectedCallback) {
this.onLeaderElectedCallback = onLeaderElectedCallback;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 18e6836780..27d2c1d638 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -62,7 +62,10 @@ public class Loza implements RaftManager {
public static final String CLIENT_POOL_NAME = "Raft-Group-Client";
/** Raft client pool size. Size was taken from jraft's TimeManager. */
- private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
+ public static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
+
+ /** {@code true} if the raft client executor was created by this instance and therefore must be shut down in {@link #stop()}. */
+ private boolean ownsExecutor;
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(Loza.class);
@@ -94,12 +97,14 @@ public class Loza implements RaftManager {
* @param raftConfiguration Raft configuration.
* @param dataPath Data path.
* @param clock A hybrid logical clock.
+ * @param executor Executor for raft group services; it is owned by the caller and is not shut down by {@link #stop()}.
*/
public Loza(
ClusterService clusterNetSvc,
RaftConfiguration raftConfiguration,
Path dataPath,
- HybridClock clock
+ HybridClock clock,
+ ScheduledExecutorService executor
) {
this.clusterNetSvc = clusterNetSvc;
this.raftConfiguration = raftConfiguration;
@@ -112,9 +117,33 @@ public class Loza implements RaftManager {
this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath,
options);
- this.executor = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
- new
NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
- CLIENT_POOL_NAME), LOG
+ this.executor = executor;
+ }
+
+ /**
+ * The constructor. Creates its own raft client executor, which is shut down in {@link #stop()}.
+ *
+ * @param clusterNetSvc Cluster network service.
+ * @param raftConfiguration Raft configuration.
+ * @param dataPath Data path.
+ * @param clock A hybrid logical clock.
+ */
+ public Loza(
+ ClusterService clusterNetSvc,
+ RaftConfiguration raftConfiguration,
+ Path dataPath,
+ HybridClock clock
+ ) {
+ this(
+ clusterNetSvc,
+ raftConfiguration,
+ dataPath,
+ clock,
+ new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
+ CLIENT_POOL_NAME), LOG
+ )
)
);
+ this.ownsExecutor = true;
}
@@ -136,8 +165,10 @@ public class Loza implements RaftManager {
busyLock.block();
- IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
+ if (ownsExecutor) {
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+ }
raftServer.stop();
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
index cfe26eadc4..a538012a48 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
@@ -17,12 +17,13 @@
package org.apache.ignite.internal.raft.server.impl;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
/**
@@ -48,9 +49,13 @@ public class RaftServiceEventListener {
actions = new HashSet<>();
}
- actions.add(notifyAction);
+ var finalActions = actions; // Effectively final alias of the group's action set, so the lambda below can capture it.
nodesSubscriptions.compute(subscriber, (node, nodeActions) -> {
+ if (!nullOrEmpty(finalActions) && !nullOrEmpty(nodeActions)) { // NOTE(review): leaves the subscriber's existing set untouched when both the group and the subscriber already have actions — confirm this is intended.
+ return nodeActions;
+ }
+
if (nodeActions == null) {
nodeActions = new HashSet<>();
}
@@ -60,6 +65,8 @@ public class RaftServiceEventListener {
return nodeActions;
});
+ actions.add(notifyAction); // Added after compute(), so the check above sees the group's actions as they were before this subscription.
+
return actions;
});
}
@@ -77,7 +84,7 @@ public class RaftServiceEventListener {
grpNodeActions.retainAll(nodeActions);
- if (CollectionUtils.nullOrEmpty(grpNodeActions)) {
+ if (nullOrEmpty(grpNodeActions)) {
return nodeActions;
}
@@ -88,14 +95,14 @@ public class RaftServiceEventListener {
nodeActions.remove(actionToRemove);
actions.remove(actionToRemove);
- if (CollectionUtils.nullOrEmpty(nodeActions)) {
+ if (nullOrEmpty(nodeActions)) {
return null;
}
return nodeActions;
});
- if (CollectionUtils.nullOrEmpty(actions)) {
+ if (nullOrEmpty(actions)) {
return null;
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index afe6311409..8ac95b3be7 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.app;
+import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_NAME;
+import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_SIZE;
+
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -29,6 +32,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -71,6 +77,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
import org.apache.ignite.internal.metrics.rest.MetricRestFactory;
@@ -108,6 +115,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.VaultService;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -253,6 +261,8 @@ public class IgniteImpl implements Ignite {
private final RestAddressReporter restAddressReporter;
+ private final ScheduledExecutorService raftExecutorService; // Raft client executor shared by raftMgr and placementDriverMgr; created in the constructor and shut down in stop().
+
/**
* The Constructor.
*
@@ -310,11 +320,20 @@ public class IgniteImpl implements Ignite {
clock = new HybridClockImpl();
+ RaftConfiguration raftConfiguration =
nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY);
+
+ raftExecutorService = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterSvc.localConfiguration().getName(),
+ CLIENT_POOL_NAME), LOG
+ )
+ );
+
raftMgr = new Loza(
clusterSvc,
- nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY),
+ raftConfiguration,
workDir,
- clock
+ clock,
+ raftExecutorService
);
LockManager lockMgr = new HeapLockManager();
@@ -354,7 +373,14 @@ public class IgniteImpl implements Ignite {
new RocksDbKeyValueStorage(name,
workDir.resolve(METASTORAGE_DB_PATH))
);
- placementDriverMgr = new PlacementDriverManager(metaStorageMgr);
+ placementDriverMgr = new PlacementDriverManager(
+ MetastorageGroupId.INSTANCE,
+ clusterSvc,
+ raftConfiguration,
+ cmgMgr::metaStorageNodes,
+ logicalTopologyService,
+ raftExecutorService
+ );
this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr,
vaultMgr);
@@ -675,6 +701,7 @@ public class IgniteImpl implements Ignite {
public void stop() {
lifecycleManager.stopNode();
restAddressReporter.removeReport();
+ IgniteUtils.shutdownAndAwaitTermination(raftExecutorService, 10,
TimeUnit.SECONDS); // Done after lifecycleManager.stopNode(): raftMgr and placementDriverMgr both use this executor.
}
/** {@inheritDoc} */
/** {@inheritDoc} */