This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6beba4153a IGNITE-22722 Add the ability to get assignments for a list 
of replication groups at once (#4206)
6beba4153a is described below

commit 6beba4153ada043991311bde4425b24f12fc1247
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Wed Aug 14 17:08:31 2024 +0300

    IGNITE-22722 Add the ability to get assignments for a list of replication 
groups at once (#4206)
---
 .../compaction/CatalogCompactionRunner.java        | 55 +++++++++------------
 .../CatalogCompactionRunnerSelfTest.java           | 16 ++++--
 .../ignite/client/handler/FakePlacementDriver.java |  4 +-
 .../ignite/internal/index/TestPlacementDriver.java |  5 +-
 .../replicator/utils/TestPlacementDriver.java      |  5 +-
 .../AssignmentsPlacementDriver.java                | 22 ++++++++-
 .../placementdriver/TestPlacementDriver.java       |  5 +-
 .../placementdriver/AssignmentsTracker.java        | 13 +++--
 .../placementdriver/PlacementDriverManager.java    |  7 +--
 .../sql/engine/ExecutionTargetProviderImpl.java    | 57 +++++++++++++++++-----
 .../wrappers/DelegatingPlacementDriver.java        |  7 +--
 11 files changed, 129 insertions(+), 67 deletions(-)

diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index 75e4bbb975..0b1337ddd8 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -31,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
@@ -318,53 +316,44 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
         HybridTimestamp nowTs = clockService.now();
         Set<String> required = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-22722 This method 
needs to be simplified.
-        return collectRequiredNodes(catalog, new 
ArrayList<>(catalog.tables()).iterator(), required, nowTs);
+        return CompletableFutures.allOf(catalog.tables().stream()
+                .map(table -> collectRequiredNodes(catalog, table, required, 
nowTs))
+                .collect(Collectors.toList())
+        ).thenApply(ignore -> required);
     }
 
-    private CompletableFuture<Set<String>> collectRequiredNodes(
+    private CompletableFuture<Void> collectRequiredNodes(
             Catalog catalog,
-            Iterator<CatalogTableDescriptor> tabItr,
+            CatalogTableDescriptor table,
             Set<String> required,
             HybridTimestamp nowTs
     ) {
-        if (!tabItr.hasNext()) {
-            return CompletableFuture.completedFuture(required);
-        }
-
-        CatalogTableDescriptor table = tabItr.next();
         CatalogZoneDescriptor zone = catalog.zone(table.zoneId());
 
         assert zone != null : table.zoneId();
 
-        List<CompletableFuture<?>> partitionFutures = new 
ArrayList<>(zone.partitions());
+        int partitions = zone.partitions();
 
-        for (int p = 0; p < zone.partitions(); p++) {
-            ReplicationGroupId replicationGroupId = new 
TablePartitionId(table.id(), p);
+        List<ReplicationGroupId> replicationGroupIds = new 
ArrayList<>(partitions);
 
-            CompletableFuture<TokenizedAssignments> assignmentsFut = 
placementDriver.getAssignments(replicationGroupId, nowTs)
-                    .whenComplete((tokenizedAssignments, ex) -> {
-                        if (ex != null) {
-                            return;
-                        }
+        for (int p = 0; p < partitions; p++) {
+            replicationGroupIds.add(new TablePartitionId(table.id(), p));
+        }
+
+        return placementDriver.getAssignments(replicationGroupIds, nowTs)
+                .thenAccept(tokenizedAssignments -> {
+                    assert tokenizedAssignments.size() == 
replicationGroupIds.size();
 
-                        if (tokenizedAssignments == null) {
+                    for (int p = 0; p < partitions; p++) {
+                        TokenizedAssignments assignment = 
tokenizedAssignments.get(p);
+                        if (assignment == null) {
                             throw new IllegalStateException("Cannot get 
assignments for table " + table.name()
-                                    + " (replication group=" + 
replicationGroupId + ").");
+                                    + " (replication group=" + 
replicationGroupIds.get(p) + ").");
                         }
 
-                        List<String> assignments = 
tokenizedAssignments.nodes().stream()
-                                .map(Assignment::consistentId)
-                                .collect(Collectors.toList());
-
-                        required.addAll(assignments);
-                    });
-
-            partitionFutures.add(assignmentsFut);
-        }
-
-        return CompletableFutures.allOf(partitionFutures)
-                .thenCompose(ignore -> collectRequiredNodes(catalog, tabItr, 
required, nowTs));
+                        assignment.nodes().forEach(a -> 
required.add(a.consistentId()));
+                    }
+                });
     }
 
     private static List<String> missingNodes(Set<String> requiredNodes, 
Collection<LogicalNode> logicalTopologyNodes) {
diff --git 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
index c322e8c233..bb56812acf 100644
--- 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
+++ 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
@@ -55,11 +55,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.CatalogTestUtils.TestCommand;
+import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
 import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
@@ -83,7 +85,6 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.hamcrest.Matchers;
@@ -324,10 +325,12 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
                 logicalNodes
         );
 
-        when(placementDriver.getAssignments(any(), 
any())).thenReturn(CompletableFuture.failedFuture(new ArithmeticException()));
+        when(placementDriver.getAssignments(any(List.class), 
any())).thenReturn(CompletableFuture.failedFuture(new ArithmeticException()));
         assertThat(compactor.triggerCompaction(clockService.now()), 
willThrow(ArithmeticException.class));
 
-        when(placementDriver.getAssignments(any(), 
any())).thenReturn(CompletableFutures.nullCompletedFuture());
+        List<?> assignments = IntStream.range(0, 
CatalogUtils.DEFAULT_PARTITION_COUNT).mapToObj(i -> 
null).collect(Collectors.toList());
+
+        when(placementDriver.getAssignments(any(List.class), 
any())).thenReturn(CompletableFuture.completedFuture(assignments));
         assertThat(compactor.triggerCompaction(clockService.now()), 
willThrow(IllegalStateException.class));
     }
 
@@ -374,8 +377,11 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
                 .map(node -> Assignment.forPeer(node.name()))
                 .collect(Collectors.toSet());
 
-        TokenizedAssignmentsImpl tokenizedAssignments = new 
TokenizedAssignmentsImpl(assignments, Long.MAX_VALUE);
-        when(placementDriver.getAssignments(any(), 
any())).thenReturn(CompletableFuture.completedFuture(tokenizedAssignments));
+        List<?> tableAssignments = IntStream.range(0, 
CatalogUtils.DEFAULT_PARTITION_COUNT)
+                .mapToObj(i -> new TokenizedAssignmentsImpl(assignments, 
Long.MAX_VALUE))
+                .collect(Collectors.toList());
+
+        when(placementDriver.getAssignments(any(List.class), 
any())).thenReturn(CompletableFuture.completedFuture(tableAssignments));
 
         LogicalTopologySnapshot logicalTop = new LogicalTopologySnapshot(1, 
topology);
 
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 54da3e4648..8f1bdc5835 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -106,8 +106,8 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index fb81f9cee2..f0650073ca 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,8 +59,8 @@ class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index 51b207f66b..aa2c0bcc15 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.partition.replicator.utils;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
@@ -67,8 +68,8 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
index 641ab117ec..164ee37d59 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.placementdriver;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -39,5 +40,24 @@ public interface AssignmentsPlacementDriver {
      * @param clusterTimeToAwait Cluster time to await.
      * @return Tokenized assignments.
      */
-    CompletableFuture<TokenizedAssignments> getAssignments(ReplicationGroupId 
replicationGroupId, HybridTimestamp clusterTimeToAwait);
+    default CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return getAssignments(List.of(replicationGroupId), 
clusterTimeToAwait).thenApply(assignments -> assignments.get(0));
+    }
+
+    /**
+     * Returns the future with list of tokenized assignments, which contains 
newest available tokenized assignment for the specified
+     * replication group id or {@code null} if assignment for the replication 
group id was not found. The future will be completed after
+     * clusterTime (meta storage safe time) will become greater or equal to 
the clusterTimeToAwait parameter.
+     *
+     * @param replicationGroupIds List of replication group Ids.
+     * @param clusterTimeToAwait Cluster time to await.
+     * @return List of tokenized assignments.
+     */
+    CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
+            HybridTimestamp clusterTimeToAwait
+    );
 }
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 3392cef934..d91e9499ff 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -85,8 +86,8 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index b71a9f7f4c..3f49ca826c 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.StringUtils.incrementLastChar;
 
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -129,14 +130,20 @@ public class AssignmentsTracker implements 
AssignmentsPlacementDriver {
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return msManager
                 .clusterTime()
                 .waitFor(clusterTimeToAwait)
-                .thenApply(ignored -> inBusyLock(busyLock, () -> 
assignments().get(replicationGroupId)));
+                .thenApply(ignored -> inBusyLock(busyLock, () -> {
+                    Map<ReplicationGroupId, TokenizedAssignments> assignments 
= assignments();
+
+                    return replicationGroupIds.stream()
+                            .map(assignments::get)
+                            .collect(Collectors.toList());
+                }));
     }
 
     /**
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index a798c7a72a..e7bae39bd6 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -272,11 +273,11 @@ public class PlacementDriverManager implements 
IgniteComponent {
     private PlacementDriver createPlacementDriver() {
         return new PlacementDriver() {
             @Override
-            public CompletableFuture<TokenizedAssignments> getAssignments(
-                    ReplicationGroupId replicationGroupId,
+            public CompletableFuture<List<TokenizedAssignments>> 
getAssignments(
+                    List<? extends ReplicationGroupId> replicationGroupIds,
                     HybridTimestamp timestamp
             ) {
-                return assignmentsTracker.getAssignments(replicationGroupId, 
timestamp);
+                return assignmentsTracker.getAssignments(replicationGroupIds, 
timestamp);
             }
 
             @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
index 3992941bb3..c9b961080f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
@@ -26,6 +26,8 @@ import static 
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -104,15 +106,23 @@ public class ExecutionTargetProviderImpl implements 
ExecutionTargetProvider {
     ) {
         int partitions = table.partitions();
 
+        if (includeBackups) {
+            List<TablePartitionId> replicationGroupIds = new 
ArrayList<>(partitions);
+
+            for (int p = 0; p < partitions; p++) {
+                replicationGroupIds.add(new TablePartitionId(table.id(), p));
+            }
+
+            return allReplicas(replicationGroupIds, operationTime);
+        }
+
         List<CompletableFuture<TokenizedAssignments>> result = new 
ArrayList<>(partitions);
 
         // no need to wait all partitions after pruning was implemented.
         for (int partId = 0; partId < partitions; ++partId) {
             ReplicationGroupId partGroupId = new TablePartitionId(table.id(), 
partId);
 
-            CompletableFuture<TokenizedAssignments> partitionAssignment = 
includeBackups
-                    ? allReplicas(partGroupId, operationTime)
-                    : primaryReplica(partGroupId, operationTime);
+            CompletableFuture<TokenizedAssignments> partitionAssignment = 
primaryReplica(partGroupId, operationTime);
 
             result.add(partitionAssignment);
         }
@@ -152,22 +162,47 @@ public class ExecutionTargetProviderImpl implements 
ExecutionTargetProvider {
         });
     }
 
-    private CompletableFuture<TokenizedAssignments> allReplicas(
-            ReplicationGroupId replicationGroupId,
+    private CompletableFuture<List<TokenizedAssignments>> allReplicas(
+            List<TablePartitionId> replicationGroupIds,
             HybridTimestamp operationTime
     ) {
-        CompletableFuture<TokenizedAssignments> f = 
placementDriver.getAssignments(
-                replicationGroupId,
+        CompletableFuture<List<TokenizedAssignments>> f = 
placementDriver.getAssignments(
+                replicationGroupIds,
                 operationTime
         );
 
         return f.thenCompose(assignments -> {
-            if (assignments == null) {
-                // assignments are not ready yet, let's fall back to primary 
replicas
-                return primaryReplica(replicationGroupId, operationTime);
+            // Collect missed assignments indexes if found.
+            IntList missedAssignments = new IntArrayList(0);
+
+            for (int i = 0; i < assignments.size(); i++) {
+                if (assignments.get(i) == null) {
+                    missedAssignments.add(i);
+                }
             }
 
-            return completedFuture(assignments);
+            if (missedAssignments.isEmpty()) {
+                return completedFuture(assignments);
+            }
+
+            // assignments are not ready yet, let's fall back to primary 
replicas
+            List<CompletableFuture<TokenizedAssignments>> 
primaryReplicaAssignment = new ArrayList<>(missedAssignments.size());
+
+            for (int i = 0; i < missedAssignments.size(); i++) {
+                
primaryReplicaAssignment.add(primaryReplica(replicationGroupIds.get(missedAssignments.getInt(i)),
 operationTime));
+            }
+
+            CompletableFuture<Void> all = 
CompletableFuture.allOf(primaryReplicaAssignment.toArray(new 
CompletableFuture[0]));
+            return all.thenApply(ignore -> {
+                // Creates a mutable copy and replace missed assignments with 
primary replicas.
+                List<TokenizedAssignments> finalAssignments = new 
ArrayList<>(assignments);
+
+                for (int i = 0; i < missedAssignments.size(); i++) {
+                    finalAssignments.set(missedAssignments.getInt(i), 
primaryReplicaAssignment.get(i).join());
+                }
+
+                return finalAssignments;
+            });
         });
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index eca347e990..c5989ec45a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.wrappers;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
@@ -65,10 +66,10 @@ abstract class DelegatingPlacementDriver implements 
PlacementDriver {
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
-        return delegate.getAssignments(replicationGroupId, clusterTimeToAwait);
+        return delegate.getAssignments(replicationGroupIds, 
clusterTimeToAwait);
     }
 }

Reply via email to