This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch IGNITE-22662__snapshot_refactoring
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to
refs/heads/IGNITE-22662__snapshot_refactoring by this push:
new 66a3220c561 IGNITE-23058: Remove obsolete snapshot check tasks.
(#11583)
66a3220c561 is described below
commit 66a3220c561bf571a19ef16af9f1e944d7ff5b2a
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Oct 17 17:30:30 2024 +0300
IGNITE-23058: Remove obsolete snapshot check tasks. (#11583)
---
.../ignite/common/ComputeTaskPermissionsTest.java | 4 +-
.../apache/ignite/util/GridCommandHandlerTest.java | 11 +-
.../management/snapshot/SnapshotCheckCommand.java | 6 +-
.../management/snapshot/SnapshotCheckTask.java | 10 +-
.../snapshot/AbstractSnapshotVerificationTask.java | 156 ---------------------
.../snapshot/IgniteSnapshotManager.java | 66 ++-------
.../persistence/snapshot/SnapshotCheckProcess.java | 24 ++--
.../snapshot/SnapshotCheckProcessRequest.java | 13 ++
.../persistence/snapshot/SnapshotChecker.java | 7 +-
.../snapshot/SnapshotHandlerRestoreTask.java | 102 --------------
.../snapshot/SnapshotMetadataVerificationTask.java | 122 ----------------
.../SnapshotMetadataVerificationTaskArg.java | 118 ----------------
.../SnapshotMetadataVerificationTaskResult.java | 75 ----------
...lt.java => SnapshotPartitionsVerifyResult.java} | 6 +-
.../snapshot/SnapshotPartitionsVerifyTask.java | 125 -----------------
.../snapshot/SnapshotPartitionsVerifyTaskArg.java | 134 ------------------
.../snapshot/SnapshotRestoreProcess.java | 2 +-
.../main/resources/META-INF/classnames.properties | 9 --
.../snapshot/EncryptedSnapshotTest.java | 4 +-
.../snapshot/IgniteClusterSnapshotCheckTest.java | 70 +++------
.../IgniteClusterSnapshotRestoreSelfTest.java | 4 +-
.../IgniteClusterSnapshotStreamerTest.java | 14 +-
.../snapshot/IgniteSnapshotMXBeanTest.java | 7 +-
.../ConcurrentTxsIncrementalSnapshotTest.java | 4 +-
.../IncrementalSnapshotCheckBeforeRestoreTest.java | 10 +-
25 files changed, 108 insertions(+), 995 deletions(-)
diff --git
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
index e3e337468ac..c5a6116b968 100644
---
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
@@ -51,7 +51,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientFactory;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataVerificationTask;
+import
org.apache.ignite.internal.processors.cache.verify.CollectConflictPartitionKeysTask;
import org.apache.ignite.internal.processors.security.AbstractSecurityTest;
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.PublicAccessJob;
@@ -112,7 +112,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
private static final IgniteClosure SYSTEM_CLOSURE = new
ToStringClosure<>();
/** */
- private static final ComputeTask SYSTEM_TASK = new
SnapshotMetadataVerificationTask();
+ private static final ComputeTask SYSTEM_TASK = new
CollectConflictPartitionKeysTask();
/** */
private static final AtomicInteger EXECUTED_TASK_CNTR = new
AtomicInteger();
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 51dd7ff9629..ae6d68b2bb8 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -104,7 +104,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandler;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
@@ -173,6 +173,7 @@ import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.I
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.SNAPSHOT_RESTORE_METRICS;
import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.GRID_NOT_IDLE_MSG;
import static
org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DEFAULT_TARGET_FOLDER;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
@@ -3347,7 +3348,7 @@ public class GridCommandHandlerTest extends
GridCommandHandlerClusterPerMethodAb
StringBuilder sb = new StringBuilder();
-
((SnapshotPartitionsVerifyTaskResult)h.getLastOperationResult()).print(sb::append);
+
((SnapshotPartitionsVerifyResult)h.getLastOperationResult()).print(sb::append);
assertContains(log, sb.toString(), "The check procedure has finished,
no conflicts have been found");
}
@@ -3832,7 +3833,8 @@ public class GridCommandHandlerTest extends
GridCommandHandlerClusterPerMethodAb
awaitPartitionMapExchange();
- spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
+ spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage
+ && ((SingleNodeMessage)msg).type() ==
RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
fut = srv.snapshot().restoreSnapshot(snapshotName,
F.asList(DEFAULT_CACHE_NAME));
@@ -3851,7 +3853,8 @@ public class GridCommandHandlerTest extends
GridCommandHandlerClusterPerMethodAb
awaitPartitionMapExchange();
- spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
+ spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage
+ && ((SingleNodeMessage)msg).type() ==
RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
fut = srv.snapshot().restoreSnapshot(snapshotName,
F.asList(DEFAULT_CACHE_NAME), 1);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckCommand.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckCommand.java
index 11f833819c0..3bccad3d225 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckCommand.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckCommand.java
@@ -18,10 +18,10 @@
package org.apache.ignite.internal.management.snapshot;
import java.util.function.Consumer;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
/** */
-public class SnapshotCheckCommand extends
AbstractSnapshotCommand<SnapshotCheckCommandArg,
SnapshotPartitionsVerifyTaskResult> {
+public class SnapshotCheckCommand extends
AbstractSnapshotCommand<SnapshotCheckCommandArg,
SnapshotPartitionsVerifyResult> {
/** {@inheritDoc} */
@Override public String description() {
return "Check snapshot";
@@ -38,7 +38,7 @@ public class SnapshotCheckCommand extends
AbstractSnapshotCommand<SnapshotCheckC
}
/** {@inheritDoc} */
- @Override public void printResult(SnapshotCheckCommandArg arg,
SnapshotPartitionsVerifyTaskResult res, Consumer<String> printer) {
+ @Override public void printResult(SnapshotCheckCommandArg arg,
SnapshotPartitionsVerifyResult res, Consumer<String> printer) {
res.print(printer);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckTask.java
index 0d48dac64cc..73742c866e9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckTask.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.management.snapshot;
import org.apache.ignite.IgniteException;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.visor.VisorJob;
@@ -29,17 +29,17 @@ import org.apache.ignite.internal.visor.VisorOneNodeTask;
* @see IgniteSnapshotManager#checkSnapshot(String, String)
*/
@GridInternal
-public class SnapshotCheckTask extends
VisorOneNodeTask<SnapshotCheckCommandArg, SnapshotPartitionsVerifyTaskResult> {
+public class SnapshotCheckTask extends
VisorOneNodeTask<SnapshotCheckCommandArg, SnapshotPartitionsVerifyResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override protected VisorJob<SnapshotCheckCommandArg,
SnapshotPartitionsVerifyTaskResult> job(SnapshotCheckCommandArg arg) {
+ @Override protected VisorJob<SnapshotCheckCommandArg,
SnapshotPartitionsVerifyResult> job(SnapshotCheckCommandArg arg) {
return new SnapshotCheckJob(arg, debug);
}
/** */
- private static class SnapshotCheckJob extends
SnapshotJob<SnapshotCheckCommandArg, SnapshotPartitionsVerifyTaskResult> {
+ private static class SnapshotCheckJob extends
SnapshotJob<SnapshotCheckCommandArg, SnapshotPartitionsVerifyResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -52,7 +52,7 @@ public class SnapshotCheckTask extends
VisorOneNodeTask<SnapshotCheckCommandArg,
}
/** {@inheritDoc} */
- @Override protected SnapshotPartitionsVerifyTaskResult
run(SnapshotCheckCommandArg arg) throws IgniteException {
+ @Override protected SnapshotPartitionsVerifyResult
run(SnapshotCheckCommandArg arg) throws IgniteException {
IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
return new
IgniteFutureImpl<>(snpMgr.checkSnapshot(arg.snapshotName(), arg.src(),
arg.increment())).get();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
deleted file mode 100644
index e8d348fc6da..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * The task for checking the consistency of snapshots in the cluster.
- */
-public abstract class AbstractSnapshotVerificationTask extends
- ComputeTaskAdapter<SnapshotPartitionsVerifyTaskArg,
SnapshotPartitionsVerifyTaskResult> {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Map of snapshot metadata information found on each cluster node. */
- protected final Map<ClusterNode, List<SnapshotMetadata>> metas = new
HashMap<>();
-
- /** Ignite instance. */
- @IgniteInstanceResource
- protected IgniteEx ignite;
-
- /** Injected logger. */
- @LoggerResource
- protected IgniteLogger log;
-
- /** {@inheritDoc} */
- @Override public Map<ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid, SnapshotPartitionsVerifyTaskArg arg) {
- Map<ClusterNode, List<SnapshotMetadata>> clusterMetas =
arg.clusterMetadata();
-
- if (!subgrid.containsAll(clusterMetas.keySet())) {
- throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
- new IgniteException("Some of Ignite nodes left the cluster
during the snapshot verification " +
- "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
- ", init=" + F.viewReadOnly(clusterMetas.keySet(),
F.node2id()) + ']')));
- }
-
- Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
- Set<SnapshotMetadata> allMetas = new HashSet<>();
- clusterMetas.values().forEach(allMetas::addAll);
- metas.putAll(clusterMetas);
-
- while (!allMetas.isEmpty()) {
- for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e :
clusterMetas.entrySet()) {
- SnapshotMetadata meta = F.find(e.getValue(), null,
allMetas::remove);
-
- if (meta == null)
- continue;
-
- jobs.put(createJob(meta.snapshotName(), meta.consistentId(),
arg), e.getKey());
-
- if (allMetas.isEmpty())
- break;
- }
- }
-
- return jobs;
- }
-
- /** {@inheritDoc} */
- @Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws IgniteException {
- // Handle all exceptions during the `reduce` operation.
- return ComputeJobResultPolicy.WAIT;
- }
-
- /**
- * @param name Snapshot name.
- * @param consId Consistent id of the related node.
- * @param args Check snapshot parameters.
- *
- * @return Compute job.
- */
- protected abstract AbstractSnapshotVerificationJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args);
-
- /** */
- protected abstract static class AbstractSnapshotVerificationJob extends
ComputeJobAdapter {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Ignite instance. */
- @IgniteInstanceResource
- protected IgniteEx ignite;
-
- /** Injected logger. */
- @LoggerResource
- protected IgniteLogger log;
-
- /** Snapshot name. */
- protected final String snpName;
-
- /** Snapshot directory path. */
- @Nullable protected final String snpPath;
-
- /** Consistent id of the related node. */
- protected final String consId;
-
- /** Set of cache groups to be checked in the snapshot. {@code Null} or
empty to check everything. */
- @Nullable protected final Collection<String> rqGrps;
-
- /** If {@code true}, calculates and compares partition hashes.
Otherwise, only basic snapshot validation is launched. */
- protected final boolean check;
-
- /**
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- * @param consId Consistent id of the related node.
- * @param rqGrps Set of cache groups to be checked in the snapshot.
{@code Null} or empty to check everything.
- * @param check If {@code true}, calculates and compares partition
hashes. Otherwise, only basic snapshot validation is launched.
- */
- protected AbstractSnapshotVerificationJob(
- String snpName,
- @Nullable String snpPath,
- String consId,
- @Nullable Collection<String> rqGrps,
- boolean check
- ) {
- this.snpName = snpName;
- this.snpPath = snpPath;
- this.consId = consId;
- this.rqGrps = rqGrps;
- this.check = check;
- }
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 2d16637af6e..f83ce0e5950 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1842,7 +1842,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @return Future with the result of execution snapshot partitions verify
task, which besides calculating partition
* hashes of {@link IdleVerifyResultV2} also contains the snapshot
metadata distribution across the cluster.
*/
- public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult>
checkSnapshot(String name, @Nullable String snpPath) {
+ public IgniteInternalFuture<SnapshotPartitionsVerifyResult>
checkSnapshot(String name, @Nullable String snpPath) {
return checkSnapshot(name, snpPath, -1);
}
@@ -1855,7 +1855,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @return Future with the result of execution snapshot partitions verify
task, which besides calculating partition
* hashes of {@link IdleVerifyResultV2} also contains the snapshot
metadata distribution across the cluster.
*/
- public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult>
checkSnapshot(String name, @Nullable String snpPath, int incIdx) {
+ public IgniteInternalFuture<SnapshotPartitionsVerifyResult>
checkSnapshot(String name, @Nullable String snpPath, int incIdx) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy
the following name pattern: a-zA-Z0-9_");
@@ -1886,7 +1886,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @return Future with the result of execution snapshot partitions verify
task, which besides calculating partition
* hashes of {@link IdleVerifyResultV2} also contains the snapshot
metadata distribution across the cluster.
*/
- public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult>
checkSnapshot(
+ public IgniteInternalFuture<SnapshotPartitionsVerifyResult> checkSnapshot(
String name,
@Nullable String snpPath,
@Nullable Collection<String> grps,
@@ -1904,64 +1904,16 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
", incIdx=" + incIdx + ", grps=" + grps + ", validateParts=" +
check + ']');
}
- if (check && (incIdx < 1 || !includeCustomHandlers))
- return checkSnpProc.start(name, snpPath, grps, incIdx,
includeCustomHandlers);
+ IgniteInternalFuture<SnapshotPartitionsVerifyResult> res =
checkSnpProc.start(name, snpPath, grps, check, incIdx,
+ includeCustomHandlers);
- GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res = new
GridFutureAdapter<>();
-
- GridKernalContext kctx0 = cctx.kernalContext();
-
- Collection<ClusterNode> bltNodes =
F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
- (node) -> CU.baselineNode(node, kctx0.state().clusterState()));
-
- Collection<Integer> grpIds = grps == null ? Collections.emptySet() :
F.viewReadOnly(grps, CU::cacheId);
-
- SnapshotMetadataVerificationTaskArg taskArg = new
SnapshotMetadataVerificationTaskArg(name, snpPath, incIdx, grpIds);
-
- kctx0.task().execute(
- SnapshotMetadataVerificationTask.class,
- taskArg,
- options(bltNodes)
- ).listen(f0 -> {
- SnapshotMetadataVerificationTaskResult metasRes = f0.result();
-
- if (f0.error() == null && F.isEmpty(metasRes.exceptions())) {
- Map<ClusterNode, List<SnapshotMetadata>> metas =
metasRes.meta();
-
- Class<? extends AbstractSnapshotVerificationTask> cls =
includeCustomHandlers
- ? SnapshotHandlerRestoreTask.class
- : SnapshotPartitionsVerifyTask.class;
-
- kctx0.task().execute(
- cls,
- new SnapshotPartitionsVerifyTaskArg(grps, metas,
snpPath, incIdx, check),
- options(new ArrayList<>(metas.keySet()))
- ).listen(f1 -> {
- if (f1.error() == null)
- res.onDone(f1.result());
- else if (f1.error() instanceof
IgniteSnapshotVerifyException)
- res.onDone(new
SnapshotPartitionsVerifyTaskResult(metas,
- new
IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions())));
- else
- res.onDone(f1.error());
- });
- }
- else {
- if (f0.error() == null)
- res.onDone(new
IgniteSnapshotVerifyException(metasRes.exceptions()));
- else if (f0.error() instanceof IgniteSnapshotVerifyException)
- res.onDone(new SnapshotPartitionsVerifyTaskResult(null,
- new
IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions())));
- else
- res.onDone(f0.error());
+ res.listen(lsnr -> {
+ if (log.isInfoEnabled()) {
+ log.info("The check snapshot procedure finished [snpName=" +
name + ", snpPath=" + snpPath
+ + ", incIdx=" + incIdx + ", grps=" + grps + ']');
}
});
- if (log.isInfoEnabled()) {
- res.listen(() -> log.info("The check snapshot procedure finished
[snpName=" + name +
- ", snpPath=" + snpPath + ", incIdx=" + incIdx + ", grps=" +
grps + ']'));
- }
-
return res;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index e9f64247e82..130d21c79f2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -65,7 +65,7 @@ public class SnapshotCheckProcess {
private final Map<String, SnapshotCheckContext> contexts = new
ConcurrentHashMap<>();
/** Cluster-wide operation futures per snapshot called from current node.
*/
- private final Map<UUID,
GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new
ConcurrentHashMap<>();
+ private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyResult>>
clusterOpFuts = new ConcurrentHashMap<>();
/** Check metas first phase subprocess. */
private final DistributedProcess<SnapshotCheckProcessRequest,
SnapshotCheckResponse> phase1CheckMetas;
@@ -141,7 +141,7 @@ public class SnapshotCheckProcess {
if (log.isInfoEnabled())
log.info("Finished snapshot validation [req=" + ctx.req + ']');
- GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut =
clusterOpFuts.get(reqId);
+ GridFutureAdapter<SnapshotPartitionsVerifyResult> clusterOpFut =
clusterOpFuts.get(reqId);
if (clusterOpFut == null)
return new GridFinishedFuture<>();
@@ -156,7 +156,7 @@ public class SnapshotCheckProcess {
mapErrors(errors)
);
- clusterOpFut.onDone(new
SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes));
+ clusterOpFut.onDone(new
SnapshotPartitionsVerifyResult(ctx.clusterMetas, chkRes));
}
else if (ctx.req.allRestoreHandlers()) {
try {
@@ -168,7 +168,7 @@ public class SnapshotCheckProcess {
checker.checkCustomHandlersResults(ctx.req.snapshotName(),
cstRes);
- clusterOpFut.onDone(new
SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null));
+ clusterOpFut.onDone(new
SnapshotPartitionsVerifyResult(ctx.clusterMetas, null));
}
catch (Throwable err) {
clusterOpFut.onDone(err);
@@ -183,7 +183,7 @@ public class SnapshotCheckProcess {
IdleVerifyResultV2 chkRes =
SnapshotChecker.reduceHashesResults(results0, errors0);
- clusterOpFut.onDone(new
SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes));
+ clusterOpFut.onDone(new
SnapshotPartitionsVerifyResult(ctx.clusterMetas, chkRes));
}
else
clusterOpFut.onDone(new
IgniteSnapshotVerifyException(errors0));
@@ -221,7 +221,7 @@ public class SnapshotCheckProcess {
workingFut = req.allRestoreHandlers()
? snpMgr.checker().invokeCustomHandlers(ctx.locMeta,
req.snapshotPath(), req.groups(), true)
: snpMgr.checker().checkPartitions(ctx.locMeta,
snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
- req.groups(), false, true, false);
+ req.groups(), false, req.fullCheck(), false);
}
workingFut.whenComplete((res, err) -> {
@@ -325,7 +325,7 @@ public class SnapshotCheckProcess {
SnapshotCheckContext ctx = context(null, reqId);
// The context is not stored in the case of concurrent check of the
same snapshot but the operation future is registered.
- GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut =
clusterOpFuts.get(reqId);
+ GridFutureAdapter<SnapshotPartitionsVerifyResult> clusterOpFut =
clusterOpFuts.get(reqId);
try {
if (!errors.isEmpty())
@@ -345,7 +345,7 @@ public class SnapshotCheckProcess {
results.forEach((nodeId, nodeRes) -> {
// A node might be not required. It gives null result. But a
required node might have invalid empty result
// which must be validated.
- if (ctx.req.nodes().contains(nodeId) && baseline(nodeId)) {
+ if (ctx.req.nodes().contains(nodeId) && baseline(nodeId) &&
!nodeRes.metas.isEmpty()) {
assert nodeRes != null && nodeRes.partsResults == null;
metas.put(kctx.cluster().get().node(nodeId),
nodeRes.metas);
@@ -387,15 +387,18 @@ public class SnapshotCheckProcess {
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param grpNames List of cache group names.
+ * @param fullCheck If {@code true}, additionally calculates partition
hashes. Otherwise, checks only snapshot integrity
+ * and partition counters.
* @param incIdx Incremental snapshot index. If not positive, snapshot is
not considered as incremental.
* @param allRestoreHandlers If {@code true}, all the registered {@link
IgniteSnapshotManager#handlers()} of type
* {@link SnapshotHandlerType#RESTORE} are invoked.
Otherwise, only snapshot metadatas and partition
* hashes are validated.
*/
- public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> start(
+ public IgniteInternalFuture<SnapshotPartitionsVerifyResult> start(
String snpName,
@Nullable String snpPath,
@Nullable Collection<String> grpNames,
+ boolean fullCheck,
int incIdx,
boolean allRestoreHandlers
) {
@@ -414,11 +417,12 @@ public class SnapshotCheckProcess {
snpName,
snpPath,
grpNames,
+ fullCheck,
incIdx,
allRestoreHandlers
);
- GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut =
new GridFutureAdapter<>();
+ GridFutureAdapter<SnapshotPartitionsVerifyResult> clusterOpFut = new
GridFutureAdapter<>();
clusterOpFut.listen(fut -> {
clusterOpFuts.remove(reqId);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
index d1b38d4d512..3fa731fc0e4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
@@ -33,6 +33,10 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
/** Serial version uid. */
private static final long serialVersionUID = 0L;
+ /** If {@code true}, additionally calculates partition hashes. Otherwise,
checks only snapshot integrity and partition counters. */
+ @GridToStringInclude
+ private final boolean fullCheck;
+
/**
* If {@code true}, all the registered {@link
IgniteSnapshotManager#handlers()} of type {@link SnapshotHandlerType#RESTORE}
* are invoked. Otherwise, only snapshot metadatas and partition hashes
are validated.
@@ -52,6 +56,8 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
* @param nodes Baseline node IDs that must be alive to complete the
operation..
* @param snpPath Snapshot directory path.
* @param grps List of cache group names.
+ * @param fullCheck If {@code true}, additionally calculates partition
hashes. Otherwise, checks only snapshot integrity
+ * and partition counters.
* @param incIdx Incremental snapshot index. If not positive, snapshot is
not considered as incremental.
* @param allRestoreHandlers If {@code true}, all the registered {@link
IgniteSnapshotManager#handlers()} of type
* {@link SnapshotHandlerType#RESTORE} are
invoked. Otherwise, only snapshot metadatas and
@@ -63,6 +69,7 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
String snpName,
String snpPath,
@Nullable Collection<String> grps,
+ boolean fullCheck,
int incIdx,
boolean allRestoreHandlers
) {
@@ -70,6 +77,7 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
assert !F.isEmpty(nodes);
+ this.fullCheck = fullCheck;
this.allRestoreHandlers = allRestoreHandlers;
this.incIdx = incIdx;
}
@@ -82,6 +90,11 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
return allRestoreHandlers;
}
+ /** If {@code true}, additionally calculates partition hashes. Otherwise,
checks only snapshot integrity and partition counters. */
+ public boolean fullCheck() {
+ return fullCheck;
+ }
+
/** @return Incremental snapshot index. If not positive, snapshot is not
considered as incremental. */
public int incrementalIndex() {
return incIdx;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 8a12db38ffd..994a362221f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -354,7 +354,7 @@ public class SnapshotChecker {
return CompletableFuture.supplyAsync(
() -> {
- String consId =
kctx.cluster().get().localNode().consistentId().toString();
+ Object consId =
kctx.cluster().get().localNode().consistentId();
File snpDir =
kctx.cache().context().snapshotMgr().snapshotLocalDir(snpName, snpPath);
@@ -366,9 +366,10 @@ public class SnapshotChecker {
BaselineTopology blt =
kctx.state().clusterState().baselineTopology();
- SnapshotMetadata meta =
kctx.cache().context().snapshotMgr().readSnapshotMetadata(snpDir, consId);
+ SnapshotMetadata meta =
kctx.cache().context().snapshotMgr().readSnapshotMetadata(snpDir,
consId.toString());
- if (!F.eqNotOrdered(blt.consistentIds(),
meta.baselineNodes())) {
+ if
(!F.eqNotOrdered(blt.consistentIds().stream().map(Object::toString).collect(Collectors.toList()),
+ meta.baselineNodes())) {
throw new IgniteCheckedException("Topologies of
snapshot and current cluster are different [snp=" +
meta.baselineNodes() + ", current=" +
blt.consistentIds() + ']');
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
deleted file mode 100644
index 7df4a551b75..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Snapshot restore operation handling task.
- */
-public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override protected SnapshotHandlerRestoreJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
- return new SnapshotHandlerRestoreJob(name, args.snapshotPath(),
consId, args.cacheGroupNames(), args.check());
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("rawtypes")
- @Nullable @Override public SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) {
- String snpName = F.first(F.first(metas.values())).snapshotName();
-
- Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> resMap = new
HashMap<>();
-
- results.forEach(jobRes -> {
- if (jobRes.getException() != null)
- throw jobRes.getException();
- else
- resMap.put(jobRes.getNode(), jobRes.getData());
- });
-
- try {
-
ignite.context().cache().context().snapshotMgr().checker().checkCustomHandlersResults(snpName,
resMap);
- }
- catch (Exception e) {
- log.warning("The snapshot operation will be aborted due to a
handler error [snapshot=" + snpName + "].", e);
-
- throw new IgniteException(e);
- }
-
- return new SnapshotPartitionsVerifyTaskResult(metas, null);
- }
-
- /** Invokes all {@link SnapshotHandlerType#RESTORE} handlers locally. */
- private static class SnapshotHandlerRestoreJob extends
AbstractSnapshotVerificationJob {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- * @param consId Consistent id of the related node.
- * @param grps Cache group names.
- * @param check If {@code true} check snapshot before restore.
- */
- public SnapshotHandlerRestoreJob(
- String snpName,
- @Nullable String snpPath,
- String consId,
- Collection<String> grps,
- boolean check
- ) {
- super(snpName, snpPath, consId, grps, check);
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, SnapshotHandlerResult<Object>> execute() {
- try {
- IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
-
- return snpMgr.checker().invokeCustomHandlers(snpName, consId,
snpPath, rqGrps, check).get();
- }
- catch (Exception e) {
- throw new IgniteException("Filed to invoke all the snapshot
validation handlers.", e);
- }
- }
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
deleted file mode 100644
index bad025849b3..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/** Snapshot task to verify snapshot metadata on the baseline nodes for given
snapshot name. */
-@GridInternal
-public class SnapshotMetadataVerificationTask
- extends ComputeTaskAdapter<SnapshotMetadataVerificationTaskArg,
SnapshotMetadataVerificationTaskResult> {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** */
- private SnapshotMetadataVerificationTaskArg arg;
-
- /** */
- @IgniteInstanceResource
- private transient IgniteEx ignite;
-
- /** {@inheritDoc} */
- @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
- List<ClusterNode> subgrid,
- SnapshotMetadataVerificationTaskArg arg
- ) throws IgniteException {
- this.arg = arg;
-
- Map<ComputeJob, ClusterNode> map = U.newHashMap(subgrid.size());
-
- for (ClusterNode node : subgrid)
- map.put(new MetadataVerificationJob(arg), node);
-
- return map;
- }
-
- /** Job that verifies snapshot on an Ignite node. */
- private static class MetadataVerificationJob extends ComputeJobAdapter {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- @IgniteInstanceResource
- private transient IgniteEx ignite;
-
- /** */
- private final SnapshotMetadataVerificationTaskArg arg;
-
- /** */
- public MetadataVerificationJob(SnapshotMetadataVerificationTaskArg
arg) {
- this.arg = arg;
- }
-
- /** {@inheritDoc} */
- @Override public List<SnapshotMetadata> execute() {
- IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
-
- return
snpMgr.checker().checkLocalMetas(snpMgr.snapshotLocalDir(arg.snapshotName(),
arg.snapshotPath()),
- arg.incrementIndex(), arg.grpIds(),
ignite.localNode().consistentId()).join();
- }
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable SnapshotMetadataVerificationTaskResult reduce(
- List<ComputeJobResult> results) throws IgniteException {
- Map<ClusterNode, List<SnapshotMetadata>> reduceRes = new HashMap<>();
- Map<ClusterNode, Exception> exs = new HashMap<>();
-
- for (ComputeJobResult res : results) {
- if (res.getException() != null) {
- exs.put(res.getNode(), res.getException());
-
- continue;
- }
-
- if (!F.isEmpty((Collection<?>)res.getData()))
- reduceRes.computeIfAbsent(res.getNode(), n -> new
ArrayList<>()).addAll(res.getData());
- }
-
- exs = SnapshotChecker.reduceMetasResults(arg.snapshotName(),
arg.snapshotPath(), reduceRes, exs, ignite.localNode().consistentId());
-
- return new SnapshotMetadataVerificationTaskResult(reduceRes, exs);
- }
-
- /** {@inheritDoc} */
- @Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws IgniteException {
- // Handle all exceptions during the `reduce` operation.
- return ComputeJobResultPolicy.WAIT;
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTaskArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTaskArg.java
deleted file mode 100644
index 6bc69b2c7d3..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTaskArg.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.visor.VisorDataTransferObject;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Input parameters for checking snapshot metadata.
- */
-public class SnapshotMetadataVerificationTaskArg extends
VisorDataTransferObject {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Snapshot name. */
- private String snpName;
-
- /** Snapshot directory path. */
- private String snpPath;
-
- /** Incremental snapshot index. */
- private int incIdx;
-
- /** Cache group ids. */
- @Nullable private Collection<Integer> grpIds;
-
- /** Default constructor. */
- public SnapshotMetadataVerificationTaskArg() {
- // No-op.
- }
-
- /**
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- */
- public SnapshotMetadataVerificationTaskArg(String snpName, @Nullable
String snpPath, int incIdx, Collection<Integer> grpIds) {
- this.snpName = snpName;
- this.snpPath = snpPath;
- this.incIdx = incIdx;
- this.grpIds = grpIds;
- }
-
- /** {@inheritDoc} */
- @Override public byte getProtocolVersion() {
- return V2;
- }
-
- /**
- * @return Snapshot name.
- */
- public String snapshotName() {
- return snpName;
- }
-
- /**
- * @return Snapshot directory path.
- */
- public String snapshotPath() {
- return snpPath;
- }
-
- /**
- * @return Incremental snapshot index.
- */
- public int incrementIndex() {
- return incIdx;
- }
-
- /**
- * @return Cache group ids.
- */
- @Nullable public Collection<Integer> grpIds() {
- return grpIds;
- }
-
- /** {@inheritDoc} */
- @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
- U.writeString(out, snpName);
- U.writeString(out, snpPath);
- out.writeInt(incIdx);
- U.writeCollection(out, grpIds);
- }
-
- /** {@inheritDoc} */
- @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
- snpName = U.readString(in);
- snpPath = U.readString(in);
- incIdx = in.readInt();
-
- if (protoVer > V1)
- grpIds = U.readCollection(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SnapshotMetadataVerificationTaskArg.class, this);
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTaskResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTaskResult.java
deleted file mode 100644
index 212f615ce1a..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTaskResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.dto.IgniteDataTransferObject;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/** */
-public class SnapshotMetadataVerificationTaskResult extends
IgniteDataTransferObject {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Full snapshot metadata. */
- private Map<ClusterNode, List<SnapshotMetadata>> meta;
-
- /** Errors happened during snapshot metadata verification. */
- private Map<ClusterNode, Exception> exceptions;
-
- /** */
- public SnapshotMetadataVerificationTaskResult(
- Map<ClusterNode, List<SnapshotMetadata>> meta,
- Map<ClusterNode, Exception> exceptions
- ) {
- this.meta = Collections.unmodifiableMap(meta);
- this.exceptions = Collections.unmodifiableMap(exceptions);
- }
-
- /** */
- public SnapshotMetadataVerificationTaskResult() {
- }
-
- /** @return Errors happened during snapshot metadata verification. */
- public Map<ClusterNode, Exception> exceptions() {
- return Collections.unmodifiableMap(exceptions);
- }
-
- /** @return Full snapshot metadata. */
- public Map<ClusterNode, List<SnapshotMetadata>> meta() {
- return Collections.unmodifiableMap(meta);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
- U.writeMap(out, meta);
- U.writeMap(out, exceptions);
- }
-
- /** {@inheritDoc} */
- @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
- meta = U.readMap(in);
- exceptions = U.readMap(in);
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyResult.java
similarity index 96%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyResult.java
index a32397742ec..560f6237c1c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyResult.java
@@ -38,7 +38,7 @@ import org.jetbrains.annotations.Nullable;
* The result of execution snapshot partitions verify task which besides
calculating partition hashes of
* {@link IdleVerifyResultV2} also contains the snapshot metadata distribution
across the cluster.
*/
-public class SnapshotPartitionsVerifyTaskResult extends
IgniteDataTransferObject {
+public class SnapshotPartitionsVerifyResult extends IgniteDataTransferObject {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -49,7 +49,7 @@ public class SnapshotPartitionsVerifyTaskResult extends
IgniteDataTransferObject
@Nullable private IdleVerifyResultV2 idleRes;
/** Default constructor. */
- public SnapshotPartitionsVerifyTaskResult() {
+ public SnapshotPartitionsVerifyResult() {
// No-op.
}
@@ -57,7 +57,7 @@ public class SnapshotPartitionsVerifyTaskResult extends
IgniteDataTransferObject
* @param metas Map of snapshot metadata information found on each cluster
node.
* @param idleRes Result of cluster nodes partitions comparison.
*/
- public SnapshotPartitionsVerifyTaskResult(
+ SnapshotPartitionsVerifyResult(
Map<ClusterNode, List<SnapshotMetadata>> metas,
@Nullable IdleVerifyResultV2 idleRes
) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
deleted file mode 100644
index 1ad30d2c981..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.management.cache.PartitionKeyV2;
-import
org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTaskV2;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
-import org.apache.ignite.internal.processors.task.GridInternal;
-import org.jetbrains.annotations.Nullable;
-
-import static
org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTaskV2.reduce0;
-
-/**
- * Task for checking snapshot partitions consistency the same way as {@link
VerifyBackupPartitionsTaskV2} does.
- * Since a snapshot partitions already stored apart on disk the is no
requirement for a cluster upcoming updates
- * to be hold on.
- */
-@GridInternal
-public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTask {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override protected VerifySnapshotPartitionsJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
- return new VerifySnapshotPartitionsJob(name, args.snapshotPath(),
consId, args.cacheGroupNames(), args.check());
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) throws IgniteException {
- return new SnapshotPartitionsVerifyTaskResult(metas, reduce0(results));
- }
-
- /** Job that collects update counters of snapshot partitions on the node
it executes. */
- private static class VerifySnapshotPartitionsJob extends
AbstractSnapshotVerificationJob {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param snpName Snapshot name to validate.
- * @param consId Consistent id of the related node.
- * @param rqGrps Set of cache groups to be checked in the snapshot or
{@code empty} to check everything.
- * @param snpPath Snapshot directory path.
- * @param check If {@code true} check snapshot before restore.
- */
- public VerifySnapshotPartitionsJob(
- String snpName,
- @Nullable String snpPath,
- String consId,
- Collection<String> rqGrps,
- boolean check
- ) {
- super(snpName, snpPath, consId, rqGrps, check);
- }
-
- /** {@inheritDoc} */
- @Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute()
throws IgniteException {
- GridCacheSharedContext<?, ?> cctx =
ignite.context().cache().context();
-
- if (log.isInfoEnabled()) {
- log.info("Verify snapshot partitions procedure has been
initiated " +
- "[snpName=" + snpName + ", consId=" + consId + ']');
- }
-
- File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName,
snpPath);
-
- try {
- SnapshotMetadata meta =
cctx.snapshotMgr().readSnapshotMetadata(snpDir, consId);
-
- return cctx.snapshotMgr().checker().checkPartitions(meta,
snpDir, rqGrps, false, check, false).get();
- }
- catch (Exception e) {
- throw new IgniteException("Failed to read snapshot metadatas
of the snapshot '" + snpName + "'.", e);
- }
- finally {
- if (log.isInfoEnabled()) {
- log.info("Verify snapshot partitions procedure has been
finished " +
- "[snpName=" + snpName + ", consId=" + consId + ']');
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- VerifySnapshotPartitionsJob job = (VerifySnapshotPartitionsJob)o;
-
- return snpName.equals(job.snpName) && consId.equals(job.consId) &&
- Objects.equals(rqGrps, job.rqGrps) && Objects.equals(snpPath,
job.snpPath) &&
- Objects.equals(check, job.check);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(snpName, consId, rqGrps, snpPath, check);
- }
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java
deleted file mode 100644
index c3c660d94b5..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.visor.VisorDataTransferObject;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Input parameters for checking snapshot partitions consistency task.
- */
-public class SnapshotPartitionsVerifyTaskArg extends VisorDataTransferObject {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Cache group names to be verified. */
- @Nullable private Collection<String> grpNames;
-
- /** The map of distribution of snapshot metadata pieces across the
cluster. */
- private Map<ClusterNode, List<SnapshotMetadata>> clusterMetas;
-
- /** Snapshot directory path. */
- @Nullable private String snpPath;
-
- /** If {@code true} check snapshot integrity. */
- private boolean check;
-
- /** Incremental snapshot index. */
- private int incIdx;
-
- /** Default constructor. */
- public SnapshotPartitionsVerifyTaskArg() {
- // No-op.
- }
-
- /**
- * @param grpNames Cache group names to be verified.
- * @param clusterMetas The map of distribution of snapshot metadata pieces
across the cluster.
- * @param snpPath Snapshot directory path.
- * @param incIdx Incremental snapshot index.
- * @param check If {@code true} check snapshot integrity.
- */
- public SnapshotPartitionsVerifyTaskArg(
- @Nullable Collection<String> grpNames,
- Map<ClusterNode, List<SnapshotMetadata>> clusterMetas,
- @Nullable String snpPath,
- int incIdx,
- boolean check
- ) {
- this.grpNames = grpNames;
- this.clusterMetas = clusterMetas;
- this.snpPath = snpPath;
- this.incIdx = incIdx;
- this.check = check;
- }
-
- /**
- * @return Cache group names to be verified.
- */
- @Nullable public Collection<String> cacheGroupNames() {
- return grpNames;
- }
-
- /**
- * @return The map of distribution of snapshot metadata pieces across the
cluster.
- */
- public Map<ClusterNode, List<SnapshotMetadata>> clusterMetadata() {
- return clusterMetas;
- }
-
- /**
- * @return Snapshot directory path.
- */
- @Nullable public String snapshotPath() {
- return snpPath;
- }
-
- /**
- * @return Incremental snapshot index.
- */
- public int incrementIndex() {
- return incIdx;
- }
-
- /** @return If {@code true} check snapshot integrity. */
- public boolean check() {
- return check;
- }
-
- /** {@inheritDoc} */
- @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
- U.writeCollection(out, grpNames);
- U.writeMap(out, clusterMetas);
- U.writeString(out, snpPath);
- out.writeBoolean(check);
- out.writeInt(incIdx);
- }
-
- /** {@inheritDoc} */
- @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
- grpNames = U.readCollection(in);
- clusterMetas = U.readMap(in);
- snpPath = U.readString(in);
- check = in.readBoolean();
- incIdx = in.readInt();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SnapshotPartitionsVerifyTaskArg.class, this);
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index c02deccd103..a0ea3bbb01f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -333,7 +333,7 @@ public class SnapshotRestoreProcess {
snpMgr.recordSnapshotEvent(snpName, msg,
EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED);
- snpMgr.checkSnapshot(snpName, snpPath, cacheGrpNames, true, incIdx,
check).listen(f -> {
+ snpMgr.checkSnapshot(snpName, snpPath, cacheGrpNames, incIdx < 1,
incIdx, check).listen(f -> {
if (f.error() != null) {
finishProcess(fut0.rqId, f.error());
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 63d8fd49495..f892a1f21c5 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1144,7 +1144,6 @@
org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePending
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl$Segment
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl$ThrottlingPolicy
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotMessage
-org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotVerificationTask
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager$3
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager$CancelSnapshotCallable
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager$CreateSnapshotCallable
@@ -1166,16 +1165,8 @@
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandler
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataVerificationTask
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataVerificationTask$MetadataVerificationJob
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataVerificationTaskArg
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataVerificationTaskResult
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequest
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTask
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTask$VerifySnapshotPartitionsJob
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskArg
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess$SnapshotRestoreOperationResponse
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusTask
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusTask$1
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java
index b1a719687e7..86f30620b90 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java
@@ -43,6 +43,7 @@ import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CACHE_GROUP_KEY_CHANGE_PREPARE;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.MASTER_KEY_CHANGE_PREPARE;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
/**
@@ -387,7 +388,8 @@ public class EncryptedSnapshotTest extends
AbstractSnapshotSelfTest {
ensureCacheAbsent(dfltCacheCfg);
- spi0.block((msg) -> msg instanceof FullMessage &&
((FullMessage<?>)msg).error().isEmpty());
+ spi0.block((msg) -> msg instanceof FullMessage &&
((FullMessage<?>)msg).error().isEmpty()
+ && ((FullMessage<?>)msg).type() ==
RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
fut = grid(1).snapshot().restoreSnapshot(SNAPSHOT_NAME,
Collections.singletonList(dfltCacheCfg.getName()));
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index ac041c439e5..0e6fbf134c1 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -52,7 +52,6 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -86,6 +85,7 @@ import
org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.FullMessage;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
@@ -341,17 +341,18 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
Set<UUID> assigns = Collections.newSetFromMap(new
ConcurrentHashMap<>());
for (int i = 4; i < 7; i++) {
-
startGrid(optimize(getConfiguration(getTestIgniteInstanceName(i)).setCacheConfiguration()));
+ IgniteEx grid =
startGrid(optimize(getConfiguration(getTestIgniteInstanceName(i)).setCacheConfiguration()));
- UUID locNodeId = grid(i).localNode().id();
+ if (!U.isLocalNodeCoordinator(grid.context().discovery()))
+ continue;
- grid(i).context().io().addMessageListener(GridTopic.TOPIC_JOB, new
GridMessageListener() {
+
grid.context().io().addMessageListener(GridTopic.TOPIC_DISTRIBUTED_PROCESS, new
GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte
plc) {
- if (msg instanceof GridJobExecuteRequest) {
- GridJobExecuteRequest msg0 =
(GridJobExecuteRequest)msg;
+ if (msg instanceof SingleNodeMessage) {
+ SingleNodeMessage<?> msg0 = (SingleNodeMessage<?>)msg;
- if
(msg0.getTaskName().contains(SnapshotPartitionsVerifyTask.class.getName()))
- assigns.add(locNodeId);
+ if (msg0.type() == CHECK_SNAPSHOT_PARTS.ordinal())
+ assigns.add(nodeId);
}
}
});
@@ -361,14 +362,14 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().state(ACTIVE);
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME,
null, null, false, 0, false).get().idleVerifyResult();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME,
null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
// GridJobExecuteRequest is not send to the local node.
- assertTrue("Number of jobs must be equal to the cluster size (except
local node): " + assigns + ", count: "
- + assigns.size(), waitForCondition(() -> assigns.size() == 2,
5_000L));
+ assertTrue("Number of distributed process single messages must be
equal to the cluster size: "
+ + assigns + ", count: " + assigns.size(), waitForCondition(() ->
assigns.size() == 2, 5_000L));
assertTrue(F.isEmpty(res.exceptions()));
assertPartitionsSame(res);
@@ -522,20 +523,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new
TestVisorBackupPartitionsTask(), arg);
- IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(
- new TestSnapshotPartitionsVerifyTask(),
- new SnapshotPartitionsVerifyTaskArg(
- new HashSet<>(),
- Collections.singletonMap(ignite.cluster().localNode(),
- Collections.singletonList(snp(ignite).readSnapshotMetadata(
- snp(ignite).snapshotLocalDir(SNAPSHOT_NAME),
- (String)ignite.configuration().getConsistentId()
- ))),
- null,
- 0,
- true
- )
- ).idleVerifyResult();
+ IdleVerifyResultV2 snpVerifyRes =
snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleVerifyHashes =
jobResults.get(TestVisorBackupPartitionsTask.class);
Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpCheckHashes =
jobResults.get(TestVisorBackupPartitionsTask.class);
@@ -550,7 +538,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCheckWithTwoCachesCheckNullInput() throws
Exception {
- SnapshotPartitionsVerifyTaskResult res =
checkSnapshotWithTwoCachesWhenOneIsCorrupted(null);
+ SnapshotPartitionsVerifyResult res =
checkSnapshotWithTwoCachesWhenOneIsCorrupted(null);
StringBuilder b = new StringBuilder();
res.idleVerifyResult().print(b::append, true);
@@ -564,7 +552,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCheckWithTwoCachesCheckNotCorrupted()
throws Exception {
- SnapshotPartitionsVerifyTaskResult res =
checkSnapshotWithTwoCachesWhenOneIsCorrupted(Collections.singletonList(
+ SnapshotPartitionsVerifyResult res =
checkSnapshotWithTwoCachesWhenOneIsCorrupted(Collections.singletonList(
OPTIONAL_CACHE_NAME));
StringBuilder b = new StringBuilder();
@@ -579,7 +567,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCheckWithTwoCachesCheckTwoCaches() throws
Exception {
- SnapshotPartitionsVerifyTaskResult res =
checkSnapshotWithTwoCachesWhenOneIsCorrupted(Arrays.asList(
+ SnapshotPartitionsVerifyResult res =
checkSnapshotWithTwoCachesWhenOneIsCorrupted(Arrays.asList(
OPTIONAL_CACHE_NAME, DEFAULT_CACHE_NAME));
StringBuilder b = new StringBuilder();
@@ -790,7 +778,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
}
}
- /** Tests that concurrent snapshot full check and restoration (without
checking) are allowed for different snapshots. */
+ /** Tests that concurrent snapshot full check and restoration (without
full checking) are allowed for different snapshots. */
@Test
public void testConcurrentDifferentSnpFullCheckAndRestorationAllowed()
throws Exception {
prepareGridsAndSnapshot(3, 2, 2, false);
@@ -902,9 +890,9 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
}
}
- /** Tests that concurrent full check and restoration (without checking) of
the same snapshot are allowed. */
+ /** Tests that concurrent full check and restoration (without full
checking) of the same snapshot are declined. */
@Test
- public void testConcurrentTheSameSnpFullCheckAndRestoreAllowed() throws
Exception {
+ public void testConcurrentTheSameSnpFullCheckAndRestoreDeclined() throws
Exception {
prepareGridsAndSnapshot(3, 2, 2, true);
for (int i = 0; i < G.allGrids().size(); ++i) {
@@ -917,8 +905,8 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
() -> new
IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)),
() -> snp(grid(j0)).restoreSnapshot(SNAPSHOT_NAME, null),
CHECK_SNAPSHOT_METAS,
- RESTORE_CACHE_GROUP_SNAPSHOT_START,
- false,
+ CHECK_SNAPSHOT_METAS,
+ true,
false,
null,
() -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
@@ -1326,7 +1314,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
snp(ignite).createSnapshot(SNAPSHOT_NAME).get(timeout);
- SnapshotPartitionsVerifyTaskResult res =
snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get(timeout);
+ SnapshotPartitionsVerifyResult res =
snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get(timeout);
assertFalse(res.idleVerifyResult().hasConflicts());
}
@@ -1356,7 +1344,7 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
* @return Check result.
* @throws Exception If fails.
*/
- private SnapshotPartitionsVerifyTaskResult
checkSnapshotWithTwoCachesWhenOneIsCorrupted(
+ private SnapshotPartitionsVerifyResult
checkSnapshotWithTwoCachesWhenOneIsCorrupted(
Collection<String> cachesToCheck
) throws Exception {
Random rnd = new Random();
@@ -1427,16 +1415,4 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
return res;
}
}
-
- /** Test compute task to collect partition data hashes when the snapshot
check procedure ends. */
- private class TestSnapshotPartitionsVerifyTask extends
SnapshotPartitionsVerifyTask {
- /** {@inheritDoc} */
- @Override public @Nullable SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) throws IgniteException {
- SnapshotPartitionsVerifyTaskResult res = super.reduce(results);
-
- saveHashes(TestSnapshotPartitionsVerifyTask.class, results);
-
- return res;
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index 3fd9d6dde40..335078fc7ab 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -280,8 +280,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends
IgniteClusterSnapshotR
}
catch (Exception e) {
assertTrue("Unexpected exception: " +
Throwables.getStackTraceAsString(e),
- X.hasCause(e, "The previous snapshot restore operation was
not completed.",
- IgniteCheckedException.class, IgniteException.class));
+ X.hasCause(e, "The previous snapshot restore operation was
not completed", IgniteException.class)
+ || X.hasCause(e, "has already started",
IgniteException.class));
failCnt.incrementAndGet();
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
index 2d0b0e65bb4..941cd8b8571 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -244,7 +244,7 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
stopGrid(0);
stopGrid(1);
- createAndCheckSnapshot(client, false,
DataStreamerUpdatesHandler.WRN_MSG,
+ createAndCheckSnapshot(client, false, false,
DataStreamerUpdatesHandler.WRN_MSG,
SnapshotPartitionsQuickVerifyHandler.WRN_MSG);
}
@@ -351,7 +351,7 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
String notExpWrn = allowOverwrite ? null :
SnapshotPartitionsQuickVerifyHandler.WRN_MSG;
try {
- SnapshotPartitionsVerifyTaskResult checkRes =
createAndCheckSnapshot(snpHnd, true, expectedWrn,
+ SnapshotPartitionsVerifyResult checkRes =
createAndCheckSnapshot(snpHnd, true, true, expectedWrn,
notExpWrn);
if (expectedWrn != null) {
@@ -417,9 +417,9 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
waitForCondition(loadFut::isDone, getTestTimeout());
if (allowOverwrite)
- createAndCheckSnapshot(snpHnd, true, null, null);
+ createAndCheckSnapshot(snpHnd, true, true, null, null);
else {
- createAndCheckSnapshot(snpHnd, true,
SnapshotPartitionsQuickVerifyHandler.WRN_MSG,
+ createAndCheckSnapshot(snpHnd, true, true,
SnapshotPartitionsQuickVerifyHandler.WRN_MSG,
DataStreamerUpdatesHandler.WRN_MSG);
}
}
@@ -462,7 +462,7 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
}
/** */
- private SnapshotPartitionsVerifyTaskResult createAndCheckSnapshot(IgniteEx
snpHnd, boolean create,
+ private SnapshotPartitionsVerifyResult createAndCheckSnapshot(IgniteEx
snpHnd, boolean create, boolean expectVerifyErr,
String expWrn, String notExpWrn) throws Exception {
assert notExpWrn == null || expWrn != null;
@@ -498,11 +498,11 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
}
}
- SnapshotPartitionsVerifyTaskResult checkRes =
snp(snpHnd).checkSnapshot(SNAPSHOT_NAME, null).get();
+ SnapshotPartitionsVerifyResult checkRes =
snp(snpHnd).checkSnapshot(SNAPSHOT_NAME, null).get();
assertTrue(checkRes.exceptions().isEmpty());
- if (!onlyPrimary)
+ if (!onlyPrimary && expectVerifyErr)
assertTrue((expWrn != null) ==
checkRes.idleVerifyResult().hasConflicts());
if (expWrn != null) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
index 91b88a45461..d1a804ada61 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.SNAPSHOT_RESTORE_METRICS;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -262,7 +263,8 @@ public class IgniteSnapshotMXBeanTest extends
AbstractSnapshotSelfTest {
awaitPartitionMapExchange();
- spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
+ spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage
+ && ((SingleNodeMessage)msg).type() ==
RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
fut = srv.snapshot().restoreSnapshot(SNAPSHOT_NAME,
F.asList(DEFAULT_CACHE_NAME));
@@ -283,7 +285,8 @@ public class IgniteSnapshotMXBeanTest extends
AbstractSnapshotSelfTest {
awaitPartitionMapExchange();
- spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
+ spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage
+ && ((SingleNodeMessage)msg).type() ==
RESTORE_CACHE_GROUP_SNAPSHOT_START.ordinal());
fut = srv.snapshot().restoreSnapshot(SNAPSHOT_NAME,
F.asList(DEFAULT_CACHE_NAME), 1);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/ConcurrentTxsIncrementalSnapshotTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/ConcurrentTxsIncrementalSnapshotTest.java
index e2fadff5dc0..1501554ddc6 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/ConcurrentTxsIncrementalSnapshotTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/ConcurrentTxsIncrementalSnapshotTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
@@ -267,7 +267,7 @@ public class ConcurrentTxsIncrementalSnapshotTest extends
AbstractIncrementalSna
checkWalsConsistency(txCnt.get(), SNP_CNT);
for (int i = 0; i < SNP_CNT; i++) {
- SnapshotPartitionsVerifyTaskResult res =
snp(grid(0)).checkSnapshot(SNP, null, i).get(getTestTimeout());
+ SnapshotPartitionsVerifyResult res =
snp(grid(0)).checkSnapshot(SNP, null, i).get(getTestTimeout());
assertTrue(F.isEmpty(res.exceptions()));
assertFalse(res.idleVerifyResult().hasConflicts());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
index ad94d401065..87232343816 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotCheckBeforeRestoreTest.java
@@ -29,7 +29,7 @@ import
org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotSelfTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotMetadata;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -97,7 +97,7 @@ public class IncrementalSnapshotCheckBeforeRestoreTest
extends AbstractSnapshotS
for (IgniteEx n: F.asList(grid(0), grid(GRID_CNT))) {
for (int i = 0; i <= incSnpCnt; i++) {
- SnapshotPartitionsVerifyTaskResult res =
snp(n).checkSnapshot(SNP, null, null, false, i, DFLT_CHECK_ON_RESTORE)
+ SnapshotPartitionsVerifyResult res = snp(n).checkSnapshot(SNP,
null, null, false, i, DFLT_CHECK_ON_RESTORE)
.get(getTestTimeout());
assertTrue(res.exceptions().isEmpty());
@@ -123,7 +123,7 @@ public class IncrementalSnapshotCheckBeforeRestoreTest
extends AbstractSnapshotS
createIncrementalSnapshots(1);
for (IgniteEx n : F.asList(grid(0), grid(GRID_CNT))) {
- SnapshotPartitionsVerifyTaskResult res = snp(n).checkSnapshot(SNP,
null, null, false, 1, DFLT_CHECK_ON_RESTORE)
+ SnapshotPartitionsVerifyResult res = snp(n).checkSnapshot(SNP,
null, null, false, 1, DFLT_CHECK_ON_RESTORE)
.get(getTestTimeout());
assertTrue(res.exceptions().isEmpty());
@@ -160,7 +160,7 @@ public class IncrementalSnapshotCheckBeforeRestoreTest
extends AbstractSnapshotS
U.delete(snp(srv).incrementalSnapshotLocalDir(SNP, null, 1));
for (IgniteEx n : F.asList(srv, grid(GRID_CNT))) {
- SnapshotPartitionsVerifyTaskResult res = snp(n).checkSnapshot(SNP,
null, null, false, 0, DFLT_CHECK_ON_RESTORE)
+ SnapshotPartitionsVerifyResult res = snp(n).checkSnapshot(SNP,
null, null, false, 0, DFLT_CHECK_ON_RESTORE)
.get(getTestTimeout());
assertTrue(res.exceptions().isEmpty());
@@ -188,7 +188,7 @@ public class IncrementalSnapshotCheckBeforeRestoreTest
extends AbstractSnapshotS
deleteWalSegment(0);
for (IgniteEx n : F.asList(srv, grid(GRID_CNT))) {
- SnapshotPartitionsVerifyTaskResult res = snp(n).checkSnapshot(SNP,
null, null, false, 0, DFLT_CHECK_ON_RESTORE)
+ SnapshotPartitionsVerifyResult res = snp(n).checkSnapshot(SNP,
null, null, false, 0, DFLT_CHECK_ON_RESTORE)
.get(getTestTimeout());
assertTrue(res.exceptions().isEmpty());