This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6beba4153a IGNITE-22722 Add the ability to get assignments for a list
of replication groups at once (#4206)
6beba4153a is described below
commit 6beba4153ada043991311bde4425b24f12fc1247
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Wed Aug 14 17:08:31 2024 +0300
IGNITE-22722 Add the ability to get assignments for a list of replication
groups at once (#4206)
---
.../compaction/CatalogCompactionRunner.java | 55 +++++++++------------
.../CatalogCompactionRunnerSelfTest.java | 16 ++++--
.../ignite/client/handler/FakePlacementDriver.java | 4 +-
.../ignite/internal/index/TestPlacementDriver.java | 5 +-
.../replicator/utils/TestPlacementDriver.java | 5 +-
.../AssignmentsPlacementDriver.java | 22 ++++++++-
.../placementdriver/TestPlacementDriver.java | 5 +-
.../placementdriver/AssignmentsTracker.java | 13 +++--
.../placementdriver/PlacementDriverManager.java | 7 +--
.../sql/engine/ExecutionTargetProviderImpl.java | 57 +++++++++++++++++-----
.../wrappers/DelegatingPlacementDriver.java | 7 +--
11 files changed, 129 insertions(+), 67 deletions(-)
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index 75e4bbb975..0b1337ddd8 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -31,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
@@ -318,53 +316,44 @@ public class CatalogCompactionRunner implements
IgniteComponent {
HybridTimestamp nowTs = clockService.now();
Set<String> required = Collections.newSetFromMap(new
ConcurrentHashMap<>());
- // TODO https://issues.apache.org/jira/browse/IGNITE-22722 This method
needs to be simplified.
- return collectRequiredNodes(catalog, new
ArrayList<>(catalog.tables()).iterator(), required, nowTs);
+ return CompletableFutures.allOf(catalog.tables().stream()
+ .map(table -> collectRequiredNodes(catalog, table, required,
nowTs))
+ .collect(Collectors.toList())
+ ).thenApply(ignore -> required);
}
- private CompletableFuture<Set<String>> collectRequiredNodes(
+ private CompletableFuture<Void> collectRequiredNodes(
Catalog catalog,
- Iterator<CatalogTableDescriptor> tabItr,
+ CatalogTableDescriptor table,
Set<String> required,
HybridTimestamp nowTs
) {
- if (!tabItr.hasNext()) {
- return CompletableFuture.completedFuture(required);
- }
-
- CatalogTableDescriptor table = tabItr.next();
CatalogZoneDescriptor zone = catalog.zone(table.zoneId());
assert zone != null : table.zoneId();
- List<CompletableFuture<?>> partitionFutures = new
ArrayList<>(zone.partitions());
+ int partitions = zone.partitions();
- for (int p = 0; p < zone.partitions(); p++) {
- ReplicationGroupId replicationGroupId = new
TablePartitionId(table.id(), p);
+ List<ReplicationGroupId> replicationGroupIds = new
ArrayList<>(partitions);
- CompletableFuture<TokenizedAssignments> assignmentsFut =
placementDriver.getAssignments(replicationGroupId, nowTs)
- .whenComplete((tokenizedAssignments, ex) -> {
- if (ex != null) {
- return;
- }
+ for (int p = 0; p < partitions; p++) {
+ replicationGroupIds.add(new TablePartitionId(table.id(), p));
+ }
+
+ return placementDriver.getAssignments(replicationGroupIds, nowTs)
+ .thenAccept(tokenizedAssignments -> {
+ assert tokenizedAssignments.size() ==
replicationGroupIds.size();
- if (tokenizedAssignments == null) {
+ for (int p = 0; p < partitions; p++) {
+ TokenizedAssignments assignment =
tokenizedAssignments.get(p);
+ if (assignment == null) {
throw new IllegalStateException("Cannot get
assignments for table " + table.name()
- + " (replication group=" +
replicationGroupId + ").");
+ + " (replication group=" +
replicationGroupIds.get(p) + ").");
}
- List<String> assignments =
tokenizedAssignments.nodes().stream()
- .map(Assignment::consistentId)
- .collect(Collectors.toList());
-
- required.addAll(assignments);
- });
-
- partitionFutures.add(assignmentsFut);
- }
-
- return CompletableFutures.allOf(partitionFutures)
- .thenCompose(ignore -> collectRequiredNodes(catalog, tabItr,
required, nowTs));
+ assignment.nodes().forEach(a ->
required.add(a.consistentId()));
+ }
+ });
}
private static List<String> missingNodes(Set<String> requiredNodes,
Collection<LogicalNode> logicalTopologyNodes) {
diff --git
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
index c322e8c233..bb56812acf 100644
---
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
+++
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
@@ -55,11 +55,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.CatalogTestUtils.TestCommand;
+import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
@@ -83,7 +85,6 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.hamcrest.Matchers;
@@ -324,10 +325,12 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
logicalNodes
);
- when(placementDriver.getAssignments(any(),
any())).thenReturn(CompletableFuture.failedFuture(new ArithmeticException()));
+ when(placementDriver.getAssignments(any(List.class),
any())).thenReturn(CompletableFuture.failedFuture(new ArithmeticException()));
assertThat(compactor.triggerCompaction(clockService.now()),
willThrow(ArithmeticException.class));
- when(placementDriver.getAssignments(any(),
any())).thenReturn(CompletableFutures.nullCompletedFuture());
+ List<?> assignments = IntStream.range(0,
CatalogUtils.DEFAULT_PARTITION_COUNT).mapToObj(i ->
null).collect(Collectors.toList());
+
+ when(placementDriver.getAssignments(any(List.class),
any())).thenReturn(CompletableFuture.completedFuture(assignments));
assertThat(compactor.triggerCompaction(clockService.now()),
willThrow(IllegalStateException.class));
}
@@ -374,8 +377,11 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
.map(node -> Assignment.forPeer(node.name()))
.collect(Collectors.toSet());
- TokenizedAssignmentsImpl tokenizedAssignments = new
TokenizedAssignmentsImpl(assignments, Long.MAX_VALUE);
- when(placementDriver.getAssignments(any(),
any())).thenReturn(CompletableFuture.completedFuture(tokenizedAssignments));
+ List<?> tableAssignments = IntStream.range(0,
CatalogUtils.DEFAULT_PARTITION_COUNT)
+ .mapToObj(i -> new TokenizedAssignmentsImpl(assignments,
Long.MAX_VALUE))
+ .collect(Collectors.toList());
+
+ when(placementDriver.getAssignments(any(List.class),
any())).thenReturn(CompletableFuture.completedFuture(tableAssignments));
LogicalTopologySnapshot logicalTop = new LogicalTopologySnapshot(1,
topology);
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 54da3e4648..8f1bdc5835 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -106,8 +106,8 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
@Override
- public CompletableFuture<TokenizedAssignments> getAssignments(
- ReplicationGroupId replicationGroupId,
+ public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index fb81f9cee2..f0650073ca 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -58,8 +59,8 @@ class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
}
@Override
- public CompletableFuture<TokenizedAssignments> getAssignments(
- ReplicationGroupId replicationGroupId,
+ public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index 51b207f66b..aa2c0bcc15 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.partition.replicator.utils;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.affinity.TokenizedAssignments;
@@ -67,8 +68,8 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
@Override
- public CompletableFuture<TokenizedAssignments> getAssignments(
- ReplicationGroupId replicationGroupId,
+ public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
index 641ab117ec..164ee37d59 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.placementdriver;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -39,5 +40,24 @@ public interface AssignmentsPlacementDriver {
* @param clusterTimeToAwait Cluster time to await.
* @return Tokenized assignments.
*/
- CompletableFuture<TokenizedAssignments> getAssignments(ReplicationGroupId
replicationGroupId, HybridTimestamp clusterTimeToAwait);
+ default CompletableFuture<TokenizedAssignments> getAssignments(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp clusterTimeToAwait
+ ) {
+ return getAssignments(List.of(replicationGroupId),
clusterTimeToAwait).thenApply(assignments -> assignments.get(0));
+ }
+
+ /**
+ * Returns the future with list of tokenized assignments, which contains
newest available tokenized assignment for the specified
+ * replication group id or {@code null} if assignment for the replication
group id was not found. The future will be completed after
+ * clusterTime (meta storage safe time) will become greater or equal to
the clusterTimeToAwait parameter.
+ *
+ * @param replicationGroupIds List of replication group Ids.
+ * @param clusterTimeToAwait Cluster time to await.
+ * @return List of tokenized assignments.
+ */
+ CompletableFuture<List<TokenizedAssignments>> getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ HybridTimestamp clusterTimeToAwait
+ );
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 3392cef934..d91e9499ff 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -85,8 +86,8 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
@Override
- public CompletableFuture<TokenizedAssignments> getAssignments(
- ReplicationGroupId replicationGroupId,
+ public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index b71a9f7f4c..3f49ca826c 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.StringUtils.incrementLastChar;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -129,14 +130,20 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
}
@Override
- public CompletableFuture<TokenizedAssignments> getAssignments(
- ReplicationGroupId replicationGroupId,
+ public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
return msManager
.clusterTime()
.waitFor(clusterTimeToAwait)
- .thenApply(ignored -> inBusyLock(busyLock, () ->
assignments().get(replicationGroupId)));
+ .thenApply(ignored -> inBusyLock(busyLock, () -> {
+ Map<ReplicationGroupId, TokenizedAssignments> assignments
= assignments();
+
+ return replicationGroupIds.stream()
+ .map(assignments::get)
+ .collect(Collectors.toList());
+ }));
}
/**
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index a798c7a72a..e7bae39bd6 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -272,11 +273,11 @@ public class PlacementDriverManager implements
IgniteComponent {
private PlacementDriver createPlacementDriver() {
return new PlacementDriver() {
@Override
- public CompletableFuture<TokenizedAssignments> getAssignments(
- ReplicationGroupId replicationGroupId,
+ public CompletableFuture<List<TokenizedAssignments>>
getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp timestamp
) {
- return assignmentsTracker.getAssignments(replicationGroupId,
timestamp);
+ return assignmentsTracker.getAssignments(replicationGroupIds,
timestamp);
}
@Override
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
index 3992941bb3..c9b961080f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
@@ -26,6 +26,8 @@ import static
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -104,15 +106,23 @@ public class ExecutionTargetProviderImpl implements
ExecutionTargetProvider {
) {
int partitions = table.partitions();
+ if (includeBackups) {
+ List<TablePartitionId> replicationGroupIds = new
ArrayList<>(partitions);
+
+ for (int p = 0; p < partitions; p++) {
+ replicationGroupIds.add(new TablePartitionId(table.id(), p));
+ }
+
+ return allReplicas(replicationGroupIds, operationTime);
+ }
+
List<CompletableFuture<TokenizedAssignments>> result = new
ArrayList<>(partitions);
// no need to wait all partitions after pruning was implemented.
for (int partId = 0; partId < partitions; ++partId) {
ReplicationGroupId partGroupId = new TablePartitionId(table.id(),
partId);
- CompletableFuture<TokenizedAssignments> partitionAssignment =
includeBackups
- ? allReplicas(partGroupId, operationTime)
- : primaryReplica(partGroupId, operationTime);
+ CompletableFuture<TokenizedAssignments> partitionAssignment =
primaryReplica(partGroupId, operationTime);
result.add(partitionAssignment);
}
@@ -152,22 +162,47 @@ public class ExecutionTargetProviderImpl implements
ExecutionTargetProvider {
});
}
- private CompletableFuture<TokenizedAssignments> allReplicas(
- ReplicationGroupId replicationGroupId,
+ private CompletableFuture<List<TokenizedAssignments>> allReplicas(
+ List<TablePartitionId> replicationGroupIds,
HybridTimestamp operationTime
) {
- CompletableFuture<TokenizedAssignments> f =
placementDriver.getAssignments(
- replicationGroupId,
+ CompletableFuture<List<TokenizedAssignments>> f =
placementDriver.getAssignments(
+ replicationGroupIds,
operationTime
);
return f.thenCompose(assignments -> {
- if (assignments == null) {
- // assignments are not ready yet, let's fall back to primary
replicas
- return primaryReplica(replicationGroupId, operationTime);
+ // Collect missed assignments indexes if found.
+ IntList missedAssignments = new IntArrayList(0);
+
+ for (int i = 0; i < assignments.size(); i++) {
+ if (assignments.get(i) == null) {
+ missedAssignments.add(i);
+ }
}
- return completedFuture(assignments);
+ if (missedAssignments.isEmpty()) {
+ return completedFuture(assignments);
+ }
+
+ // assignments are not ready yet, let's fall back to primary
replicas
+ List<CompletableFuture<TokenizedAssignments>>
primaryReplicaAssignment = new ArrayList<>(missedAssignments.size());
+
+ for (int i = 0; i < missedAssignments.size(); i++) {
+
primaryReplicaAssignment.add(primaryReplica(replicationGroupIds.get(missedAssignments.getInt(i)),
operationTime));
+ }
+
+ CompletableFuture<Void> all =
CompletableFuture.allOf(primaryReplicaAssignment.toArray(new
CompletableFuture[0]));
+ return all.thenApply(ignore -> {
+ // Creates a mutable copy and replace missed assignments with
primary replicas.
+ List<TokenizedAssignments> finalAssignments = new
ArrayList<>(assignments);
+
+ for (int i = 0; i < missedAssignments.size(); i++) {
+ finalAssignments.set(missedAssignments.getInt(i),
primaryReplicaAssignment.get(i).join());
+ }
+
+ return finalAssignments;
+ });
});
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index eca347e990..c5989ec45a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.wrappers;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.affinity.TokenizedAssignments;
@@ -65,10 +66,10 @@ abstract class DelegatingPlacementDriver implements
PlacementDriver {
}
@Override
- public CompletableFuture<TokenizedAssignments> getAssignments(
- ReplicationGroupId replicationGroupId,
+ public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
- return delegate.getAssignments(replicationGroupId, clusterTimeToAwait);
+ return delegate.getAssignments(replicationGroupIds,
clusterTimeToAwait);
}
}