This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.12 by this push:
new 653ac24 IGNITE-16177 Fix the restore snapshot on not all affinity
partitions are physically present (#9679)
653ac24 is described below
commit 653ac2478ce9da3ab8bec62fa425809c1bdec634
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Dec 23 14:37:07 2021 +0300
IGNITE-16177 Fix the restore snapshot on not all affinity partitions are
physically present (#9679)
---
.../snapshot/AbstractSnapshotVerificationTask.java | 42 +++++++++++------
.../snapshot/SnapshotRestoreProcess.java | 55 +++++++++++-----------
.../IgniteClusterSnapshotRestoreSelfTest.java | 16 +++++++
3 files changed, 72 insertions(+), 41 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
index 1f2a48a..addf246 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
@@ -63,21 +64,11 @@ public abstract class AbstractSnapshotVerificationTask extends
Set<SnapshotMetadata> allMetas = new HashSet<>();
clusterMetas.values().forEach(allMetas::addAll);
- Set<String> missed = null;
-
- for (SnapshotMetadata meta : allMetas) {
- if (missed == null)
- missed = new HashSet<>(meta.baselineNodes());
-
- missed.remove(meta.consistentId());
-
- if (missed.isEmpty())
- break;
+ try {
+ checkMissedMetadata(allMetas);
}
-
- if (!missed.isEmpty()) {
- throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
- new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+ catch (IgniteCheckedException e) {
+ throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), new IgniteException(e.getMessage())));
}
metas.putAll(clusterMetas);
@@ -107,6 +98,29 @@ public abstract class AbstractSnapshotVerificationTask extends
}
/**
+ * Ensures that all parts of the snapshot are available according to the metadata.
+ *
+ * @param clusterMetas List of snapshot metadata found in the cluster.
+ * @throws IgniteCheckedException If some metadata is missing.
+ */
+ public static void checkMissedMetadata(Collection<SnapshotMetadata> clusterMetas) throws IgniteCheckedException {
+ Set<String> missed = null;
+
+ for (SnapshotMetadata meta : clusterMetas) {
+ if (missed == null)
+ missed = new HashSet<>(meta.baselineNodes());
+
+ missed.remove(meta.consistentId());
+
+ if (missed.isEmpty())
+ break;
+ }
+
+ if (!missed.isEmpty())
+ throw new IgniteCheckedException("Some metadata is missing from the snapshot: " + missed);
+ }
+
+ /**
* @param name Snapshot name.
* @param constId Snapshot metadata file name.
* @param groups Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index c796299..8f7092a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -36,15 +36,12 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
-import java.util.function.IntFunction;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
@@ -620,7 +617,7 @@ public class SnapshotRestoreProcess {
DiscoCache discoCache0 = discoCache.copy(discoCache.version(), null);
- if (F.first(metas) == null)
+ if (F.isEmpty(metas))
return new SnapshotRestoreContext(req, discoCache0, Collections.emptyMap(), cctx.localNodeId(), Collections.emptyList());
if (F.first(metas).pageSize() != cctx.database().pageSize()) {
@@ -797,6 +794,11 @@ public class SnapshotRestoreProcess {
if (ctx.isStopping())
throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
+ Set<SnapshotMetadata> allMetas =
+ opCtx0.metasPerNode.values().stream().flatMap(List::stream).collect(Collectors.toSet());
+
+ AbstractSnapshotVerificationTask.checkMissedMetadata(allMetas);
+
IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
synchronized (this) {
@@ -827,8 +829,10 @@ public class SnapshotRestoreProcess {
}
}, snpMgr.snapshotExecutorService()) :
CompletableFuture.completedFuture(null);
+ Map<String, GridAffinityAssignmentCache> affCache = new HashMap<>();
+
for (StoredCacheData data : opCtx0.cfgs.values()) {
- opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+ affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
}
@@ -847,10 +851,26 @@ public class SnapshotRestoreProcess {
Set<PartitionRestoreFuture> leftParts;
- opCtx0.locProgress.put(grpId,
- nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, PartitionRestoreFuture::new));
+ // Partitions contained in the snapshot.
+ Set<Integer> availParts = new HashSet<>();
+
+ for (SnapshotMetadata meta : allMetas) {
+ Set<Integer> parts = meta.partitions().get(grpId);
+
+ if (parts != null)
+ availParts.addAll(parts);
+ }
+
+ List<List<ClusterNode>> assignment = affCache.get(cacheOrGrpName).idealAssignment().assignment();
+ Set<PartitionRestoreFuture> partFuts = availParts
+ .stream()
+ .filter(p -> p != INDEX_PARTITION && assignment.get(p).contains(locNode))
+ .map(PartitionRestoreFuture::new)
+ .collect(Collectors.toSet());
- rmtLoadParts.put(grpId, leftParts = new HashSet<>(opCtx0.locProgress.get(grpId)));
+ opCtx0.locProgress.put(grpId, partFuts);
+
+ rmtLoadParts.put(grpId, leftParts = new HashSet<>(partFuts));
if (leftParts.isEmpty())
continue;
@@ -1302,22 +1322,6 @@ public class SnapshotRestoreProcess {
}
/**
- * @param affCache Affinity cache.
- * @param node Cluster node to get assigned partitions.
- * @return The set of partitions assigned to the given node.
- */
- private static <T> Set<T> nodeAffinityPartitions(
- GridAffinityAssignmentCache affCache,
- ClusterNode node,
- IntFunction<T> factory
- ) {
- return IntStream.range(0, affCache.partitions())
- .filter(p -> affCache.idealAssignment().assignment().get(p).contains(node))
- .mapToObj(factory)
- .collect(Collectors.toSet());
- }
-
- /**
* @param col Collection of sets to complete.
* @param ex Exception to set.
*/
@@ -1381,9 +1385,6 @@ public class SnapshotRestoreProcess {
*/
private final GridFutureAdapter<Void> locStopCachesCompleteFut = new GridFutureAdapter<>();
- /** Calculated affinity assignment cache per each cache group. */
- private final Map<String, GridAffinityAssignmentCache> affCache = new ConcurrentHashMap<>();
-
/** Cache ID to configuration mapping. */
private volatile Map<Integer, StoredCacheData> cfgs;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index b6361e1..af70b8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -101,6 +101,22 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
}
/**
+ * Checks snapshot restore if not all "affinity" partitions have been physically created.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestoreWithEmptyPartitions() throws Exception {
+ int keysCnt = dfltCacheCfg.getAffinity().partitions() / 2;
+
+ Ignite ignite = startGridsWithSnapshot(1, keysCnt, false);
+
+ ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+ assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), keysCnt);
+ }
+
+ /**
* Ensures that system partition verification task is invoked before restoring the snapshot.
*
* @throws Exception If failed.