This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6a2a2e6a2d IGNITE-18172 Added learners' assignments to the rebalance
algorithm (#1366)
6a2a2e6a2d is described below
commit 6a2a2e6a2dd0772122e32c3858b7ba8b30b70742
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Nov 28 16:04:24 2022 +0300
IGNITE-18172 Added learners' assignments to the rebalance algorithm (#1366)
---
.../ignite/internal/affinity/AffinityUtils.java | 57 +--
.../ignite/internal/affinity/Assignment.java | 96 ++++
.../internal/affinity/AffinityServiceTest.java | 9 +-
.../java/org/apache/ignite/internal/raft/Loza.java | 70 ++-
.../org/apache/ignite/internal/raft/LozaTest.java | 2 +-
.../ignite/internal/replicator/ReplicaManager.java | 13 +-
.../storage/ItRebalanceDistributedTest.java | 10 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
.../distributed/ItTxDistributedTestSingleNode.java | 136 +++--
.../internal/table/distributed/TableManager.java | 561 +++++++++++----------
.../raft/RebalanceRaftGroupEventsListener.java | 102 ++--
.../distributed/replicator/LeaderOrTxState.java | 15 +-
.../replicator/PartitionReplicaListener.java | 9 +-
.../distributed/replicator/PlacementDriver.java | 35 +-
.../ignite/internal/utils/RebalanceUtil.java | 101 +---
.../table/distributed/TableManagerTest.java | 34 +-
.../PartitionReplicaListenerIndexLockingTest.java | 2 -
.../replication/PartitionReplicaListenerTest.java | 7 +-
.../table/impl/DummyInternalTableImpl.java | 1 -
19 files changed, 632 insertions(+), 630 deletions(-)
diff --git
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
index d0c1335bae..ec15cedc02 100644
---
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
+++
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
@@ -17,14 +17,15 @@
package org.apache.ignite.internal.affinity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.function.IntFunction;
import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
/**
* Stateless affinity utils that produces helper methods for an affinity
assignments calculation.
@@ -35,59 +36,31 @@ public class AffinityUtils {
*
* @param partitions Partitions count.
* @param replicas Replicas count.
- * @param aggregator Function that creates a collection for the partition
assignments.
- * @return List nodes by partition.
+ * @return List assignments by partition.
*/
- public static <T extends Collection<ClusterNode>> List<T>
calculateAssignments(
- @NotNull Collection<ClusterNode> baselineNodes,
- int partitions,
- int replicas,
- IntFunction<T> aggregator
- ) {
- return RendezvousAffinityFunction.assignPartitions(
+ public static List<Set<Assignment>>
calculateAssignments(Collection<ClusterNode> baselineNodes, int partitions, int
replicas) {
+ List<Set<ClusterNode>> affinityNodes =
RendezvousAffinityFunction.assignPartitions(
baselineNodes,
partitions,
replicas,
false,
null,
- aggregator
+ HashSet::new
);
- }
- /**
- * Calculates affinity assignments.
- *
- * @param partitions Partitions count.
- * @param replicas Replicas count.
- * @return List nodes by partition.
- */
- public static List<List<ClusterNode>> calculateAssignments(
- @NotNull Collection<ClusterNode> baselineNodes,
- int partitions,
- int replicas
- ) {
- return calculateAssignments(
- baselineNodes,
- partitions,
- replicas,
- ArrayList::new
- );
+ return
affinityNodes.stream().map(AffinityUtils::clusterNodesToAssignments).collect(toList());
}
/**
- * Calculates affinity assignments for single partition.
+ * Calculates affinity assignments for a single partition.
*
* @param baselineNodes Nodes.
* @param partition Partition id.
* @param replicas Replicas count.
- * @return List of nodes.
+ * @return List of assignments.
*/
- public static Set<ClusterNode> calculateAssignmentForPartition(
- Collection<ClusterNode> baselineNodes,
- int partition,
- int replicas
- ) {
- return RendezvousAffinityFunction.assignPartition(
+ public static Set<Assignment>
calculateAssignmentForPartition(Collection<ClusterNode> baselineNodes, int
partition, int replicas) {
+ Set<ClusterNode> affinityNodes =
RendezvousAffinityFunction.assignPartition(
partition,
new ArrayList<>(baselineNodes),
replicas,
@@ -96,5 +69,11 @@ public class AffinityUtils {
null,
HashSet::new
);
+
+ return clusterNodesToAssignments(affinityNodes);
+ }
+
+ private static Set<Assignment>
clusterNodesToAssignments(Collection<ClusterNode> nodes) { // Every affinity node becomes a peer, keyed by name.
+ return nodes.stream().map(node ->
Assignment.forPeer(node.name())).collect(toSet());
+ }
}
diff --git
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignment.java
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignment.java
new file mode 100644
index 0000000000..e26ad97066
--- /dev/null
+++
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignment.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Represents an assignment of a partition to a node with a specific {@code
consistentId}.
+ *
+ * <p>There can be two types of assignments: one for the synchronous members
of a replication group (a.k.a. "peers") and one for
+ * the asynchronous members (a.k.a. "learners") of the same group. Peers get
synchronously updated during write operations, while learners
+ * are eventually consistent and receive updates at some later point in time.
+ *
+ * <p>Instances can only be obtained through {@link #forPeer} and {@link #forLearner}, because the constructor is private.
+ * Both fields are final, so an instance never changes after creation. {@link #consistentId()} returns the node's
+ * consistent ID, and {@link #isPeer()} returns {@code true} for a peer and {@code false} for a learner.
+ *
+ * <p>The consistent ID and the peer/learner flag both take part in {@link #equals} and {@link #hashCode}. As a result, a
+ * peer assignment and a learner assignment for the same node are NOT equal.
+ */
+public class Assignment implements Serializable {
+ private static final long serialVersionUID = -8892379245627437834L;
+
+ private final String consistentId;
+
+ private final boolean isPeer;
+
+ private Assignment(String consistentId, boolean isPeer) {
+ this.consistentId = consistentId;
+ this.isPeer = isPeer;
+ }
+
+ /**
+ * Creates a peer assignment.
+ *
+ * @param consistentId Peer consistent ID. It is not null-checked here. NOTE(review): it presumably must be non-null,
+ * because {@link #equals} and {@link #hashCode} dereference it.
+ * @return Assignment for which {@link #isPeer()} returns {@code true}.
+ */
+ public static Assignment forPeer(String consistentId) {
+ return new Assignment(consistentId, true);
+ }
+
+ /**
+ * Creates a learner assignment.
+ *
+ * @param consistentId Learner consistent ID. It is not null-checked here. NOTE(review): it presumably must be non-null,
+ * because {@link #equals} and {@link #hashCode} dereference it.
+ * @return Assignment for which {@link #isPeer()} returns {@code false}.
+ */
+ public static Assignment forLearner(String consistentId) {
+ return new Assignment(consistentId, false);
+ }
+
+ public String consistentId() {
+ return consistentId;
+ }
+
+ public boolean isPeer() {
+ return isPeer;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Assignment that = (Assignment) o;
+
+ if (isPeer != that.isPeer) {
+ return false;
+ }
+ return consistentId.equals(that.consistentId);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = consistentId.hashCode();
+ result = 31 * result + (isPeer ? 1 : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(Assignment.class, this);
+ }
+}
diff --git
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
index d93a3663f9..5310ab1dcc 100644
---
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
+++
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
@@ -34,7 +35,7 @@ import org.junit.jupiter.api.Test;
public class AffinityServiceTest {
@Test
public void testCalculatedAssignmentHappyPath() {
- List<List<ClusterNode>> assignments =
AffinityUtils.calculateAssignments(
+ List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
Arrays.asList(
new ClusterNode(
UUID.randomUUID().toString(), "node0",
@@ -51,14 +52,14 @@ public class AffinityServiceTest {
assertEquals(10, assignments.size());
- for (List<ClusterNode> partitionAssignment : assignments) {
+ for (Set<Assignment> partitionAssignment : assignments) {
assertEquals(2, partitionAssignment.size());
}
}
@Test
public void testEmptyBaselineAssignmentsCalculation() {
- List<List<ClusterNode>> assignments =
AffinityUtils.calculateAssignments(
+ List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
Collections.emptyList(),
10,
3
@@ -66,7 +67,7 @@ public class AffinityServiceTest {
assertEquals(10, assignments.size());
- for (List<ClusterNode> partitionAssignment : assignments) {
+ for (Set<Assignment> partitionAssignment : assignments) {
assertEquals(0, partitionAssignment.size());
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 3f69e670b5..51e0793c96 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -49,7 +49,6 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.TopologyService;
@@ -170,30 +169,12 @@ public class Loza implements IgniteComponent {
raftServer.stop();
}
- /**
- * Determines whether a RAFT group should be started locally according to
a collection of nodes that should have a RAFT group.
- */
- public boolean shouldHaveRaftGroupLocally(Collection<ClusterNode>
raftNodes) {
- String locNodeName =
clusterNetSvc.topologyService().localMember().name();
-
- return raftNodes.stream().anyMatch(n -> locNodeName.equals(n.name()));
- }
-
- /**
- * Determines whether a RAFT group should be started locally according to
a collection of nodes that should have a RAFT group.
- */
- private boolean shouldHaveRaftGroupLocallyImpl(Collection<String>
raftNodes) {
- String locNodeName =
clusterNetSvc.topologyService().localMember().name();
-
- return raftNodes.stream().anyMatch(locNodeName::equals);
- }
-
/**
* Creates a raft group service providing operations on a raft group. If
{@code nodes} contains the current node, then raft group starts
* on the current node.
*
* @param groupId Raft group id.
- * @param nodeConsistentIds Consistent IDs of Raft group nodes.
+ * @param peerConsistentIds Consistent IDs of Raft peers.
* @param lsnrSupplier Raft group listener supplier.
* @param groupOptions Options to apply to the group.
* @return Future representing pending completion of the operation.
@@ -201,20 +182,20 @@ public class Loza implements IgniteComponent {
*/
public CompletableFuture<RaftGroupService> prepareRaftGroup(
ReplicationGroupId groupId,
- Collection<String> nodeConsistentIds,
+ Collection<String> peerConsistentIds,
Supplier<RaftGroupListener> lsnrSupplier,
RaftGroupOptions groupOptions
) throws NodeStoppingException {
- return prepareRaftGroup(groupId, nodeConsistentIds, List.of(),
lsnrSupplier, () -> noopLsnr, groupOptions);
+ return prepareRaftGroup(groupId, peerConsistentIds, List.of(),
lsnrSupplier, () -> noopLsnr, groupOptions);
}
/**
- * Creates a raft group service providing operations on a raft group. If
{@code nodeConsistentIds} or {@code learnerConsistentIds}
+ * Creates a raft group service providing operations on a raft group. If
{@code peerConsistentIds} or {@code learnerConsistentIds}
* contains the current node, then raft group starts on the current node.
*
* @param groupId Raft group id.
- * @param nodeConsistentIds Consistent IDs of Raft group nodes.
- * @param learnerConsistentIds Consistent IDs of Raft learner nodes.
+ * @param peerConsistentIds Consistent IDs of Raft peers.
+ * @param learnerConsistentIds Consistent IDs of Raft learners.
* @param lsnrSupplier Raft group listener supplier.
* @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
* @param groupOptions Options to apply to the group.
@@ -223,7 +204,7 @@ public class Loza implements IgniteComponent {
*/
public CompletableFuture<RaftGroupService> prepareRaftGroup(
ReplicationGroupId groupId,
- Collection<String> nodeConsistentIds,
+ Collection<String> peerConsistentIds,
Collection<String> learnerConsistentIds,
Supplier<RaftGroupListener> lsnrSupplier,
Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
@@ -235,7 +216,7 @@ public class Loza implements IgniteComponent {
try {
return prepareRaftGroupInternal(
- groupId, nodeConsistentIds, learnerConsistentIds,
lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions
+ groupId, peerConsistentIds, learnerConsistentIds,
lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions
);
} finally {
busyLock.leaveBusy();
@@ -246,8 +227,8 @@ public class Loza implements IgniteComponent {
* Internal method to a raft group creation.
*
* @param groupId Raft group id.
- * @param nodeConsistentIds Consistent IDs of Raft group nodes.
- * @param learnerConsistentIds Consistent IDs of Raft learner nodes.
+ * @param peerConsistentIds Consistent IDs of Raft peers.
+ * @param learnerConsistentIds Consistent IDs of Raft learners.
* @param lsnrSupplier Raft group listener supplier.
* @param eventsLsnrSupplier Raft group events listener supplier.
* @param groupOptions Options to apply to the group.
@@ -255,16 +236,18 @@ public class Loza implements IgniteComponent {
*/
private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(
ReplicationGroupId groupId,
- Collection<String> nodeConsistentIds,
+ Collection<String> peerConsistentIds,
Collection<String> learnerConsistentIds,
Supplier<RaftGroupListener> lsnrSupplier,
Supplier<RaftGroupEventsListener> eventsLsnrSupplier,
RaftGroupOptions groupOptions
) {
- List<Peer> peers = idsToPeers(nodeConsistentIds);
+ List<Peer> peers = idsToPeers(peerConsistentIds);
List<Peer> learners = idsToPeers(learnerConsistentIds);
- if (shouldHaveRaftGroupLocallyImpl(nodeConsistentIds) ||
shouldHaveRaftGroupLocallyImpl(learnerConsistentIds)) {
+ String locNodeName =
clusterNetSvc.topologyService().localMember().name();
+
+ if (peerConsistentIds.contains(locNodeName) ||
learnerConsistentIds.contains(locNodeName)) {
startRaftGroupNodeInternal(
groupId,
peers,
@@ -282,7 +265,8 @@ public class Loza implements IgniteComponent {
* Start RAFT group on the current node.
*
* @param grpId Raft group id.
- * @param nodeConsistentIds Consistent IDs of Raft group nodes.
+ * @param peerConsistentIds Consistent IDs of Raft peers.
+ * @param learnerConsistentIds Consistent IDs of Raft learners.
* @param lsnr Raft group listener.
* @param eventsLsnr Raft group events listener.
* @param groupOptions Options to apply to the group.
@@ -290,7 +274,8 @@ public class Loza implements IgniteComponent {
*/
public void startRaftGroupNode(
ReplicationGroupId grpId,
- Collection<String> nodeConsistentIds,
+ Collection<String> peerConsistentIds,
+ Collection<String> learnerConsistentIds,
RaftGroupListener lsnr,
RaftGroupEventsListener eventsLsnr,
RaftGroupOptions groupOptions
@@ -300,7 +285,14 @@ public class Loza implements IgniteComponent {
}
try {
- startRaftGroupNodeInternal(grpId, idsToPeers(nodeConsistentIds),
List.of(), lsnr, eventsLsnr, groupOptions);
+ startRaftGroupNodeInternal(
+ grpId,
+ idsToPeers(peerConsistentIds),
+ idsToPeers(learnerConsistentIds),
+ lsnr,
+ eventsLsnr,
+ groupOptions
+ );
} finally {
busyLock.leaveBusy();
}
@@ -310,20 +302,22 @@ public class Loza implements IgniteComponent {
* Creates and starts a raft group service providing operations on a raft
group.
*
* @param grpId RAFT group id.
- * @param nodeConsistentIds Consistent IDs of Raft group nodes.
+ * @param peerConsistentIds Consistent IDs of Raft peers.
+ * @param learnerConsistentIds Consistent IDs of Raft learners.
* @return Future that will be completed with an instance of RAFT group
service.
* @throws NodeStoppingException If node stopping intention was detected.
*/
public CompletableFuture<RaftGroupService> startRaftGroupService(
ReplicationGroupId grpId,
- Collection<String> nodeConsistentIds
+ Collection<String> peerConsistentIds,
+ Collection<String> learnerConsistentIds
) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- return startRaftGroupServiceInternal(grpId,
idsToPeers(nodeConsistentIds), List.of());
+ return startRaftGroupServiceInternal(grpId,
idsToPeers(peerConsistentIds), idsToPeers(learnerConsistentIds));
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index bbedb9debf..54039ed9b9 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -79,7 +79,7 @@ public class LozaTest extends IgniteAbstractTest {
assertThrows(
NodeStoppingException.class,
- () -> loza.startRaftGroupService(raftGroupId, newNodes)
+ () -> loza.startRaftGroupService(raftGroupId, newNodes,
List.of())
);
assertThrows(NodeStoppingException.class, () ->
loza.stopRaftGroup(raftGroupId));
assertThrows(
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 781ff2b1e5..46849de85c 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.replicator;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -90,7 +89,7 @@ public class ReplicaManager implements IgniteComponent {
Executors.newScheduledThreadPool(1, new
NamedThreadFactory("scheduled-idle-safe-time-sync-thread", LOG));
/** Set of message groups to handler as replica requests. */
- Set<Class<?>> messageGroupsToHandle;
+ private final Set<Class<?>> messageGroupsToHandle;
/**
* Constructor for a replica service.
@@ -272,16 +271,6 @@ public class ReplicaManager implements IgniteComponent {
assert replicas.isEmpty() : "There are replicas alive [replicas=" +
replicas.keySet() + ']';
}
- /**
- * Determines whether a replication group should be started locally
- * according to a collection of nodes that should have a replication group.
- */
- public boolean shouldHaveReplicationGroupLocally(Collection<ClusterNode>
replicas) {
- String locNodeName =
clusterNetSvc.topologyService().localMember().name();
-
- return replicas.stream().anyMatch(r -> locNodeName.equals(r.name()));
- }
-
/**
* Extract a hybrid timestamp from timestamp aware request or return null.
*/
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 314e340173..b626ab21a2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -38,6 +38,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.baseline.BaselineManager;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -99,7 +100,6 @@ import org.apache.ignite.internal.util.ReverseIterator;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
@@ -279,7 +279,7 @@ public class ItRebalanceDistributedTest {
.changePartitions(1)));
Set<String> partitionNodesConsistentIds = getPartitionClusterNodes(0,
0).stream()
- .map(ClusterNode::name)
+ .map(Assignment::consistentId)
.collect(Collectors.toSet());
Node newNode = nodes.stream().filter(n ->
!partitionNodesConsistentIds.contains(n.name)).findFirst().orElseThrow();
@@ -391,15 +391,15 @@ public class ItRebalanceDistributedTest {
return nodes.stream().filter(n ->
n.name.equals(consistentId)).findFirst().orElseThrow();
}
- private Set<ClusterNode> getPartitionClusterNodes(int nodeNum, int
partNum) {
+ private Set<Assignment> getPartitionClusterNodes(int nodeNum, int partNum)
{
var table = ((ExtendedTableConfiguration)
nodes.get(nodeNum).clusterCfgMgr.configurationRegistry()
.getConfiguration(TablesConfiguration.KEY).tables().get("TBL1"));
if (table != null) {
- var assignments = table.assignments().value();
+ byte[] assignments = table.assignments().value();
if (assignments != null) {
- return ((List<Set<ClusterNode>>)
ByteUtils.fromBytes(assignments)).get(partNum);
+ return ((List<Set<Assignment>>)
ByteUtils.fromBytes(assignments)).get(partNum);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0a1b361918..a7c9373999 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -449,7 +449,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
return completedFuture(raftGrpSrvcMock);
});
- when(rm.startRaftGroupService(any(), any())).thenAnswer(mock -> {
+ when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock ->
{
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer("test"));
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index efe23949f4..55fb8352e9 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -18,7 +18,7 @@
package org.apache.ignite.distributed;
import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -40,7 +40,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -94,7 +96,6 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
-import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -128,17 +129,15 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
private ReplicaService clientReplicaSvc;
- protected Map<ClusterNode, HybridClock> clocks;
+ protected Map<String, HybridClock> clocks;
- protected Map<ClusterNode, Loza> raftServers;
+ protected Map<String, Loza> raftServers;
- protected Map<ClusterNode, ReplicaManager> replicaManagers;
+ protected Map<String, ReplicaManager> replicaManagers;
- protected Map<ClusterNode, ReplicaService> replicaServices;
+ protected Map<String, ReplicaService> replicaServices;
- protected Map<ClusterNode, TxManager> txManagers;
-
- protected Map<ClusterNode, TopologyService> topologyServices;
+ protected Map<String, TxManager> txManagers;
protected Int2ObjectOpenHashMap<RaftGroupService> accRaftClients;
@@ -245,7 +244,6 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
// Start raft servers. Each raft server can hold multiple groups.
clocks = new HashMap<>(nodes);
raftServers = new HashMap<>(nodes);
- topologyServices = new HashMap<>(nodes);
replicaManagers = new HashMap<>(nodes);
replicaServices = new HashMap<>(nodes);
txManagers = new HashMap<>(nodes);
@@ -258,7 +256,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
HybridClock clock = new HybridClockImpl();
- clocks.put(node, clock);
+ clocks.put(node.name(), clock);
var raftSrv = new Loza(
cluster.get(i),
@@ -270,9 +268,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
raftSrv.start();
- raftServers.put(node, raftSrv);
-
- topologyServices.put(node, cluster.get(i).topologyService());
+ raftServers.put(node.name(), raftSrv);
ReplicaManager replicaMgr = new ReplicaManager(
cluster.get(i),
@@ -282,7 +278,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
replicaMgr.start();
- replicaManagers.put(node, replicaMgr);
+ replicaManagers.put(node.name(), replicaMgr);
log.info("Replica manager has been started, node=[" + node + ']');
@@ -291,13 +287,13 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
clock
);
- replicaServices.put(node, replicaSvc);
+ replicaServices.put(node.name(), replicaSvc);
TxManagerImpl txMgr = new TxManagerImpl(replicaSvc, new
HeapLockManager(), clock);
txMgr.start();
- txManagers.put(node, txMgr);
+ txManagers.put(node.name(), txMgr);
}
log.info("Raft servers have been started");
@@ -313,19 +309,19 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
log.info("Partition groups have been started");
+ String localNodeName =
accRaftClients.get(0).clusterService().topologyService().localMember().name();
+
TxManager txMgr;
if (startClient()) {
txMgr = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(),
clientClock);
} else {
// Collocated mode.
- txMgr =
txManagers.get(accRaftClients.get(0).clusterService().topologyService().localMember());
+ txMgr = txManagers.get(localNodeName);
}
assertNotNull(txMgr);
- ClusterNode localNode =
accRaftClients.get(0).clusterService().topologyService().localMember();
-
igniteTransactions = new IgniteTransactionsImpl(txMgr);
this.accounts = new TableImpl(new InternalTableImpl(
@@ -337,8 +333,8 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
txMgr,
Mockito.mock(MvTableStorage.class),
Mockito.mock(TxStateTableStorage.class),
- startClient() ? clientReplicaSvc :
replicaServices.get(localNode),
- startClient() ? clientClock : clocks.get(localNode)
+ startClient() ? clientReplicaSvc :
replicaServices.get(localNodeName),
+ startClient() ? clientClock : clocks.get(localNodeName)
), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), txMgr.lockManager());
this.customers = new TableImpl(new InternalTableImpl(
@@ -350,8 +346,8 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
txMgr,
Mockito.mock(MvTableStorage.class),
Mockito.mock(TxStateTableStorage.class),
- startClient() ? clientReplicaSvc :
replicaServices.get(localNode),
- startClient() ? clientClock : clocks.get(localNode)
+ startClient() ? clientReplicaSvc :
replicaServices.get(localNodeName),
+ startClient() ? clientClock : clocks.get(localNodeName)
), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), txMgr.lockManager());
log.info("Tables have been started");
@@ -366,43 +362,36 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
*/
protected Int2ObjectOpenHashMap<RaftGroupService> startTable(String name,
UUID tblId)
throws Exception {
- List<List<ClusterNode>> assignment =
RendezvousAffinityFunction.assignPartitions(
- cluster.stream().map(node ->
node.topologyService().localMember())
- .collect(toList()),
+ List<Set<Assignment>> calculatedAssignments =
AffinityUtils.calculateAssignments(
+ cluster.stream().map(node ->
node.topologyService().localMember()).collect(toList()),
1,
- replicas(),
- false,
- null
+ replicas()
);
- Map<ClusterNode, Function<Peer, Boolean>> isLocalPeerCheckerList =
cluster.stream()
- .map(ClusterService::topologyService)
- .collect(toMap(
- TopologyService::localMember,
- ts -> peer ->
ts.getByConsistentId(peer.consistentId()).equals(ts.localMember())
- ));
+ List<Set<String>> assignments = calculatedAssignments.stream()
+ .map(a ->
a.stream().map(Assignment::consistentId).collect(toSet()))
+ .collect(toList());
+
+ List<TablePartitionId> grpIds = IntStream.range(0, assignments.size())
+ .mapToObj(i -> new TablePartitionId(tblId, i))
+ .collect(toList());
Int2ObjectOpenHashMap<RaftGroupService> clients = new
Int2ObjectOpenHashMap<>();
List<CompletableFuture<Void>> partitionReadyFutures = new
ArrayList<>();
- for (int p = 0; p < assignment.size(); p++) {
- List<ClusterNode> partNodes = assignment.get(p);
-
- TablePartitionId grpId = new TablePartitionId(tblId, p);
+ for (int p = 0; p < assignments.size(); p++) {
+ Set<String> partAssignments = assignments.get(p);
- List<Peer> conf = partNodes.stream().map(n -> new Peer(n.name()))
- .collect(toList());
+ TablePartitionId grpId = grpIds.get(p);
- for (ClusterNode node : partNodes) {
+ for (String assignment : partAssignments) {
var testMpPartStorage = new TestMvPartitionStorage(0);
var txStateStorage = new TestTxStateStorage();
- var placementDriver = new
PlacementDriver(replicaServices.get(node));
+ var placementDriver = new
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
- for (int part = 0; part < assignment.size(); part++) {
- ReplicationGroupId replicaGrpId = new
TablePartitionId(tblId, part);
-
- placementDriver.updateAssignment(replicaGrpId,
assignment.get(part));
+ for (int part = 0; part < assignments.size(); part++) {
+ placementDriver.updateAssignment(grpIds.get(part),
assignments.get(part));
}
int partId = p;
@@ -422,46 +411,43 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
row2tuple
));
- IndexLocker pkLocker = new HashIndexLocker(indexId, true,
txManagers.get(node).lockManager(), row2tuple);
+ IndexLocker pkLocker = new HashIndexLocker(indexId, true,
txManagers.get(assignment).lockManager(), row2tuple);
- CompletableFuture<Void> partitionReadyFuture =
raftServers.get(node).prepareRaftGroup(
+ CompletableFuture<Void> partitionReadyFuture =
raftServers.get(assignment).prepareRaftGroup(
grpId,
-
partNodes.stream().map(ClusterNode::name).collect(toList()),
- () -> {
- return new PartitionListener(
- new
TestPartitionDataStorage(testMpPartStorage),
- new TestTxStateStorage(),
- txManagers.get(node),
- () -> Map.of(pkStorage.get().id(),
pkStorage.get()),
- partId
- );
- },
+ partAssignments,
+ () -> new PartitionListener(
+ new
TestPartitionDataStorage(testMpPartStorage),
+ new TestTxStateStorage(),
+ txManagers.get(assignment),
+ () -> Map.of(pkStorage.get().id(),
pkStorage.get()),
+ partId
+ ),
RaftGroupOptions.defaults()
).thenAccept(
raftSvc -> {
try {
PendingComparableValuesTracker<HybridTimestamp> safeTime =
- new
PendingComparableValuesTracker<>(clocks.get(node).now());
+ new
PendingComparableValuesTracker<>(clocks.get(assignment).now());
- replicaManagers.get(node).startReplica(
+ replicaManagers.get(assignment).startReplica(
new TablePartitionId(tblId, partId),
new PartitionReplicaListener(
testMpPartStorage,
raftSvc,
- txManagers.get(node),
-
txManagers.get(node).lockManager(),
+ txManagers.get(assignment),
+
txManagers.get(assignment).lockManager(),
Runnable::run,
partId,
tblId,
() -> Map.of(pkLocker.id(),
pkLocker),
pkStorage,
() -> Map.of(),
- clocks.get(node),
+ clocks.get(assignment),
safeTime,
txStateStorage,
- topologyServices.get(node),
placementDriver,
-
isLocalPeerCheckerList.get(node)
+ peer ->
assignment.equals(peer.consistentId())
)
);
} catch (NodeStoppingException e) {
@@ -473,6 +459,8 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
partitionReadyFutures.add(partitionReadyFuture);
}
+ List<Peer> conf =
partAssignments.stream().map(Peer::new).collect(toList());
+
if (startClient()) {
RaftGroupService service = RaftGroupServiceImpl
.start(grpId, client, FACTORY, 10_000, conf, true,
200, executor)
@@ -491,8 +479,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
service.shutdown();
- Loza leaderSrv = raftServers
-
.get(tmpSvc.topologyService().getByConsistentId(leader.consistentId()));
+ Loza leaderSrv = raftServers.get(leader.consistentId());
RaftGroupService leaderClusterSvc = RaftGroupServiceImpl
.start(grpId, leaderSrv.service(), FACTORY,
@@ -518,7 +505,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
assertNotNull(leader);
- return
raftServers.get(svc.clusterService().topologyService().getByConsistentId(leader.consistentId()));
+ return raftServers.get(leader.consistentId());
}
/**
@@ -540,7 +527,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
- for (Entry<ClusterNode, Loza> entry : raftServers.entrySet()) {
+ for (Entry<String, Loza> entry : raftServers.entrySet()) {
Loza rs = entry.getValue();
ReplicaManager replicaMgr = replicaManagers.get(entry.getKey());
@@ -604,8 +591,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
fail("Unknown table " + t.name());
}
- TxManager manager = txManagers
-
.get(clients.get(0).clusterService().topologyService().getByConsistentId(clients.get(0).leader().consistentId()));
+ TxManager manager =
txManagers.get(clients.get(0).leader().consistentId());
assertNotNull(manager);
@@ -617,7 +603,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
protected boolean assertPartitionsSame(TableImpl table, int partId) {
int hash = 0;
- for (Map.Entry<ClusterNode, Loza> entry : raftServers.entrySet()) {
+ for (Map.Entry<String, Loza> entry : raftServers.entrySet()) {
Loza svc = entry.getValue();
JraftServerImpl server = (JraftServerImpl) svc.server();
org.apache.ignite.raft.jraft.RaftGroupService grp =
server.raftGroupService(new TablePartitionId(table.tableId(), partId));
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index fecb331b4e..1eee73eab6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -21,7 +21,6 @@ import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
import static
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -41,7 +40,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -66,11 +65,13 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
+import java.util.stream.Stream;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.ConfigurationProperty;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.causality.VersionedValue;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
@@ -110,7 +111,6 @@ import
org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
-import
org.apache.ignite.internal.table.distributed.message.HasDataRequestBuilder;
import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -159,7 +159,6 @@ import
org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
import org.apache.ignite.table.Table;
-import org.apache.ignite.table.manager.IgniteTables;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -167,8 +166,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Table manager.
*/
-public class TableManager extends Producer<TableEvent, TableEventParameters>
implements IgniteTables, IgniteTablesInternal,
- IgniteComponent {
+public class TableManager extends Producer<TableEvent, TableEventParameters>
implements IgniteTablesInternal, IgniteComponent {
/**
* The special value of the last applied index to indicate the beginning
of a full data rebalancing.
*
@@ -274,8 +272,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/** Scan request executor. */
private final ExecutorService scanRequestExecutor;
- /** Separate executor for IO operations like partition storage
initialization
- * or partition raft group meta data persisting.
+ /**
+ * Separate executor for IO operations like partition storage
initialization or partition raft group meta data persisting.
*/
private final ExecutorService ioExecutor;
@@ -348,10 +346,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
this.clock = clock;
this.outgoingSnapshotsManager = outgoingSnapshotsManager;
- placementDriver = new PlacementDriver(replicaSvc);
-
clusterNodeResolver = topologyService::getByConsistentId;
+ placementDriver = new PlacementDriver(replicaSvc, clusterNodeResolver);
+
tablesByIdVv = new VersionedValue<>(null, HashMap::new);
registry.accept(token -> {
@@ -655,9 +653,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
long causalityToken = assignmentsCtx.storageRevision();
- List<Set<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() ==
null ? null : ByteUtils.fromBytes(assignmentsCtx.oldValue());
+ List<Set<Assignment>> oldAssignments = assignmentsCtx.oldValue() ==
null ? null : ByteUtils.fromBytes(assignmentsCtx.oldValue());
- List<Set<ClusterNode>> newAssignments =
ByteUtils.fromBytes(assignmentsCtx.newValue());
+ List<Set<Assignment>> newAssignments =
ByteUtils.fromBytes(assignmentsCtx.newValue());
// Empty assignments might be a valid case if tables are created from
within cluster init HOCON
// configuration, which is not supported now.
@@ -670,12 +668,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
futures[i] = new CompletableFuture<>();
}
- // TODO: IGNITE-16288 directAssignments should use async configuration
API
- CompletableFuture<List<Set<ClusterNode>>> assignmentsLatestFut =
CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, () ->
- directAssignments(tblCfg)));
-
- TopologyService topologyService = raftMgr.topologyService();
- ClusterNode localMember = topologyService.localMember();
+ String localMemberName =
raftMgr.topologyService().localMember().name();
// Create new raft nodes according to new assignments.
tablesByIdVv.update(causalityToken, (tablesById, e) -> {
@@ -689,122 +682,130 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
for (int i = 0; i < partitions; i++) {
int partId = i;
- Set<ClusterNode> oldPartAssignment = oldAssignments == null ?
Collections.emptySet() :
- oldAssignments.get(partId);
-
- Set<ClusterNode> newPartAssignment =
newAssignments.get(partId);
-
- List<String> newPartAssignmentIds =
newPartAssignment.stream().map(ClusterNode::name).collect(toList());
+ Set<Assignment> oldPartAssignment = oldAssignments == null ?
Set.of() : oldAssignments.get(partId);
+ Set<Assignment> newPartAssignment = newAssignments.get(partId);
TableImpl table = tablesById.get(tblId);
InternalTable internalTbl = table.internalTable();
- MvTableStorage storage = internalTbl.storage();
- boolean isInMemory = storage.isVolatile();
+ Set<String> newPeers = new HashSet<>();
+ Set<String> newLearners = new HashSet<>();
+ // Temporary variable for "localMemberAssignment" to be
effectively final.
+ Assignment t = null;
- // start new nodes, only if it is table creation
- // other cases will be covered by rebalance logic
- Set<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ?
newPartAssignment : Collections.emptySet();
+ for (Assignment assignment : newPartAssignment) {
+ String consistentId = assignment.consistentId();
- TablePartitionId replicaGrpId = new TablePartitionId(tblId,
partId);
+ if (localMemberName.equals(consistentId)) {
+ t = assignment;
+ }
- placementDriver.updateAssignment(replicaGrpId, nodes);
+ if (assignment.isPeer()) {
+ newPeers.add(consistentId);
+ } else {
+ newLearners.add(consistentId);
+ }
+ }
- CompletableFuture<Void> startGroupFut = completedFuture(null);
+ Assignment localMemberAssignment = t;
- PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(clock.now());
+ TablePartitionId replicaGrpId = new TablePartitionId(tblId,
partId);
- if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
- startGroupFut =
getOrCreateMvPartition(internalTbl.storage(), partId)
- .thenComposeAsync(mvPartitionStorage ->
assignmentsLatestFut.thenCompose(assignmentsLatest -> {
- boolean hasData =
mvPartitionStorage.lastAppliedIndex() > 0;
+ placementDriver.updateAssignment(replicaGrpId, newPeers);
- CompletableFuture<Boolean> fut;
+ PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(clock.now());
- if (isInMemory || !hasData) {
- Set<ClusterNode> partAssignments =
assignmentsLatest.get(partId);
+ CompletableFuture<Void> startGroupFut;
- fut = queryDataNodesCount(tblId, partId,
partAssignments).thenApply(dataNodesCount -> {
- boolean fullPartitionRestart =
dataNodesCount == 0;
+ // start new nodes, only if it is table creation, other cases
will be covered by rebalance logic
+ if (oldPartAssignment.isEmpty() && localMemberAssignment !=
null) {
+ startGroupFut =
getOrCreateMvPartition(internalTbl.storage(),
partId).thenComposeAsync(mvPartitionStorage -> {
+ boolean hasData =
mvPartitionStorage.lastAppliedIndex() > 0;
- if (fullPartitionRestart) {
- return true;
- }
+ CompletableFuture<Boolean> fut;
- boolean majorityAvailable =
dataNodesCount >= (partAssignments.size() / 2) + 1;
+ // If Raft is running in in-memory mode or the PDS has
been cleared, we need to remove the current node
+ // from the Raft group in order to avoid the double
vote problem.
+ // See
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
+ if (internalTbl.storage().isVolatile() || !hasData) {
+ fut = queryDataNodesCount(tblId, partId,
newPeers).thenApply(dataNodesCount -> {
+ boolean fullPartitionRestart = dataNodesCount
== 0;
- if (majorityAvailable) {
-
RebalanceUtil.startPeerRemoval(replicaGrpId, localMember, metaStorageMgr);
+ if (fullPartitionRestart) {
+ return true;
+ }
- return false;
- } else {
- // No majority and not a full
partition restart - need to restart nodes
- // with current partition.
- String msg = "Unable to start
partition " + partId + ". Majority not available.";
+ boolean majorityAvailable = dataNodesCount >=
(newPeers.size() / 2) + 1;
- throw new
IgniteInternalException(msg);
- }
- });
+ if (majorityAvailable) {
+
RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment,
metaStorageMgr);
+
+ return false;
} else {
- fut = completedFuture(true);
+ // No majority and not a full partition
restart - need to restart nodes
+ // with current partition.
+ String msg = "Unable to start partition "
+ partId + ". Majority not available.";
+
+ throw new IgniteInternalException(msg);
}
+ });
+ } else {
+ fut = completedFuture(true);
+ }
- return fut.thenCompose(startGroup -> {
- if (!startGroup) {
- return completedFuture(null);
- }
+ return fut.thenCompose(startGroup -> {
+ if (!startGroup) {
+ return completedFuture(null);
+ }
- return CompletableFuture.supplyAsync(
- () ->
getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
- ioExecutor
- )
-
.thenComposeAsync(txStatePartitionStorage -> {
- RaftGroupOptions groupOptions
= groupOptionsForPartition(
- internalTbl.storage(),
-
internalTbl.txStateStorage(),
-
partitionKey(internalTbl, partId),
- newPartAssignment,
- safeTime
- );
-
- try {
- raftMgr.startRaftGroupNode(
+ return
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId)
+ .thenAcceptAsync(txStatePartitionStorage
-> {
+ RaftGroupOptions groupOptions =
groupOptionsForPartition(
+ internalTbl.storage(),
+ internalTbl.txStateStorage(),
+ partitionKey(internalTbl,
partId),
+ safeTime
+ );
+
+ try {
+ raftMgr.startRaftGroupNode(
+ replicaGrpId,
+ newPeers,
+ newLearners,
+ new PartitionListener(
+
partitionDataStorage(mvPartitionStorage, internalTbl, partId),
+
txStatePartitionStorage,
+ txManager,
+
table.indexStorageAdapters(partId),
+ partId
+ ),
+ new
RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+
tablesCfg.tables().get(table.name()),
replicaGrpId,
-
newPartAssignmentIds,
- new
PartitionListener(
-
partitionDataStorage(mvPartitionStorage, internalTbl, partId),
-
txStatePartitionStorage,
- txManager,
-
table.indexStorageAdapters(partId),
- partId
- ),
- new
RebalanceRaftGroupEventsListener(
-
metaStorageMgr,
-
tablesCfg.tables().get(tablesById.get(tblId).name()),
-
replicaGrpId,
- partId,
- busyLock,
-
createPartitionMover(internalTbl, partId),
-
this::calculateAssignments,
-
rebalanceScheduler
- ),
- groupOptions
- );
-
- return
completedFuture(null);
- } catch (NodeStoppingException
ex) {
- return failedFuture(ex);
- }
- }, ioExecutor);
- });
- }), ioExecutor);
+ partId,
+ busyLock,
+
createPartitionMover(internalTbl, partId),
+
this::calculateAssignments,
+ rebalanceScheduler
+ ),
+ groupOptions
+ );
+ } catch (NodeStoppingException ex) {
+ throw new CompletionException(ex);
+ }
+ }, ioExecutor);
+ });
+ }, ioExecutor);
+ } else {
+ startGroupFut = completedFuture(null);
}
startGroupFut
- .thenComposeAsync((v) -> {
+ .thenComposeAsync(v -> {
try {
- return
raftMgr.startRaftGroupService(replicaGrpId, newPartAssignmentIds);
+ return
raftMgr.startRaftGroupService(replicaGrpId, newPeers, newLearners);
} catch (NodeStoppingException ex) {
return failedFuture(ex);
}
@@ -812,53 +813,47 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
.thenCompose(updatedRaftGroupService -> {
((InternalTableImpl)
internalTbl).updateInternalTableRaftGroupService(partId,
updatedRaftGroupService);
- if
(replicaMgr.shouldHaveReplicationGroupLocally(nodes)) {
- return
getOrCreateMvPartition(internalTbl.storage(), partId)
- .thenCombine(
- CompletableFuture.supplyAsync(
- () ->
getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
- ioExecutor
- ),
- (mvPartitionStorage,
txStatePartitionStorage) -> {
- try {
-
replicaMgr.startReplica(replicaGrpId,
- new
PartitionReplicaListener(
-
mvPartitionStorage,
-
updatedRaftGroupService,
-
txManager,
-
lockMgr,
-
scanRequestExecutor,
- partId,
- tblId,
-
table.indexesLockers(partId),
- new
Lazy<>(() -> table.indexStorageAdapters(partId)
-
.get().get(table.pkId())),
- () ->
table.indexStorageAdapters(partId).get(),
- clock,
-
safeTime,
-
txStatePartitionStorage,
-
topologyService,
-
placementDriver,
-
this::isLocalPeer
- )
- );
- } catch
(NodeStoppingException ex) {
- throw new
AssertionError("Loza was stopped before Table manager", ex);
- }
-
- return null;
- });
- } else {
+ if (localMemberAssignment == null) {
return completedFuture(null);
}
- })
- .exceptionally(th -> {
- LOG.warn("Unable to update raft groups on the
node", th);
- return null;
+ CompletableFuture<MvPartitionStorage>
partitionStorageFuture =
+
getOrCreateMvPartition(internalTbl.storage(), partId);
+
+ CompletableFuture<TxStateStorage>
txStateStorageFuture =
+
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId);
+
+ return
partitionStorageFuture.thenAcceptBoth(txStateStorageFuture, (partitionStorage,
txStateStorage) -> {
+ try {
+ replicaMgr.startReplica(replicaGrpId,
+ new PartitionReplicaListener(
+ partitionStorage,
+ updatedRaftGroupService,
+ txManager,
+ lockMgr,
+ scanRequestExecutor,
+ partId,
+ tblId,
+
table.indexesLockers(partId),
+ new Lazy<>(() ->
table.indexStorageAdapters(partId).get().get(table.pkId())),
+ () ->
table.indexStorageAdapters(partId).get(),
+ clock,
+ safeTime,
+ txStateStorage,
+ placementDriver,
+ this::isLocalPeer
+ )
+ );
+ } catch (NodeStoppingException ex) {
+ throw new AssertionError("Loza was stopped
before Table manager", ex);
+ }
+ });
})
.whenComplete((res, ex) -> {
- // Only successful completion is possible here due
to .exceptionally() just above.
+ if (ex != null) {
+ LOG.warn("Unable to update raft groups on the
node", ex);
+ }
+
futures[partId].complete(null);
});
}
@@ -888,22 +883,25 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
*
* @param tblId Table id.
* @param partId Partition id.
- * @param partAssignments Partition assignments.
+ * @param peerNames Consistent IDs of Raft peers.
* @return A future that will hold the quantity of data nodes.
*/
- private CompletableFuture<Long> queryDataNodesCount(UUID tblId, int
partId, Set<ClusterNode> partAssignments) {
- HasDataRequestBuilder requestBuilder =
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId);
+ private CompletableFuture<Long> queryDataNodesCount(UUID tblId, int
partId, Collection<String> peerNames) {
+ HasDataRequest request =
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
//noinspection unchecked
- CompletableFuture<Boolean>[] requestFutures =
partAssignments.stream().map(node -> {
- HasDataRequest request = requestBuilder.build();
-
- return raftMgr.messagingService().invoke(node, request,
QUERY_DATA_NODES_COUNT_TIMEOUT).thenApply(response -> {
- assert response instanceof HasDataResponse : response;
-
- return ((HasDataResponse) response).result();
- }).exceptionally(unused -> false);
- }).toArray(CompletableFuture[]::new);
+ CompletableFuture<Boolean>[] requestFutures = peerNames.stream()
+ .map(clusterNodeResolver)
+ .filter(Objects::nonNull)
+ .map(node -> raftMgr.messagingService()
+ .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
+ .thenApply(response -> {
+ assert response instanceof HasDataResponse :
response;
+
+ return ((HasDataResponse) response).result();
+ })
+ .exceptionally(unused -> false))
+ .toArray(CompletableFuture[]::new);
return allOf(requestFutures)
.thenApply(unused ->
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
@@ -913,7 +911,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
PartitionKey partitionKey,
- Set<ClusterNode> peers,
PendingComparableValuesTracker<HybridTimestamp> safeTime
) {
RaftGroupOptions raftGroupOptions;
@@ -1065,7 +1062,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* Creates local structures for a table.
*
* @param causalityToken Causality token.
- * @param name Table name.
+ * @param name Table name.
* @param tblId Table id.
* @param partitions Count of partitions.
* @return Future that will be completed when local changes related to the
table creation are applied.
@@ -1082,7 +1079,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
partitions, clusterNodeResolver, txManager, tableStorage,
txStateStorage, replicaSvc, clock);
// TODO: IGNITE-16288 directIndexIds should use async configuration API
- var table = new TableImpl(internalTable, lockMgr, () ->
CompletableFuture.supplyAsync(() -> directIndexIds()));
+ var table = new TableImpl(internalTable, lockMgr, () ->
CompletableFuture.supplyAsync(() -> directIndexIds()));
tablesByIdVv.update(causalityToken, (previous, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
@@ -1177,9 +1174,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* Drops local structures for a table.
*
* @param causalityToken Causality token.
- * @param name Table name.
- * @param tblId Table id.
- * @param assignment Affinity assignment.
+ * @param name Table name.
+ * @param tblId Table id.
+ * @param assignment Affinity assignment.
*/
private void dropTableLocally(long causalityToken, String name, UUID
tblId, List<Set<ClusterNode>> assignment) {
try {
@@ -1208,7 +1205,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TableImpl table = tablesByIdVv.latest().get(tblId);
assert table != null : IgniteStringFormatter.format("There is no
table with the name specified [name={}, id={}]",
- name, tblId);
+ name, tblId);
CompletableFuture<Void> destroyMvStorageFuture =
table.internalTable().storage().destroy();
@@ -1225,15 +1222,15 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
}
- private Set<ClusterNode> calculateAssignments(TableConfiguration tableCfg,
int partNum) {
+ private Set<Assignment> calculateAssignments(TableConfiguration tableCfg,
int partNum) {
return
AffinityUtils.calculateAssignmentForPartition(baselineMgr.nodes(), partNum,
tableCfg.value().replicas());
}
/**
- * Creates a new table with the given {@code name} asynchronously. If a
table with the same name already exists,
- * a future will be completed with {@link TableAlreadyExistsException}.
+ * Creates a new table with the given {@code name} asynchronously. If a
table with the same name already exists, a future will be
+ * completed with {@link TableAlreadyExistsException}.
*
- * @param name Table name.
+ * @param name Table name.
* @param tableInitChange Table changer.
* @return Future representing pending completion of the operation.
* @throws IgniteException If an unspecified platform exception has
happened internally. Is thrown when:
@@ -1288,8 +1285,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
baselineMgr.nodes(),
tableChange.partitions(),
- tableChange.replicas(),
- HashSet::new)));
+ tableChange.replicas())));
});
})).exceptionally(t -> {
Throwable ex = getRootCause(t);
@@ -1407,8 +1403,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
/**
- * Drops a table with the name specified. If appropriate table does not be
found, a future will be
- * completed with {@link TableNotFoundException}.
+ * Drops a table with the name specified. If appropriate table does not be
found, a future will be completed with
+ * {@link TableNotFoundException}.
*
* @param name Table name.
* @return Future representing pending completion of the operation.
@@ -1448,8 +1444,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
tblChg.delete(name);
- }
- ).changeIndexes(idxChg -> {
+ }).changeIndexes(idxChg -> {
List<String> indicesNames =
tablesCfg.indexes().value().namedListKeys();
indicesNames.stream().filter(idx ->
@@ -1528,16 +1523,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}));
}
- /**
- * Returns assignments nodes.
- *
- * @param tblCfg Table configuration.
- * @return Set assignments nodes.
- */
- private List<Set<ClusterNode>>
directAssignments(ExtendedTableConfiguration tblCfg) {
- return ByteUtils.fromBytes(directProxy(tblCfg.assignments()).value());
- }
-
/**
* Collects a list of direct table ids.
*
@@ -1654,7 +1639,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
*
* @param name Table name.
* @return Future representing pending completion of the {@code
TableManager#tableAsyncInternal} operation.
- * */
+ */
public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
@@ -1803,95 +1788,95 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TablePartitionId replicaGrpId = new
TablePartitionId(tblId, partId);
- // Assignments of the pending rebalance that we received
through the meta storage watch mechanism.
- Set<ClusterNode> newPeers =
ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
-
- var pendingAssignments =
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)).join();
+ Entry pendingAssignmentsEntry =
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)).join();
- assert pendingAssignmentsWatchEvent.revision() <=
pendingAssignments.revision()
+ assert pendingAssignmentsWatchEvent.revision() <=
pendingAssignmentsEntry.revision()
: "Meta Storage watch cannot notify about an event
with the revision that is more than the actual revision.";
+ // Assignments of the pending rebalance that we received
through the meta storage watch mechanism.
+ Set<Assignment> pendingAssignments =
ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
+
TableImpl tbl = tablesByIdVv.latest().get(tblId);
ExtendedTableConfiguration tblCfg =
(ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name());
// Stable assignments from the meta store, which revision
is bounded by the current pending event.
- byte[] stableAssignments =
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
+ byte[] stableAssignmentsBytes =
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
pendingAssignmentsWatchEvent.revision()).join().value();
- Set<ClusterNode> assignments = stableAssignments == null
+ Set<Assignment> stableAssignments = stableAssignmentsBytes
== null
// This is for the case when the first rebalance
occurs.
- ? ((List<Set<ClusterNode>>)
ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
- : ByteUtils.fromBytes(stableAssignments);
+ ? ((List<Set<Assignment>>)
ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
+ : ByteUtils.fromBytes(stableAssignmentsBytes);
+
+ List<String> stablePeers = new ArrayList<>();
+ List<String> stableLearners = new ArrayList<>();
- placementDriver.updateAssignment(replicaGrpId,
assignments);
+ for (Assignment assignment : stableAssignments) {
+ if (assignment.isPeer()) {
+ stablePeers.add(assignment.consistentId());
+ } else {
+ stableLearners.add(assignment.consistentId());
+ }
+ }
+
+ placementDriver.updateAssignment(replicaGrpId,
stablePeers);
ClusterNode localMember =
raftMgr.topologyService().localMember();
- List<ClusterNode> deltaPeers = newPeers.stream()
- .filter(p -> !assignments.contains(p))
- .collect(toList());
+ // Start a new Raft node and Replica if this node has
appeared in the new assignments.
+ boolean shouldStartLocalServices =
pendingAssignments.stream()
+ .filter(assignment ->
localMember.name().equals(assignment.consistentId()))
+ .anyMatch(assignment ->
!stableAssignments.contains(assignment));
PendingComparableValuesTracker<HybridTimestamp> safeTime =
new PendingComparableValuesTracker<>(clock.now());
InternalTable internalTable = tbl.internalTable();
- try {
- LOG.info("Received update on pending assignments.
Check if new raft group should be started"
- + " [key={}, partition={}, table={},
localMemberAddress={}]",
- pendingAssignmentsWatchEvent.key(), partId,
tbl.name(), localMember.address());
+ LOG.info("Received update on pending assignments. Check if
new raft group should be started"
+ + " [key={}, partition={}, table={},
localMemberAddress={}]",
+ pendingAssignmentsWatchEvent.key(), partId,
tbl.name(), localMember.address());
- if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
- MvPartitionStorage mvPartitionStorage =
getOrCreateMvPartition(internalTable.storage(), partId).join();
+ if (shouldStartLocalServices) {
+ MvPartitionStorage mvPartitionStorage =
getOrCreateMvPartition(internalTable.storage(), partId).join();
- TxStateStorage txStatePartitionStorage =
getOrCreateTxStatePartitionStorage(
- internalTable.txStateStorage(),
- partId
- );
+ TxStateStorage txStatePartitionStorage =
getOrCreateTxStateStorage(internalTable.txStateStorage(), partId);
- RaftGroupOptions groupOptions =
groupOptionsForPartition(
- internalTable.storage(),
- internalTable.txStateStorage(),
- partitionKey(internalTable, partId),
- assignments,
- safeTime
- );
+ RaftGroupOptions groupOptions =
groupOptionsForPartition(
+ internalTable.storage(),
+ internalTable.txStateStorage(),
+ partitionKey(internalTable, partId),
+ safeTime
+ );
- RaftGroupListener raftGrpLsnr = new
PartitionListener(
- partitionDataStorage(mvPartitionStorage,
internalTable, partId),
- txStatePartitionStorage,
- txManager,
- tbl.indexStorageAdapters(partId),
- partId
- );
+ RaftGroupListener raftGrpLsnr = new PartitionListener(
+ partitionDataStorage(mvPartitionStorage,
internalTable, partId),
+ txStatePartitionStorage,
+ txManager,
+ tbl.indexStorageAdapters(partId),
+ partId
+ );
- RaftGroupEventsListener raftGrpEvtsLsnr = new
RebalanceRaftGroupEventsListener(
- metaStorageMgr,
- tblCfg,
- replicaGrpId,
- partId,
- busyLock,
- createPartitionMover(internalTable,
partId),
- TableManager.this::calculateAssignments,
- rebalanceScheduler
- );
+ RaftGroupEventsListener raftGrpEvtsLsnr = new
RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+ tblCfg,
+ replicaGrpId,
+ partId,
+ busyLock,
+ createPartitionMover(internalTable, partId),
+ TableManager.this::calculateAssignments,
+ rebalanceScheduler
+ );
+ try {
raftMgr.startRaftGroupNode(
replicaGrpId,
-
assignments.stream().map(ClusterNode::name).collect(toList()),
+ stablePeers,
+ stableLearners,
raftGrpLsnr,
raftGrpEvtsLsnr,
groupOptions
);
- }
-
- if
(replicaMgr.shouldHaveReplicationGroupLocally(deltaPeers)) {
- MvPartitionStorage mvPartitionStorage =
getOrCreateMvPartition(internalTable.storage(), partId).join();
-
- TxStateStorage txStatePartitionStorage =
getOrCreateTxStatePartitionStorage(
- internalTable.txStateStorage(),
- partId
- );
replicaMgr.startReplica(replicaGrpId,
new PartitionReplicaListener(
@@ -1908,35 +1893,43 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
clock,
safeTime,
txStatePartitionStorage,
- raftMgr.topologyService(),
placementDriver,
- peer -> isLocalPeer(peer)
+ TableManager.this::isLocalPeer
)
);
+ } catch (NodeStoppingException e) {
+ // no-op
}
- } catch (NodeStoppingException e) {
- // no-op
}
// Do not change peers of the raft group if this is a
stale event.
// Note that we start raft node before for the sake of the
consistency in a starting and stopping raft nodes.
- if (pendingAssignmentsWatchEvent.revision() <
pendingAssignments.revision()) {
+ if (pendingAssignmentsWatchEvent.revision() <
pendingAssignmentsEntry.revision()) {
return true;
}
- List<Peer> newNodes = newPeers.stream().map(n -> new
Peer(n.name())).collect(toList());
-
RaftGroupService partGrpSvc =
internalTable.partitionRaftGroupService(partId);
LeaderWithTerm leaderWithTerm =
partGrpSvc.refreshAndGetLeaderWithTerm().join();
// run update of raft configuration if this node is a
leader
- if
(localMember.name().equals(leaderWithTerm.leader().consistentId())) {
+ if (isLocalPeer(leaderWithTerm.leader())) {
+ List<Peer> newPeers = new ArrayList<>();
+ List<Peer> newLearners = new ArrayList<>();
+
+ for (Assignment assignment : pendingAssignments) {
+ if (assignment.isPeer()) {
+ newPeers.add(new
Peer(assignment.consistentId()));
+ } else {
+ newLearners.add(new
Peer(assignment.consistentId()));
+ }
+ }
+
LOG.info("Current node={} is the leader of partition
raft group={}. "
+ "Initiate rebalance process for
partition={}, table={}",
localMember.address(), replicaGrpId, partId,
tbl.name());
- // TODO: Provide learners during rebalance, see
https://issues.apache.org/jira/browse/IGNITE-18172
- partGrpSvc.changePeersAsync(newNodes, List.of(),
leaderWithTerm.term()).join();
+
+ partGrpSvc.changePeersAsync(newPeers, newLearners,
leaderWithTerm.term()).join();
}
return true;
@@ -1972,25 +1965,28 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TablePartitionId replicaGrpId = new
TablePartitionId(tblId, part);
- Set<ClusterNode> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
+ Set<Assignment> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
byte[] pendingFromMetastorage =
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId),
stableAssignmentsWatchEvent.revision()).join().value();
- Set<ClusterNode> pendingAssignments =
pendingFromMetastorage == null
- ? Collections.emptySet()
+ Set<Assignment> pendingAssignments =
pendingFromMetastorage == null
+ ? Set.of()
: ByteUtils.fromBytes(pendingFromMetastorage);
- try {
- ClusterNode localMember =
raftMgr.topologyService().localMember();
+ String localMemberName =
raftMgr.topologyService().localMember().name();
+
+ boolean shouldStopLocalServices =
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
+ .noneMatch(assignment ->
assignment.consistentId().equals(localMemberName));
- if (!stableAssignments.contains(localMember) &&
!pendingAssignments.contains(localMember)) {
+ if (shouldStopLocalServices) {
+ try {
raftMgr.stopRaftGroup(replicaGrpId);
replicaMgr.stopReplica(new TablePartitionId(tblId,
part));
+ } catch (NodeStoppingException e) {
+ // no-op
}
- } catch (NodeStoppingException e) {
- // no-op
}
return true;
@@ -2062,21 +2058,22 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* in when the rebalance was interrupted.
*
* @param mvTableStorage Multi-versioned table storage.
- * @param partitioId Partition ID.
+ * @param partitionId Partition ID.
* @return Future that will complete when the operation completes.
*/
- private static CompletableFuture<MvPartitionStorage>
getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitioId) {
- MvPartitionStorage mvPartitionStorage =
mvTableStorage.getOrCreateMvPartition(partitioId);
-
- // If a full rebalance did not happen, then we return the storage as
is.
- if (mvPartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
- return completedFuture(mvPartitionStorage);
- }
-
- // A full rebalance was started but not completed, so the partition
must be recreated to remove the garbage.
- return mvTableStorage
- .destroyPartition(partitioId)
- .thenApply(unused ->
mvTableStorage.getOrCreateMvPartition(partitioId));
+ private CompletableFuture<MvPartitionStorage>
getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
+ return CompletableFuture.supplyAsync(() ->
mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
+ .thenCompose(storage -> {
+ if (storage.persistedIndex() != FULL_RABALANCING_STARTED) {
+ // If a full rebalance did not happen, then we return
the storage as is.
+ return completedFuture(storage);
+ } else {
+ // A full rebalance was started but not completed, so
the partition must be recreated to remove the garbage.
+ return mvTableStorage
+ .destroyPartition(partitionId)
+ .thenApplyAsync(unused ->
mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
+ }
+ });
}
/**
@@ -2088,10 +2085,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* @param txStateTableStorage Transaction state storage for a table.
* @param partId Partition ID.
*/
- private static TxStateStorage getOrCreateTxStatePartitionStorage(
- TxStateTableStorage txStateTableStorage,
- int partId
- ) {
+ private static TxStateStorage
getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partId) {
TxStateStorage txStatePartitionStorage =
txStateTableStorage.getOrCreateTxStateStorage(partId);
// If a full rebalance did not happen, then we return the storage as
is.
@@ -2103,4 +2097,11 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return txStateTableStorage.getOrCreateTxStateStorage(partId);
}
+
+ /**
+ * Async version of {@link #getOrCreateTxStateStorage}.
+ */
+ private CompletableFuture<TxStateStorage>
getOrCreateTxStateStorageAsync(TxStateTableStorage txStateTableStorage, int
partId) {
+ return CompletableFuture.supplyAsync(() ->
getOrCreateTxStateStorage(txStateTableStorage, partId), ioExecutor);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index e7eecc8724..79c5be150f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -26,8 +26,6 @@ import static
org.apache.ignite.internal.metastorage.client.Operations.remove;
import static org.apache.ignite.internal.utils.RebalanceUtil.intersect;
import static
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey;
-import static org.apache.ignite.internal.utils.RebalanceUtil.readClusterNodes;
-import static
org.apache.ignite.internal.utils.RebalanceUtil.resolveClusterNodes;
import static
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.utils.RebalanceUtil.subtract;
import static org.apache.ignite.internal.utils.RebalanceUtil.switchAppendKey;
@@ -46,6 +44,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -62,7 +63,6 @@ import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -80,7 +80,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
private static final int REBALANCE_RETRY_THRESHOLD = 10;
/** Delay between unsuccessful trial of a rebalance and a new trial, ms. */
- public static final int REBALANCE_RETRY_DELAY_MS = 200;
+ private static final int REBALANCE_RETRY_DELAY_MS = 200;
/** Success code for the MetaStorage switch append assignments change. */
private static final int SWITCH_APPEND_SUCCESS = 1;
@@ -131,7 +131,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
private final AtomicInteger rebalanceAttempts = new AtomicInteger(0);
/** Function that calculates assignments for table's partition. */
- private final BiFunction<TableConfiguration, Integer, Set<ClusterNode>>
calculateAssignmentsFn;
+ private final BiFunction<TableConfiguration, Integer, Set<Assignment>>
calculateAssignmentsFn;
/**
* Constructs new listener.
@@ -152,7 +152,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
int partNum,
IgniteSpinBusyLock busyLock,
PartitionMover partitionMover,
- BiFunction<TableConfiguration, Integer, Set<ClusterNode>>
calculateAssignmentsFn,
+ BiFunction<TableConfiguration, Integer, Set<Assignment>>
calculateAssignmentsFn,
ScheduledExecutorService rebalanceScheduler) {
this.metaStorageMgr = metaStorageMgr;
this.tblConfiguration = tblConfiguration;
@@ -180,17 +180,29 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
try {
rebalanceAttempts.set(0);
- Entry pendingEntry =
metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get();
+ byte[] pendingAssignmentsBytes =
metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get().value();
- if (pendingEntry.value() != null) {
- Set<ClusterNode> pendingNodes =
ByteUtils.fromBytes(pendingEntry.value());
+ if (pendingAssignmentsBytes != null) {
+ Set<Assignment> pendingAssignments =
ByteUtils.fromBytes(pendingAssignmentsBytes);
- LOG.info("New leader elected. Going to reconfigure
peers [group={}, partition={}, table={}, peers={}]",
- partId, partNum,
tblConfiguration.name().value(), pendingNodes);
+ List<Peer> peers = new ArrayList<>();
+ List<Peer> learners = new ArrayList<>();
-
partitionMover.movePartition(clusterNodesToPeers(pendingNodes), List.of(),
term).join();
+ for (Assignment assignment : pendingAssignments) {
+ if (assignment.isPeer()) {
+ peers.add(new Peer(assignment.consistentId()));
+ } else {
+ learners.add(new
Peer(assignment.consistentId()));
+ }
+ }
+
+ LOG.info("New leader elected. Going to apply new
configuration "
+ + "[group={}, partition={}, table={},
peers={}, learners={}]",
+ partId, partNum,
tblConfiguration.name().value(), peers, learners);
+
+ partitionMover.movePartition(peers, learners,
term).get();
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (Exception e) {
// TODO: IGNITE-14693
LOG.warn("Unable to start rebalance [partition={},
table={}, term={}]",
e, partNum, tblConfiguration.name().value(), term);
@@ -320,31 +332,31 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
Entry switchReduceEntry = values.get(switchReduceKey);
Entry switchAppendEntry = values.get(switchAppendKey);
- Set<ClusterNode> calculatedAssignments =
calculateAssignmentsFn.apply(tblConfiguration, partNum);
+ Set<Assignment> retrievedStable = readAssignments(stableEntry);
+ Set<Assignment> retrievedSwitchReduce =
readAssignments(switchReduceEntry);
+ Set<Assignment> retrievedSwitchAppend =
readAssignments(switchAppendEntry);
- Set<ClusterNode> stable = resolveClusterNodes(peers,
pendingEntry.value(), stableEntry.value());
+ Set<Assignment> calculatedAssignments =
calculateAssignmentsFn.apply(tblConfiguration, partNum);
- Set<ClusterNode> retrievedSwitchReduce =
readClusterNodes(switchReduceEntry);
- Set<ClusterNode> retrievedSwitchAppend =
readClusterNodes(switchAppendEntry);
- Set<ClusterNode> retrievedStable = readClusterNodes(stableEntry);
+ Set<Assignment> stable = createAssignments(peers, learners);
// Were reduced
- Set<ClusterNode> reducedNodes = subtract(retrievedSwitchReduce,
stable);
+ Set<Assignment> reducedNodes = subtract(retrievedSwitchReduce,
stable);
// Were added
- Set<ClusterNode> addedNodes = subtract(stable, retrievedStable);
+ Set<Assignment> addedNodes = subtract(stable, retrievedStable);
// For further reduction
- Set<ClusterNode> calculatedSwitchReduce =
subtract(retrievedSwitchReduce, reducedNodes);
+ Set<Assignment> calculatedSwitchReduce =
subtract(retrievedSwitchReduce, reducedNodes);
// For further addition
- Set<ClusterNode> calculatedSwitchAppend =
union(retrievedSwitchAppend, reducedNodes);
+ Set<Assignment> calculatedSwitchAppend =
union(retrievedSwitchAppend, reducedNodes);
calculatedSwitchAppend = subtract(calculatedSwitchAppend,
addedNodes);
calculatedSwitchAppend = intersect(calculatedAssignments,
calculatedSwitchAppend);
- Set<ClusterNode> calculatedPendingReduction = subtract(stable,
retrievedSwitchReduce);
+ Set<Assignment> calculatedPendingReduction = subtract(stable,
retrievedSwitchReduce);
- Set<ClusterNode> calculatedPendingAddition = union(stable,
reducedNodes);
+ Set<Assignment> calculatedPendingAddition = union(stable,
reducedNodes);
calculatedPendingAddition = intersect(calculatedAssignments,
calculatedPendingAddition);
// eq(revision(assignments.stable),
retrievedAssignmentsStable.revision)
@@ -368,7 +380,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
// TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
tblConfiguration.change(ch -> {
- List<Set<ClusterNode>> assignments =
ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
+ List<Set<Assignment>> assignments =
ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
assignments.set(partNum, stable);
((ExtendedTableChange)
ch).changeAssignments(ByteUtils.toBytes(assignments));
}).get(10, TimeUnit.SECONDS);
@@ -486,22 +498,6 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
}
}
- /**
- * Transforms list of cluster nodes to the list of peers.
- *
- * @param nodes List of cluster nodes to transform.
- * @return List of transformed peers.
- */
- private static List<Peer> clusterNodesToPeers(Set<ClusterNode> nodes) {
- List<Peer> peers = new ArrayList<>(nodes.size());
-
- for (ClusterNode node : nodes) {
- peers.add(new Peer(node.name()));
- }
-
- return peers;
- }
-
/**
* Transforms list of peerIds to list of peers.
*
@@ -517,4 +513,28 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
return peers;
}
+
+ /**
+ * Creates a set of assignments from the given set of peers and learners.
+ */
+ private static Set<Assignment> createAssignments(Collection<PeerId> peers,
Collection<PeerId> learners) {
+ Stream<Assignment> newAssignments = Stream.concat(
+ peers.stream().map(peerId ->
Assignment.forPeer(peerId.getConsistentId())),
+ learners.stream().map(peerId ->
Assignment.forLearner(peerId.getConsistentId()))
+ );
+
+ return newAssignments.collect(Collectors.toSet());
+ }
+
+ /**
+ * Reads a set of assignments from a MetaStorage entry.
+ *
+ * @param entry MetaStorage entry.
+ * @return Set of assignments, or an empty set if the entry has no value.
+ */
+ private static Set<Assignment> readAssignments(Entry entry) {
+ byte[] value = entry.value();
+
+ return value == null ? Set.of() : ByteUtils.fromBytes(value);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
index d36859d084..a37a7b512f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
@@ -20,18 +20,17 @@ package
org.apache.ignite.internal.table.distributed.replicator;
import java.io.Serializable;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
-import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
- * Response for the {@link TxStateReplicaRequest}. Can contain either the
Partition Group leader, which should be
+ * Response for the {@link TxStateReplicaRequest}. Can contain either the
consistent ID of the Partition Group leader, which should be
* queried for the TX Meta, or the TX Meta itself.
*/
public class LeaderOrTxState implements Serializable {
private static final long serialVersionUID = -3555591755828355117L;
@Nullable
- private final ClusterNode leader;
+ private final String leaderName;
@Nullable
private final TxMeta txMeta;
@@ -39,16 +38,16 @@ public class LeaderOrTxState implements Serializable {
/**
* Creates a response.
*
- * @param leader Leader node.
+ * @param leaderName Leader consistent ID.
* @param txMeta TX meta.
*/
- public LeaderOrTxState(@Nullable ClusterNode leader, @Nullable TxMeta
txMeta) {
- this.leader = leader;
+ public LeaderOrTxState(@Nullable String leaderName, @Nullable TxMeta
txMeta) {
+ this.leaderName = leaderName;
this.txMeta = txMeta;
}
- public @Nullable ClusterNode leader() {
- return leader;
+ public @Nullable String leaderName() {
+ return leaderName;
}
public @Nullable TxMeta txMeta() {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 5dbb8f0475..cee18b3a9e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -108,7 +108,6 @@ import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
@@ -160,9 +159,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Tx state storage. */
private final TxStateStorage txStateStorage;
- /** Topology service. */
- private final TopologyService topologyService;
-
/** Hybrid clock. */
private final HybridClock hybridClock;
@@ -203,7 +199,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param hybridClock Hybrid clock.
* @param safeTime Safe time clock.
* @param txStateStorage Transaction state storage.
- * @param topologyService Topology services.
* @param placementDriver Placement driver.
* @param isLocalPeerChecker Function for checking that the given peer is
local.
*/
@@ -221,7 +216,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridClock hybridClock,
PendingComparableValuesTracker<HybridTimestamp> safeTime,
TxStateStorage txStateStorage,
- TopologyService topologyService,
PlacementDriver placementDriver,
Function<Peer, Boolean> isLocalPeerChecker
) {
@@ -238,7 +232,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.hybridClock = hybridClock;
this.safeTime = safeTime;
this.txStateStorage = txStateStorage;
- this.topologyService = topologyService;
this.placementDriver = placementDriver;
this.isLocalPeerChecker = isLocalPeerChecker;
@@ -310,7 +303,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return txStateFut.thenApply(txMeta -> new
LeaderOrTxState(null, txMeta));
} else {
- return completedFuture(new
LeaderOrTxState(topologyService.getByConsistentId(leader.consistentId()),
null));
+ return completedFuture(new
LeaderOrTxState(leader.consistentId(), null));
}
});
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
index 9710a5f0d8..ea43c02085 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
@@ -20,35 +20,42 @@ package
org.apache.ignite.internal.table.distributed.replicator;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
/**
* Placement driver.
*/
public class PlacementDriver {
- /** Assignment nodes per replication group. */
- private final Map<ReplicationGroupId, LinkedHashSet<ClusterNode>>
primaryReplicaMapping = new ConcurrentHashMap<>();
+ /** Assignment node names per replication group. */
+ private final Map<ReplicationGroupId, LinkedHashSet<String>>
primaryReplicaMapping = new ConcurrentHashMap<>();
/** Replication service. */
private final ReplicaService replicaService;
+ /** Function that resolves a node consistent ID to a cluster node. */
+ private final Function<String, ClusterNode> clusterNodeResolver;
+
/**
* The constructor.
*
* @param replicaService Replication service.
*/
- public PlacementDriver(ReplicaService replicaService) {
+ public PlacementDriver(ReplicaService replicaService, Function<String,
ClusterNode> clusterNodeResolver) {
this.replicaService = replicaService;
+ this.clusterNodeResolver = clusterNodeResolver;
}
/**
- * Sends a transaction sate request to the primary replica.
+ * Sends a transaction state request to the primary replica.
*
* @param replicaGrp Replication group id.
* @param request Status request.
@@ -66,10 +73,10 @@ public class PlacementDriver {
* Updates an assignment for the specific replication group.
*
* @param replicaGrpId Replication group id.
- * @param assignment Assignment.
+ * @param nodeNames Assignment node names.
*/
- public void updateAssignment(ReplicationGroupId replicaGrpId,
Collection<ClusterNode> assignment) {
- primaryReplicaMapping.put(replicaGrpId, new
LinkedHashSet<>(assignment));
+ public void updateAssignment(ReplicationGroupId replicaGrpId,
Collection<String> nodeNames) {
+ primaryReplicaMapping.put(replicaGrpId, new
LinkedHashSet<>(nodeNames));
}
/**
@@ -81,19 +88,23 @@ public class PlacementDriver {
* @param request Request.
*/
private void sendAndRetry(CompletableFuture<TxMeta> resFut,
ReplicationGroupId replicaGrp, TxStateReplicaRequest request) {
- ClusterNode nodeToSend =
primaryReplicaMapping.get(replicaGrp).iterator().next();
+ ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).stream()
+ .map(clusterNodeResolver)
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElseThrow(() -> new IgniteInternalException("All replica
nodes are unavailable"));
replicaService.invoke(nodeToSend, request).thenAccept(resp -> {
assert resp instanceof LeaderOrTxState : "Unsupported response
type [type=" + resp.getClass().getSimpleName() + ']';
- LeaderOrTxState stateAndLeader = (LeaderOrTxState) resp;
+ LeaderOrTxState stateOrLeader = (LeaderOrTxState) resp;
- ClusterNode nextNodeToSend = stateAndLeader.leader();
+ String nextNodeToSend = stateOrLeader.leaderName();
if (nextNodeToSend == null) {
- resFut.complete(stateAndLeader.txMeta());
+ resFut.complete(stateOrLeader.txMeta());
} else {
- LinkedHashSet<ClusterNode> newAssignment = new
LinkedHashSet<>();
+ LinkedHashSet<String> newAssignment = new LinkedHashSet<>();
newAssignment.add(nextNodeToSend);
newAssignment.addAll(primaryReplicaMapping.get(replicaGrp));
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index c987ed95e9..6c1f98358e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -26,22 +26,18 @@ import static
org.apache.ignite.internal.metastorage.client.If.iif;
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.internal.metastorage.client.Operations.put;
import static org.apache.ignite.internal.metastorage.client.Operations.remove;
-import static org.apache.ignite.internal.util.IgniteUtils.capacity;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.client.Conditions;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.If;
import org.apache.ignite.internal.metastorage.client.Operations;
@@ -49,9 +45,7 @@ import
org.apache.ignite.internal.metastorage.client.WatchEvent;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.raft.jraft.entity.PeerId;
import org.jetbrains.annotations.NotNull;
/**
@@ -104,7 +98,7 @@ public class RebalanceUtil {
ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
- Set<ClusterNode> partAssignments =
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+ Set<Assignment> partAssignments =
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
@@ -296,13 +290,13 @@ public class RebalanceUtil {
* storage has been cleared.
*
* @param partId Partition's raft group id.
- * @param clusterNode Cluster node to be removed from peers.
+ * @param peerAssignment Assignment of the peer to be removed.
* @param metaStorageMgr MetaStorage manager.
* @return Completable future that signifies the completion of this
operation.
*/
public static CompletableFuture<Void> startPeerRemoval(
TablePartitionId partId,
- ClusterNode clusterNode,
+ Assignment peerAssignment,
MetaStorageManager metaStorageMgr
) {
ByteArray key = switchReduceKey(partId);
@@ -312,29 +306,29 @@ public class RebalanceUtil {
byte[] prevValue =
retrievedAssignmentsSwitchReduce.value();
if (prevValue != null) {
- Set<ClusterNode> prev = ByteUtils.fromBytes(prevValue);
+ Set<Assignment> prev = ByteUtils.fromBytes(prevValue);
- prev.add(clusterNode);
+ prev.add(peerAssignment);
return metaStorageMgr.invoke(
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
- Operations.put(key, ByteUtils.toBytes(prev)),
+ put(key, ByteUtils.toBytes(prev)),
Operations.noop()
);
} else {
var newValue = new HashSet<>();
- newValue.add(clusterNode);
+ newValue.add(peerAssignment);
return metaStorageMgr.invoke(
- Conditions.notExists(key),
- Operations.put(key,
ByteUtils.toBytes(newValue)),
+ notExists(key),
+ put(key, ByteUtils.toBytes(newValue)),
Operations.noop()
);
}
}).thenCompose(res -> {
if (!res) {
- return startPeerRemoval(partId, clusterNode,
metaStorageMgr);
+ return startPeerRemoval(partId, peerAssignment,
metaStorageMgr);
}
return CompletableFuture.completedFuture(null);
@@ -358,23 +352,23 @@ public class RebalanceUtil {
Entry entry = event.entryEvent().newEntry();
byte[] eventData = entry.value();
- Set<ClusterNode> assignments =
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+ Set<Assignment> switchReduce = ByteUtils.fromBytes(eventData);
- Set<ClusterNode> switchReduce = ByteUtils.fromBytes(eventData);
+ if (switchReduce.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ Set<Assignment> assignments =
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
ByteArray pendingKey = pendingPartAssignmentsKey(partId);
- Set<ClusterNode> pendingAssignments = subtract(assignments,
switchReduce);
+ Set<Assignment> pendingAssignments = subtract(assignments,
switchReduce);
byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
- if (switchReduce.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
-
ByteArray changeTriggerKey = partChangeTriggerKey(partId);
- byte[] rev =
ByteUtils.longToBytes(event.entryEvent().newEntry().revision());
+ byte[] rev = ByteUtils.longToBytes(entry.revision());
// Here is what happens in the MetaStorage:
// if ((notExists(changeTriggerKey) || value(changeTriggerKey) <
revision) && (notExists(pendingKey) && notExists(stableKey)) {
@@ -412,57 +406,6 @@ public class RebalanceUtil {
return metaStorageMgr.invoke(resultingOperation).thenApply(unused ->
null);
}
- /**
- * Builds a list of cluster nodes based on a list of peers, pending and
stable assignments.
- * A peer will be added to the result list iff peer's consistent ID is
present in pending or stable assignments.
- *
- * @param peers List of peers.
- * @param pendingAssignments Byte array that contains serialized list of
pending assignments.
- * @param stableAssignments Byte array that contains serialized list of
stable assignments.
- * @return Resolved cluster nodes.
- */
- public static Set<ClusterNode> resolveClusterNodes(Collection<PeerId>
peers, byte[] pendingAssignments, byte[] stableAssignments) {
- Map<String, ClusterNode> resolveRegistry = new HashMap<>();
-
- if (pendingAssignments != null) {
- Set<ClusterNode> pending = ByteUtils.fromBytes(pendingAssignments);
- pending.forEach(n -> resolveRegistry.put(n.name(), n));
- }
-
- if (stableAssignments != null) {
- Set<ClusterNode> stable = ByteUtils.fromBytes(stableAssignments);
- stable.forEach(n -> resolveRegistry.put(n.name(), n));
- }
-
- var resolvedNodes = new HashSet<ClusterNode>(capacity(peers.size()));
-
- for (PeerId p : peers) {
- ClusterNode resolvedNode =
resolveRegistry.get(p.getConsistentId());
-
- if (resolvedNode != null) {
- resolvedNodes.add(resolvedNode);
- } else {
- throw new IgniteInternalException("Can't find appropriate
cluster node for raft group peer: " + p);
- }
- }
-
- return resolvedNodes;
- }
-
- /**
- * Reads a list of cluster nodes from a MetaStorage entry.
- *
- * @param entry MetaStorage entry.
- * @return List of cluster nodes.
- */
- public static Set<ClusterNode> readClusterNodes(Entry entry) {
- if (entry.empty()) {
- return Collections.emptySet();
- }
-
- return ByteUtils.fromBytes(entry.value());
- }
-
/**
* Removes nodes from set of nodes.
*
@@ -470,7 +413,7 @@ public class RebalanceUtil {
* @param subtrahend Set of nodes to be removed.
* @return Result of the subtraction.
*/
- public static Set<ClusterNode> subtract(Set<ClusterNode> minuend,
Set<ClusterNode> subtrahend) {
+ public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {
return minuend.stream().filter(v ->
!subtrahend.contains(v)).collect(Collectors.toSet());
}
@@ -481,7 +424,7 @@ public class RebalanceUtil {
* @param op2 Second operand.
* @return Result of the addition.
*/
- public static Set<ClusterNode> union(Set<ClusterNode> op1,
Set<ClusterNode> op2) {
+ public static <T> Set<T> union(Set<T> op1, Set<T> op2) {
var res = new HashSet<>(op1);
res.addAll(op2);
@@ -496,7 +439,7 @@ public class RebalanceUtil {
* @param op2 Second operand.
* @return Result of the intersection.
*/
- public static Set<ClusterNode> intersect(Set<ClusterNode> op1,
Set<ClusterNode> op2) {
+ public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
return op1.stream().filter(op2::contains).collect(Collectors.toSet());
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 6d5487c3e8..03c8e3b2dd 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -55,6 +55,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import
org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListenerHolder;
@@ -91,7 +92,6 @@ import org.apache.ignite.internal.table.TableImpl;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.ByteUtils;
@@ -101,7 +101,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
@@ -163,13 +162,9 @@ public class TableManagerTest extends IgniteAbstractTest {
private ReplicaManager replicaMgr;
/** TX manager. */
- @Mock(lenient = true)
+ @Mock
private TxManager tm;
- /** TX manager. */
- @Mock(lenient = true)
- private LockManager lm;
-
/** Meta storage manager. */
@Mock
MetaStorageManager msm;
@@ -178,10 +173,6 @@ public class TableManagerTest extends IgniteAbstractTest {
@Mock
private MessagingService messagingService;
- /** Mock cluster. */
- @Mock
- private ClusterService cluster;
-
/**
* Revision listener holder. It uses for the test configurations:
* <ul>
@@ -222,8 +213,12 @@ public class TableManagerTest extends IgniteAbstractTest {
/** Before all test scenarios. */
@BeforeEach
void before() {
- when(rm.messagingService()).thenReturn(mock(MessagingService.class));
- when(rm.topologyService()).thenReturn(mock(TopologyService.class));
+ when(rm.messagingService()).thenReturn(messagingService);
+
+ TopologyService topologyService = mock(TopologyService.class);
+
+ when(rm.topologyService()).thenReturn(topologyService);
+ when(topologyService.localMember()).thenReturn(node);
revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
function.apply(0L).join();
@@ -235,7 +230,7 @@ public class TableManagerTest extends IgniteAbstractTest {
});
};
- when(msm.registerWatch(any(ByteArray.class),
any())).thenReturn(CompletableFuture.completedFuture(1L));
+ when(msm.registerWatch(any(ByteArray.class),
any())).thenReturn(completedFuture(1L));
tblManagerFut = new CompletableFuture<>();
}
@@ -260,8 +255,7 @@ public class TableManagerTest extends IgniteAbstractTest {
*/
@Test
public void testPreconfiguredTable() throws Exception {
- when(rm.startRaftGroupService(any(), any())).thenAnswer(mock ->
-
CompletableFuture.completedFuture(mock(RaftGroupService.class)));
+ when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock ->
completedFuture(mock(RaftGroupService.class)));
TableManager tableManager = createTableManager(tblManagerFut, false);
@@ -284,10 +278,10 @@ public class TableManagerTest extends IgniteAbstractTest {
var extConfCh = ((ExtendedTableChange) tableChange);
- ArrayList<Set<ClusterNode>> assignment = new
ArrayList<>(PARTITIONS);
+ var assignment = new ArrayList<Set<Assignment>>(PARTITIONS);
for (int part = 0; part < PARTITIONS; part++) {
- assignment.add(new HashSet<>(Collections.singleton(node)));
+ assignment.add(new
HashSet<>(Collections.singleton(Assignment.forPeer(node.name()))));
}
extConfCh.changeAssignments(ByteUtils.toBytes(assignment)).changeSchemaId(1);
@@ -497,7 +491,7 @@ public class TableManagerTest extends IgniteAbstractTest {
TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
- verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any());
+ verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(),
any());
TableManager tableManager = tblManagerFut.join();
@@ -630,7 +624,7 @@ public class TableManagerTest extends IgniteAbstractTest {
) throws Exception {
String consistentId = "node0";
- when(rm.startRaftGroupService(any(), any())).thenAnswer(mock -> {
+ when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock ->
{
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer(consistentId));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index dd9c1fa62c..f555919264 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -76,7 +76,6 @@ import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.service.LeaderWithTerm;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.hamcrest.CustomMatcher;
@@ -180,7 +179,6 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
CLOCK,
new PendingComparableValuesTracker<>(CLOCK.now()),
new TestTxStateStorage(),
- mock(TopologyService.class),
mock(PlacementDriver.class),
peer -> true
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 06bb6b469d..ba78fde628 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -265,7 +265,6 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
clock,
safeTimeClock,
txStateStorage,
- topologySrv,
placementDriver,
peer -> localNode.name().equals(peer.consistentId())
);
@@ -304,7 +303,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
- assertNull(tuple.leader());
+ assertNull(tuple.leaderName());
assertNull(tuple.txMeta());
}
@@ -326,7 +325,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(TxState.COMMITED, tuple.txMeta().txState());
assertTrue(readTimestamp.compareTo(tuple.txMeta().commitTimestamp()) >
0);
- assertNull(tuple.leader());
+ assertNull(tuple.leaderName());
}
@Test
@@ -342,7 +341,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
assertNull(tuple.txMeta());
- assertNotNull(tuple.leader());
+ assertNotNull(tuple.leaderName());
}
@Test
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index f459ff6043..bf02ede3fc 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -252,7 +252,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
clock,
new PendingComparableValuesTracker<>(clock.now()),
txStateStorage().getOrCreateTxStateStorage(0),
- null,
placementDriver,
peer -> true
);