This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.12 by this push:
     new 653ac24  IGNITE-16177 Fix the restore snapshot on not all affinity partitions are physically present (#9679)
653ac24 is described below

commit 653ac2478ce9da3ab8bec62fa425809c1bdec634
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Dec 23 14:37:07 2021 +0300

    IGNITE-16177 Fix the restore snapshot on not all affinity partitions are physically present (#9679)
---
 .../snapshot/AbstractSnapshotVerificationTask.java | 42 +++++++++++------
 .../snapshot/SnapshotRestoreProcess.java           | 55 +++++++++++-----------
 .../IgniteClusterSnapshotRestoreSelfTest.java      | 16 +++++++
 3 files changed, 72 insertions(+), 41 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
index 1f2a48a..addf246 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
@@ -63,21 +64,11 @@ public abstract class AbstractSnapshotVerificationTask extends
         Set<SnapshotMetadata> allMetas = new HashSet<>();
         clusterMetas.values().forEach(allMetas::addAll);
 
-        Set<String> missed = null;
-
-        for (SnapshotMetadata meta : allMetas) {
-            if (missed == null)
-                missed = new HashSet<>(meta.baselineNodes());
-
-            missed.remove(meta.consistentId());
-
-            if (missed.isEmpty())
-                break;
+        try {
+            checkMissedMetadata(allMetas);
         }
-
-        if (!missed.isEmpty()) {
-            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
-                new IgniteException("Some metadata is missing from the 
snapshot: " + missed)));
+        catch (IgniteCheckedException e) {
+            throw new 
IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), new 
IgniteException(e.getMessage())));
         }
 
         metas.putAll(clusterMetas);
@@ -107,6 +98,29 @@ public abstract class AbstractSnapshotVerificationTask extends
     }
 
     /**
+     * Ensures that all parts of the snapshot are available according to the metadata.
+     *
+     * @param clusterMetas List of snapshot metadata found in the cluster.
+     * @throws IgniteCheckedException If no metadata was found or some metadata is missing.
+     */
+    public static void checkMissedMetadata(Collection<SnapshotMetadata> clusterMetas) throws IgniteCheckedException {
+        // Without any metadata there is no baseline to compare against; report it instead of failing with NPE.
+        if (clusterMetas.isEmpty())
+            throw new IgniteCheckedException("No snapshot metadata was found.");
+
+        Set<String> missed = new HashSet<>(clusterMetas.iterator().next().baselineNodes());
+
+        for (SnapshotMetadata meta : clusterMetas) {
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                return;
+        }
+
+        throw new IgniteCheckedException("Some metadata is missing from the snapshot: " + missed);
+    }
+
+    /**
      * @param name Snapshot name.
      * @param constId Snapshot metadata file name.
      * @param groups Cache groups to be restored from the snapshot. May be 
empty if all cache groups are being restored.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index c796299..8f7092a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -36,15 +36,12 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
-import java.util.function.IntFunction;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteIllegalStateException;
@@ -620,7 +617,7 @@ public class SnapshotRestoreProcess {
 
         DiscoCache discoCache0 = discoCache.copy(discoCache.version(), null);
 
-        if (F.first(metas) == null)
+        if (F.isEmpty(metas))
             return new SnapshotRestoreContext(req, discoCache0, 
Collections.emptyMap(), cctx.localNodeId(), Collections.emptyList());
 
         if (F.first(metas).pageSize() != cctx.database().pageSize()) {
@@ -797,6 +794,11 @@ public class SnapshotRestoreProcess {
             if (ctx.isStopping())
                 throw new NodeStoppingException("Node is stopping: " + 
ctx.localNodeId());
 
+            Set<SnapshotMetadata> allMetas =
+                
opCtx0.metasPerNode.values().stream().flatMap(List::stream).collect(Collectors.toSet());
+
+            AbstractSnapshotVerificationTask.checkMissedMetadata(allMetas);
+
             IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
 
             synchronized (this) {
@@ -827,8 +829,10 @@ public class SnapshotRestoreProcess {
                         }
                     }, snpMgr.snapshotExecutorService()) : 
CompletableFuture.completedFuture(null);
 
+            Map<String, GridAffinityAssignmentCache> affCache = new 
HashMap<>();
+
             for (StoredCacheData data : opCtx0.cfgs.values()) {
-                
opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
                     grp -> calculateAffinity(ctx, data.config(), 
opCtx0.discoCache));
             }
 
@@ -847,10 +851,26 @@ public class SnapshotRestoreProcess {
 
                 Set<PartitionRestoreFuture> leftParts;
 
-                opCtx0.locProgress.put(grpId,
-                    
nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, 
PartitionRestoreFuture::new));
+                // Partitions contained in the snapshot.
+                Set<Integer> availParts = new HashSet<>();
+
+                for (SnapshotMetadata meta : allMetas) {
+                    Set<Integer> parts = meta.partitions().get(grpId);
+
+                    if (parts != null)
+                        availParts.addAll(parts);
+                }
+
+                List<List<ClusterNode>> assignment = 
affCache.get(cacheOrGrpName).idealAssignment().assignment();
+                Set<PartitionRestoreFuture> partFuts = availParts
+                    .stream()
+                    .filter(p -> p != INDEX_PARTITION && 
assignment.get(p).contains(locNode))
+                    .map(PartitionRestoreFuture::new)
+                    .collect(Collectors.toSet());
 
-                rmtLoadParts.put(grpId, leftParts = new 
HashSet<>(opCtx0.locProgress.get(grpId)));
+                opCtx0.locProgress.put(grpId, partFuts);
+
+                rmtLoadParts.put(grpId, leftParts = new HashSet<>(partFuts));
 
                 if (leftParts.isEmpty())
                     continue;
@@ -1302,22 +1322,6 @@ public class SnapshotRestoreProcess {
     }
 
     /**
-     * @param affCache Affinity cache.
-     * @param node Cluster node to get assigned partitions.
-     * @return The set of partitions assigned to the given node.
-     */
-    private static <T> Set<T> nodeAffinityPartitions(
-        GridAffinityAssignmentCache affCache,
-        ClusterNode node,
-        IntFunction<T> factory
-    ) {
-        return IntStream.range(0, affCache.partitions())
-            .filter(p -> 
affCache.idealAssignment().assignment().get(p).contains(node))
-            .mapToObj(factory)
-            .collect(Collectors.toSet());
-    }
-
-    /**
      * @param col Collection of sets to complete.
      * @param ex Exception to set.
      */
@@ -1381,9 +1385,6 @@ public class SnapshotRestoreProcess {
          */
         private final GridFutureAdapter<Void> locStopCachesCompleteFut = new 
GridFutureAdapter<>();
 
-        /** Calculated affinity assignment cache per each cache group. */
-        private final Map<String, GridAffinityAssignmentCache> affCache = new 
ConcurrentHashMap<>();
-
         /** Cache ID to configuration mapping. */
         private volatile Map<Integer, StoredCacheData> cfgs;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index b6361e1..af70b8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -101,6 +101,22 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
     }
 
     /**
+     * Checks snapshot restore if not all "affinity" partitions have been physically created.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRestoreWithEmptyPartitions() throws Exception {
+        // Half as many keys as partitions: assuming keysCnt entries are loaded, some partitions get no data.
+        int keysCnt = dfltCacheCfg.getAffinity().partitions() / 2;
+
+        Ignite ignite = startGridsWithSnapshot(1, keysCnt, false);
+
+        // NOTE(review): the null argument presumably means "restore all cache groups" - confirm against the API.
+        ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), keysCnt);
+    }
+
+    /**
      * Ensures that system partition verification task is invoked before 
restoring the snapshot.
      *
      * @throws Exception If failed.

Reply via email to