This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a7ef1aaad8 IGNITE-24123 Add DistributionAlgorithm interface (#4984)
a7ef1aaad8 is described below

commit a7ef1aaad8e4cc8a075bacf027968f1f006399a9
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jan 7 14:01:19 2025 +0300

    IGNITE-24123 Add DistributionAlgorithm interface (#4984)
---
 .../apache/ignite/internal/util/IgniteUtils.java   | 16 ++++++++
 .../rebalance/ItRebalanceDistributedTest.java      |  4 +-
 .../distributionzones/rebalance/RebalanceUtil.java |  5 ++-
 .../ZoneRebalanceRaftGroupEventsListener.java      |  4 +-
 .../rebalance/ZoneRebalanceUtil.java               |  5 ++-
 .../DistributionZoneRebalanceEngineTest.java       |  2 +-
 .../RebalanceUtilUpdateAssignmentsTest.java        |  9 +++--
 .../DistributionAlgorithm.java                     | 43 ++++++++++++++++++++++
 .../PartitionDistributionUtils.java                | 41 +++++++++++----------
 .../RendezvousDistributionFunction.java            | 31 +++++++++++-----
 .../replicator/ItReplicaLifecycleTest.java         |  6 +--
 .../PartitionReplicaLifecycleManager.java          |  2 +
 .../PlacementDriverManagerTest.java                |  4 +-
 .../internal/placementdriver/LeaseUpdater.java     |  5 +++
 .../ignite/internal/table/ItTableScanTest.java     |  2 +-
 .../internal/table/distributed/TableManager.java   |  2 +
 .../distributed/disaster/GroupUpdateRequest.java   |  7 +++-
 .../ignite/internal/utils/RebalanceUtilEx.java     |  4 +-
 .../disaster/DisasterRecoveryMsInvokeTest.java     |  4 +-
 .../internal/tx/test/ItTransactionTestUtils.java   | 14 ++++++-
 20 files changed, 158 insertions(+), 52 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 22bac9e011..b259f7b588 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -57,6 +57,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
@@ -1011,6 +1012,21 @@ public class IgniteUtils {
         return Optional.empty();
     }
 
+    /**
+     * Iterates over the given collection and applies the given closure to 
each element using the collection element and its index.
+     *
+     * @param collection Collection.
+     * @param closure Closure to apply.
+     * @param <T> Type of collection element.
+     */
+    public static <T> void forEachIndexed(Collection<T> collection, 
BiConsumer<T, Integer> closure) {
+        int i = 0;
+
+        for (T t : collection) {
+            closure.accept(t, i++);
+        }
+    }
+
     /**
      * Retries operation until it succeeds or fails with exception that is 
different than the given.
      *
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a8c82630bb..5745dfd421 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -943,8 +943,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
             dataNodes.add(getNode(i).name);
         }
 
-        Set<Assignment> pendingAssignments = 
calculateAssignmentForPartition(dataNodes, 0, 2);
-        Set<Assignment> plannedAssignments = 
calculateAssignmentForPartition(dataNodes, 0, 3);
+        Set<Assignment> pendingAssignments = 
calculateAssignmentForPartition(dataNodes, 0, 1, 2);
+        Set<Assignment> plannedAssignments = 
calculateAssignmentForPartition(dataNodes, 0, 1, 3);
 
         Node node0 = getNode(0);
 
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index e90a6d0ef5..c845b17763 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -133,6 +133,7 @@ public class RebalanceUtil {
      * @param tableDescriptor Table descriptor.
      * @param partId Unique identifier of a partition.
      * @param dataNodes Data nodes.
+     * @param partitions Number of partitions.
      * @param replicas Number of replicas for a table.
      * @param revision Revision of Meta Storage that is specific for the 
assignment update.
      * @param metaStorageMgr Meta Storage manager.
@@ -144,6 +145,7 @@ public class RebalanceUtil {
             CatalogTableDescriptor tableDescriptor,
             TablePartitionId partId,
             Collection<String> dataNodes,
+            int partitions,
             int replicas,
             long revision,
             MetaStorageManager metaStorageMgr,
@@ -161,7 +163,7 @@ public class RebalanceUtil {
 
         ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
 
-        Set<Assignment> calculatedAssignments = 
calculateAssignmentForPartition(dataNodes, partNum, replicas);
+        Set<Assignment> calculatedAssignments = 
calculateAssignmentForPartition(dataNodes, partNum, partitions, replicas);
 
         Set<Assignment> partAssignments;
 
@@ -375,6 +377,7 @@ public class RebalanceUtil {
                     tableDescriptor,
                     replicaGrpId,
                     dataNodes,
+                    zoneDescriptor.partitions(),
                     zoneDescriptor.replicas(),
                     storageRevision,
                     metaStorageManager,
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index 4e11970984..03b353622b 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -556,6 +556,7 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
      *
      * @param metaStorageMgr MetaStorage manager.
      * @param dataNodes Data nodes.
+     * @param partitions Partitions count.
      * @param replicas Replicas count.
      * @param partId Partition's raft group id.
      * @param event Assignments switch reduce change event.
@@ -564,6 +565,7 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
     public static CompletableFuture<Void> handleReduceChanged(
             MetaStorageManager metaStorageMgr,
             Collection<String> dataNodes,
+            int partitions,
             int replicas,
             ZonePartitionId partId,
             WatchEvent event,
@@ -580,7 +582,7 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
             return nullCompletedFuture();
         }
 
-        Set<Assignment> assignments = 
calculateAssignmentForPartition(dataNodes, partId.partitionId(), replicas);
+        Set<Assignment> assignments = 
calculateAssignmentForPartition(dataNodes, partId.partitionId(), partitions, 
replicas);
 
         ByteArray pendingKey = pendingPartAssignmentsKey(partId);
 
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 07f64ea3b2..b0777e3e77 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -129,6 +129,7 @@ public class ZoneRebalanceUtil {
      * @param zoneDescriptor Zone descriptor.
      * @param zonePartitionId Unique aggregate identifier of a partition of a 
zone.
      * @param dataNodes Data nodes.
+     * @param partitions Number of partitions in a zone.
      * @param replicas Number of replicas for a zone.
      * @param revision Revision of Meta Storage that is specific for the 
assignment update.
      * @param metaStorageMgr Meta Storage manager.
@@ -141,6 +142,7 @@ public class ZoneRebalanceUtil {
             CatalogZoneDescriptor zoneDescriptor,
             ZonePartitionId zonePartitionId,
             Collection<String> dataNodes,
+            int partitions,
             int replicas,
             long revision,
             MetaStorageManager metaStorageMgr,
@@ -158,7 +160,7 @@ public class ZoneRebalanceUtil {
 
         ByteArray partAssignmentsStableKey = 
stablePartAssignmentsKey(zonePartitionId);
 
-        Set<Assignment> calculatedAssignments = 
calculateAssignmentForPartition(dataNodes, partNum, replicas);
+        Set<Assignment> calculatedAssignments = 
calculateAssignmentForPartition(dataNodes, partNum, partitions, replicas);
 
         Set<Assignment> partAssignments;
 
@@ -351,6 +353,7 @@ public class ZoneRebalanceUtil {
                         zoneDescriptor,
                         replicaGrpId,
                         dataNodes,
+                        zoneDescriptor.partitions(),
                         zoneDescriptor.replicas(),
                         storageRevision,
                         metaStorageManager,
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index de3c1b5b76..f40c9d4c3e 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -515,7 +515,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
                 if (expectedNodes != null) {
                     Set<String> expectedAssignments =
-                            calculateAssignmentForPartition(expectedNodes, j, 
zoneDescriptor.replicas())
+                            calculateAssignmentForPartition(expectedNodes, j, 
zoneDescriptor.partitions(), zoneDescriptor.replicas())
                                     
.stream().map(Assignment::consistentId).collect(toSet());
 
                     assertNotNull(actualAssignmentsBytes);
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index d3c53b5258..e44f9687cc 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -121,10 +121,10 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
     private static final Set<String> nodes3 = IntStream.of(5).mapToObj(i -> 
"nodes3_" + i).collect(toSet());
     private static final Set<String> nodes4 = IntStream.of(5).mapToObj(i -> 
"nodes4_" + i).collect(toSet());
 
-    private static final Set<Assignment> assignments1 = 
calculateAssignmentForPartition(nodes1, partNum, replicas);
-    private static final Set<Assignment> assignments2 = 
calculateAssignmentForPartition(nodes2, partNum, replicas);
-    private static final Set<Assignment> assignments3 = 
calculateAssignmentForPartition(nodes3, partNum, replicas);
-    private static final Set<Assignment> assignments4 = 
calculateAssignmentForPartition(nodes4, partNum, replicas);
+    private static final Set<Assignment> assignments1 = 
calculateAssignmentForPartition(nodes1, partNum, partNum + 1, replicas);
+    private static final Set<Assignment> assignments2 = 
calculateAssignmentForPartition(nodes2, partNum, partNum + 1, replicas);
+    private static final Set<Assignment> assignments3 = 
calculateAssignmentForPartition(nodes3, partNum, partNum + 1, replicas);
+    private static final Set<Assignment> assignments4 = 
calculateAssignmentForPartition(nodes4, partNum, partNum + 1, replicas);
 
     private static final long expectedPendingChangeTriggerKey = 10L;
 
@@ -539,6 +539,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
                 tableDescriptor,
                 tablePartitionId,
                 nodesForNewAssignments,
+                partNum + 1,
                 replicas,
                 expectedPendingChangeTriggerKey,
                 metaStorageManager,
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
new file mode 100644
index 0000000000..6bf518dff1
--- /dev/null
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Partition distribution algorithm.
+ */
+public interface DistributionAlgorithm {
+
+    /**
+     * Generates an assignment by the given parameters.
+     *
+     * @param nodes List of topology nodes.
+     * @param currentDistribution Previous assignments or empty list.
+     * @param partitions Number of table partitions.
+     * @param replicaFactor Number of partition replicas.
+     * @return List of nodes by partition.
+     */
+    List<List<String>> assignPartitions(
+            Collection<String> nodes,
+            List<List<String>> currentDistribution,
+            int partitions,
+            int replicaFactor
+    );
+}
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
index 694730c982..fe7f2c78c8 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.partitiondistribution;
 
+import static java.util.Collections.emptyList;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -30,6 +29,9 @@ import java.util.Set;
  * Stateless distribution utils that produces helper methods for an 
assignments distribution calculation.
  */
 public class PartitionDistributionUtils {
+
+    private static final DistributionAlgorithm DISTRIBUTION_ALGORITHM = new 
RendezvousDistributionFunction();
+
     /**
      * Calculates assignments distribution.
      *
@@ -38,14 +40,16 @@ public class PartitionDistributionUtils {
      * @param replicas Replicas count.
      * @return List assignments by partition.
      */
-    public static List<Set<Assignment>> 
calculateAssignments(Collection<String> dataNodes, int partitions, int 
replicas) {
-        List<Set<String>> nodes = 
RendezvousDistributionFunction.assignPartitions(
+    public static List<Set<Assignment>> calculateAssignments(
+            Collection<String> dataNodes,
+            int partitions,
+            int replicas
+    ) {
+        List<List<String>> nodes = DISTRIBUTION_ALGORITHM.assignPartitions(
                 dataNodes,
+                emptyList(),
                 partitions,
-                replicas,
-                false,
-                null,
-                HashSet::new
+                replicas
         );
 
         return 
nodes.stream().map(PartitionDistributionUtils::dataNodesToAssignments).collect(toList());
@@ -56,21 +60,20 @@ public class PartitionDistributionUtils {
      *
      * @param dataNodes Data nodes.
      * @param partitionId Partition id.
+     * @param partitions Partitions count.
      * @param replicas Replicas count.
      * @return Set of assignments.
      */
-    public static Set<Assignment> 
calculateAssignmentForPartition(Collection<String> dataNodes, int partitionId, 
int replicas) {
-        Set<String> nodes = RendezvousDistributionFunction.assignPartition(
-                partitionId,
-                new ArrayList<>(dataNodes),
-                replicas,
-                null,
-                false,
-                null,
-                HashSet::new
-        );
+    public static Set<Assignment> calculateAssignmentForPartition(
+            Collection<String> dataNodes,
+            int partitionId,
+            int partitions,
+            int replicas
+    ) {
+        List<List<String>> nodes = 
DISTRIBUTION_ALGORITHM.assignPartitions(dataNodes, emptyList(), partitions, 
replicas);
+        List<String> affinityNodes = nodes.get(partitionId);
 
-        return dataNodesToAssignments(nodes);
+        return dataNodesToAssignments(affinityNodes);
     }
 
     private static Set<Assignment> dataNodesToAssignments(Collection<String> 
nodes) {
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
index 0f5e2c99c4..1f3ca4e37f 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.partitiondistribution;
 
+import static org.apache.ignite.internal.util.IgniteUtils.forEachIndexed;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,6 +35,7 @@ import java.util.function.IntFunction;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Partition distribution function for partitioned table based on Highest 
Random Weight algorithm. This function supports the following
@@ -54,7 +57,7 @@ import org.apache.ignite.internal.logger.Loggers;
  * </li>
  * </ul>
  */
-public class RendezvousDistributionFunction {
+public class RendezvousDistributionFunction implements DistributionAlgorithm {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(RendezvousDistributionFunction.class);
 
@@ -81,11 +84,11 @@ public class RendezvousDistributionFunction {
      */
     public static <T extends Collection<String>> T assignPartition(
             int part,
-            List<String> nodes,
+            Collection<String> nodes,
             int replicas,
             Map<String, Collection<String>> neighborhoodCache,
             boolean exclNeighbors,
-            BiPredicate<String, T> nodeFilter,
+            @Nullable BiPredicate<String, T> nodeFilter,
             IntFunction<T> aggregator
     ) {
         if (nodes.size() <= 1) {
@@ -99,13 +102,11 @@ public class RendezvousDistributionFunction {
         IgniteBiTuple<Long, String>[] hashArr =
                 (IgniteBiTuple<Long, String>[]) new 
IgniteBiTuple[nodes.size()];
 
-        for (int i = 0; i < nodes.size(); i++) {
-            String node = nodes.get(i);
-
+        forEachIndexed(nodes, (node, i) -> {
             long hash = hash(node.hashCode(), part);
 
             hashArr[i] = new IgniteBiTuple<>(hash, node);
-        }
+        });
 
         final int effectiveReplicas = replicas == Integer.MAX_VALUE ? 
nodes.size() : Math.min(replicas, nodes.size());
 
@@ -182,7 +183,7 @@ public class RendezvousDistributionFunction {
      * @param aggregator  Function that creates a collection for the partition 
assignments.
      * @return Assignment.
      */
-    private static <T extends Collection<String>> T 
replicatedAssign(List<String> nodes,
+    private static <T extends Collection<String>> T 
replicatedAssign(Collection<String> nodes,
             Iterable<String> sortedNodes, IntFunction<T> aggregator) {
         String first = sortedNodes.iterator().next();
 
@@ -239,7 +240,7 @@ public class RendezvousDistributionFunction {
             int partitions,
             int replicas,
             boolean exclNeighbors,
-            BiPredicate<String, List<String>> nodeFilter
+            @Nullable BiPredicate<String, List<String>> nodeFilter
     ) {
         return assignPartitions(currentTopologySnapshot, partitions, replicas, 
exclNeighbors, nodeFilter, ArrayList::new);
     }
@@ -260,7 +261,7 @@ public class RendezvousDistributionFunction {
             int partitions,
             int replicas,
             boolean exclNeighbors,
-            BiPredicate<String, T> nodeFilter,
+            @Nullable BiPredicate<String, T> nodeFilter,
             IntFunction<T> aggregator
     ) {
         assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + 
MAX_PARTITIONS_COUNT;
@@ -282,6 +283,16 @@ public class RendezvousDistributionFunction {
         return assignments;
     }
 
+    @Override
+    public List<List<String>> assignPartitions(
+            Collection<String> nodes,
+            List<List<String>> currentDistribution,
+            int partitions,
+            int replicaFactor
+    ) {
+        return assignPartitions(nodes, partitions, replicaFactor, false, null);
+    }
+
     /**
      * Builds neighborhood map for all nodes in snapshot.
      *
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index efc7e87d07..749194de65 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -395,7 +395,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         startNodes(testInfo, 3);
 
         Assignment replicaAssignment = (Assignment) 
calculateAssignmentForPartition(
-                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 1).toArray()[0];
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 1, 1).toArray()[0];
 
         Node node = getNode(replicaAssignment.consistentId());
 
@@ -616,7 +616,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         startNodes(testInfo, 3);
 
         Assignment replicaAssignment = (Assignment) 
calculateAssignmentForPartition(
-                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 1).toArray()[0];
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 1, 1).toArray()[0];
 
         Node node = getNode(replicaAssignment.consistentId());
 
@@ -649,7 +649,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         startNodes(testInfo, 3);
 
         Assignment replicaAssignment = (Assignment) 
calculateAssignmentForPartition(
-                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 3).toArray()[0];
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 1, 3).toArray()[0];
 
         Node node = getNode(replicaAssignment.consistentId());
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 1e6e8122f9..656b8e6925 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -534,6 +534,7 @@ public class PartitionReplicaLifecycleManager extends
             ).thenApply(dataNodes -> calculateAssignmentForPartition(
                             dataNodes,
                             zonePartitionId.partitionId(),
+                            zoneDescriptor.partitions(),
                             zoneDescriptor.replicas()
                     )
             );
@@ -778,6 +779,7 @@ public class PartitionReplicaLifecycleManager extends
                         .thenCompose(dataNodes -> handleReduceChanged(
                                 metaStorageMgr,
                                 dataNodes,
+                                zoneDescriptor.partitions(),
                                 zoneDescriptor.replicas(),
                                 replicaGrpId,
                                 evt,
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index 31f9cf2f58..7622b4a40a 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -389,7 +389,7 @@ public class PlacementDriverManagerTest extends 
BasePlacementDriverTest {
 
         }, 10_000));
 
-        assignments = 
calculateAssignmentForPartition(Collections.singleton(nodeName), 1, 1);
+        assignments = 
calculateAssignmentForPartition(Collections.singleton(nodeName), 1, 2, 1);
 
         metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX + 
grpPart0), Assignments.toBytes(assignments, assignmentsTimestamp));
 
@@ -428,7 +428,7 @@ public class PlacementDriverManagerTest extends 
BasePlacementDriverTest {
             return falseCompletedFuture();
         });
 
-        Set<Assignment> assignments = 
calculateAssignmentForPartition(Collections.singleton(anotherNodeName), 1, 1);
+        Set<Assignment> assignments = 
calculateAssignmentForPartition(Collections.singleton(anotherNodeName), 1, 2, 
1);
 
         metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX + 
grpPart0), Assignments.toBytes(assignments, assignmentsTimestamp));
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index ac2c92e9cb..880875470e 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -543,6 +543,11 @@ public class LeaseUpdater {
                 );
             }
 
+            // This condition allows skipping the meta storage invoke when 
there are no leases to update (renewedLeases.isEmpty()).
+            // However, there is a case when we need to save an empty leases 
collection: when the assignments are empty and
+            // leasesCurrent (those that reflect the meta storage state) is 
not empty. The negation of this condition gives us
+            // the condition to skip the update and the result is:
+            // !(emptyAssignments && !leasesCurrent.isEmpty()) == 
(!emptyAssignments || leasesCurrent.isEmpty())
             boolean emptyAssignments = 
aggregatedStableAndPendingAssignmentsByGroups.isEmpty();
             if (renewedLeases.isEmpty() && (!emptyAssignments || 
leasesCurrent.leaseByGroupId().isEmpty())) {
                 LOG.debug("No leases to update found.");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 9a058af9c4..70fefc39f2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -826,7 +826,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
             if (readOnly) {
                 // Any node from assignments will do it.
                 Set<Assignment> assignments = 
calculateAssignmentForPartition(CLUSTER.aliveNode().clusterNodes().stream().map(
-                        ClusterNode::name).collect(Collectors.toList()), 0, 1);
+                        ClusterNode::name).collect(Collectors.toList()), 0, 1, 
1);
 
                 assertFalse(assignments.isEmpty());
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 91beb10eec..3fdafb9c4d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1403,6 +1403,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     calculateAssignmentForPartition(
                             dataNodes,
                             tablePartitionId.partitionId(),
+                            zoneDescriptor.partitions(),
                             zoneDescriptor.replicas()
                     )
             );
@@ -2490,6 +2491,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                                             .thenCompose(dataNodes -> 
RebalanceUtilEx.handleReduceChanged(
                                                     metaStorageMgr,
                                                     dataNodes,
+                                                    
zoneDescriptor.partitions(),
                                                     zoneDescriptor.replicas(),
                                                     replicaGrpId,
                                                     evt,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
index 085de7dba4..b3afe4904a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
@@ -276,6 +276,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest 
{
                     replicaGrpId,
                     aliveDataNodes,
                     aliveNodesConsistentIds,
+                    zoneDescriptor.partitions(),
                     zoneDescriptor.replicas(),
                     revision,
                     metaStorageManager,
@@ -297,6 +298,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest 
{
             TablePartitionId partId,
             Collection<String> aliveDataNodes,
             Set<String> aliveNodesConsistentIds,
+            int partitions,
             int replicas,
             long revision,
             MetaStorageManager metaStorageMgr,
@@ -317,7 +319,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest 
{
         }
 
         if (manualUpdate) {
-            enrichAssignments(partId, aliveDataNodes, replicas, 
partAssignments);
+            enrichAssignments(partId, aliveDataNodes, partitions, replicas, 
partAssignments);
         }
 
         Assignment nextAssignment = 
nextAssignment(localPartitionStateMessageByNode, partAssignments);
@@ -468,10 +470,11 @@ class GroupUpdateRequest implements 
DisasterRecoveryRequest {
     private static void enrichAssignments(
             TablePartitionId partId,
             Collection<String> aliveDataNodes,
+            int partitions,
             int replicas,
             Set<Assignment> partAssignments
     ) {
-        Set<Assignment> calcAssignments = 
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(), replicas);
+        Set<Assignment> calcAssignments = 
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(), 
partitions, replicas);
 
         for (Assignment calcAssignment : calcAssignments) {
             if (partAssignments.size() == replicas) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
index 283769bd00..1a3339f770 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
@@ -110,6 +110,7 @@ public class RebalanceUtilEx {
      *
      * @param metaStorageMgr MetaStorage manager.
      * @param dataNodes Data nodes.
+     * @param partitions Number of partitions.
      * @param replicas Replicas count.
      * @param partId Partition's raft group id.
      * @param event Assignments switch reduce change event.
@@ -118,6 +119,7 @@ public class RebalanceUtilEx {
     public static CompletableFuture<Void> handleReduceChanged(
             MetaStorageManager metaStorageMgr,
             Collection<String> dataNodes,
+            int partitions,
             int replicas,
             TablePartitionId partId,
             WatchEvent event,
@@ -134,7 +136,7 @@ public class RebalanceUtilEx {
             return nullCompletedFuture();
         }
 
-        Set<Assignment> assignments = 
calculateAssignmentForPartition(dataNodes, partId.partitionId(), replicas);
+        Set<Assignment> assignments = 
calculateAssignmentForPartition(dataNodes, partId.partitionId(), partitions, 
replicas);
 
         ByteArray pendingKey = pendingPartAssignmentsKey(partId);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
index ce17ff529e..56649edce2 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
@@ -59,8 +59,8 @@ public class DisasterRecoveryMsInvokeTest extends 
BaseIgniteAbstractTest {
     private static final Set<String> nodes1 = IntStream.of(5).mapToObj(i -> 
"nodes1_" + i).collect(toSet());
     private static final Set<String> nodes2 = IntStream.of(5).mapToObj(i -> 
"nodes2_" + i).collect(toSet());
 
-    private static final Set<Assignment> assignments1 = 
calculateAssignmentForPartition(nodes1, partNum, replicas);
-    private static final Set<Assignment> assignments2 = 
calculateAssignmentForPartition(nodes2, partNum, replicas);
+    private static final Set<Assignment> assignments1 = 
calculateAssignmentForPartition(nodes1, partNum, partNum + 1, replicas);
+    private static final Set<Assignment> assignments2 = 
calculateAssignmentForPartition(nodes2, partNum, partNum + 1, replicas);
 
     private static final TablePartitionId tablePartitionId = new 
TablePartitionId(1, 1);
 
diff --git 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
index c05dbb03a8..a57e9ce9b0 100644
--- 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
+++ 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
@@ -26,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -124,10 +125,14 @@ public class ItTransactionTestUtils {
         Tuple t = initialTuple;
         int tableId = tableId(node, tableName);
 
-        int maxAttempts = 100;
+        Set<Integer> partitionIds = new HashSet<>();
+        Set<String> nodes = new HashSet<>();
+
+        int maxAttempts = 1000;
 
         while (maxAttempts >= 0) {
             int partId = partitionIdForTuple(node, tableName, t, tx);
+            partitionIds.add(partId);
 
             TablePartitionId grpId = new TablePartitionId(tableId, partId);
 
@@ -137,12 +142,16 @@ public class ItTransactionTestUtils {
                 if (node.id().equals(replicaMeta.getLeaseholderId())) {
                     return t;
                 }
+
+                nodes.add(replicaMeta.getLeaseholder());
             } else {
                 Set<String> assignments = partitionAssignment(node, grpId);
 
                 if (assignments.contains(node.name())) {
                     return t;
                 }
+
+                nodes.addAll(assignments);
             }
 
             t = nextTuple.apply(t);
@@ -150,7 +159,8 @@ public class ItTransactionTestUtils {
             maxAttempts--;
         }
 
-        throw new AssertionError("Failed to find a suitable tuple.");
+        throw new AssertionError("Failed to find a suitable tuple, tried " + 
maxAttempts + " times with [partitionIds="
+                + partitionIds + ", nodes=" + nodes + "].");
     }
 
     /**


Reply via email to