This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-22722
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-22722 by this push:
     new 3c31ecf692 Use bulk method for getting assignments for backups.
3c31ecf692 is described below

commit 3c31ecf692def5bdae8202cd5470f8437b500554
Author: amashenkov <[email protected]>
AuthorDate: Thu Aug 8 13:43:20 2024 +0300

    Use bulk method for getting assignments for backups.
---
 .../sql/engine/ExecutionTargetProviderImpl.java    | 55 +++++++++++++++++-----
 1 file changed, 44 insertions(+), 11 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
index 3992941bb3..a00f7bdc58 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
@@ -26,6 +26,8 @@ import static 
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -104,15 +106,23 @@ public class ExecutionTargetProviderImpl implements 
ExecutionTargetProvider {
     ) {
         int partitions = table.partitions();
 
+        if (includeBackups) {
+            List<TablePartitionId> replicationGroupIds = new 
ArrayList<>(partitions);
+
+            for (int p = 0; p < partitions; p++) {
+                replicationGroupIds.add(new TablePartitionId(table.id(), p));
+            }
+
+            return allReplicas(replicationGroupIds, operationTime);
+        }
+
         List<CompletableFuture<TokenizedAssignments>> result = new 
ArrayList<>(partitions);
 
         // no need to wait all partitions after pruning was implemented.
         for (int partId = 0; partId < partitions; ++partId) {
             ReplicationGroupId partGroupId = new TablePartitionId(table.id(), 
partId);
 
-            CompletableFuture<TokenizedAssignments> partitionAssignment = 
includeBackups
-                    ? allReplicas(partGroupId, operationTime)
-                    : primaryReplica(partGroupId, operationTime);
+            CompletableFuture<TokenizedAssignments> partitionAssignment = 
primaryReplica(partGroupId, operationTime);
 
             result.add(partitionAssignment);
         }
@@ -152,22 +162,45 @@ public class ExecutionTargetProviderImpl implements 
ExecutionTargetProvider {
         });
     }
 
-    private CompletableFuture<TokenizedAssignments> allReplicas(
-            ReplicationGroupId replicationGroupId,
+    private CompletableFuture<List<TokenizedAssignments>> allReplicas(
+            List<TablePartitionId> replicationGroupIds,
             HybridTimestamp operationTime
     ) {
-        CompletableFuture<TokenizedAssignments> f = 
placementDriver.getAssignments(
-                replicationGroupId,
+        CompletableFuture<List<TokenizedAssignments>> f = 
placementDriver.getAssignments(
+                replicationGroupIds,
                 operationTime
         );
 
         return f.thenCompose(assignments -> {
-            if (assignments == null) {
-                // assignments are not ready yet, let's fall back to primary 
replicas
-                return primaryReplica(replicationGroupId, operationTime);
+            // Collect missed assignments indexes if found.
+            IntList missedAssignments = new IntArrayList(0);
+
+            for (int i = 0; i < assignments.size(); i++) {
+                if (assignments.get(i) == null) {
+                    missedAssignments.add(i);
+                }
+            }
+
+            if (missedAssignments.isEmpty()) {
+                return completedFuture(assignments);
             }
 
-            return completedFuture(assignments);
+            // assignments are not ready yet, let's fall back to primary 
replicas
+            List<CompletableFuture<TokenizedAssignments>> 
primaryReplicaAssignment = new ArrayList<>(missedAssignments.size());
+
+            for (int i = 0; i < missedAssignments.size(); i++) {
+                
primaryReplicaAssignment.add(primaryReplica(replicationGroupIds.get(missedAssignments.getInt(i)),
 operationTime));
+            }
+
+            CompletableFuture<Void> all = 
CompletableFuture.allOf(primaryReplicaAssignment.toArray(new 
CompletableFuture[0]));
+            return all.thenApply(ignore -> {
+                // Replace missed assignments with primary replicas.
+                for (int i = 0; i < missedAssignments.size(); i++) {
+                    assignments.set(missedAssignments.getInt(i), 
primaryReplicaAssignment.get(i).join());
+                }
+
+                return assignments;
+            });
         });
     }
 }

Reply via email to