This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8be2e2cd54f IGNITE-22412: Unify snapshot validation jobs (#11368)
8be2e2cd54f is described below
commit 8be2e2cd54f7a7a8930086f5476c7bca0663d4ed
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Jun 6 09:11:52 2024 +0300
IGNITE-22412: Unify snapshot validation jobs (#11368)
---
.../snapshot/AbstractSnapshotVerificationTask.java | 86 ++++++++++++++++------
.../IncrementalSnapshotVerificationTask.java | 51 ++-----------
.../snapshot/SnapshotHandlerContext.java | 6 +-
.../snapshot/SnapshotHandlerRestoreTask.java | 60 ++-------------
.../snapshot/SnapshotPartitionsVerifyTask.java | 50 ++-----------
.../snapshot/SnapshotPartitionsVerifyTaskArg.java | 10 +--
6 files changed, 89 insertions(+), 174 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
index cff0b0e110d..e8d348fc6da 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java
@@ -24,14 +24,17 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,6 +52,10 @@ public abstract class AbstractSnapshotVerificationTask
extends
@IgniteInstanceResource
protected IgniteEx ignite;
+ /** Injected logger. */
+ @LoggerResource
+ protected IgniteLogger log;
+
/** {@inheritDoc} */
@Override public Map<ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid, SnapshotPartitionsVerifyTaskArg arg) {
Map<ClusterNode, List<SnapshotMetadata>> clusterMetas =
arg.clusterMetadata();
@@ -72,17 +79,7 @@ public abstract class AbstractSnapshotVerificationTask
extends
if (meta == null)
continue;
- jobs.put(
- createJob(
- meta.snapshotName(),
- arg.snapshotPath(),
- arg.incrementIndex(),
- meta.consistentId(),
- arg.cacheGroupNames(),
- arg.check()
- ),
- e.getKey()
- );
+ jobs.put(createJob(meta.snapshotName(), meta.consistentId(),
arg), e.getKey());
if (allMetas.isEmpty())
break;
@@ -100,19 +97,60 @@ public abstract class AbstractSnapshotVerificationTask
extends
/**
* @param name Snapshot name.
- * @param path Snapshot directory path.
- * @param incIdx Incremental snapshot index.
- * @param constId Snapshot metadata file name.
- * @param groups Cache groups to be restored from the snapshot. May be
empty if all cache groups are being restored.
- * @param check If {@code true} check snapshot before restore.
+ * @param consId Consistent id of the related node.
+ * @param args Check snapshot parameters.
+ *
* @return Compute job.
*/
- protected abstract ComputeJob createJob(
- String name,
- @Nullable String path,
- int incIdx,
- String constId,
- Collection<String> groups,
- boolean check
- );
+ protected abstract AbstractSnapshotVerificationJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args);
+
+ /** */
+ protected abstract static class AbstractSnapshotVerificationJob extends
ComputeJobAdapter {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ protected IgniteEx ignite;
+
+ /** Injected logger. */
+ @LoggerResource
+ protected IgniteLogger log;
+
+ /** Snapshot name. */
+ protected final String snpName;
+
+ /** Snapshot directory path. */
+ @Nullable protected final String snpPath;
+
+ /** Consistent id of the related node. */
+ protected final String consId;
+
+ /** Set of cache groups to be checked in the snapshot. {@code Null} or
empty to check everything. */
+ @Nullable protected final Collection<String> rqGrps;
+
+ /** If {@code true}, calculates and compares partition hashes.
Otherwise, only basic snapshot validation is launched. */
+ protected final boolean check;
+
+ /**
+ * @param snpName Snapshot name.
+ * @param snpPath Snapshot directory path.
+ * @param consId Consistent id of the related node.
+ * @param rqGrps Set of cache groups to be checked in the snapshot.
{@code Null} or empty to check everything.
+ * @param check If {@code true}, calculates and compares partition
hashes. Otherwise, only basic snapshot validation is launched.
+ */
+ protected AbstractSnapshotVerificationJob(
+ String snpName,
+ @Nullable String snpPath,
+ String consId,
+ @Nullable Collection<String> rqGrps,
+ boolean check
+ ) {
+ this.snpName = snpName;
+ this.snpPath = snpPath;
+ this.consId = consId;
+ this.rqGrps = rqGrps;
+ this.check = check;
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
index 6cde9cfff44..a6779e8e7e7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java
@@ -36,13 +36,9 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -57,8 +53,6 @@ import
org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.marshaller.MarshallerUtils;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
@@ -71,14 +65,6 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
/** Serial version uid. */
private static final long serialVersionUID = 0L;
- /** Ignite instance. */
- @IgniteInstanceResource
- private IgniteEx ignite;
-
- /** Injected logger. */
- @LoggerResource
- private IgniteLogger log;
-
/** {@inheritDoc} */
@Override public SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) throws IgniteException {
Map<Object, Map<Object, TransactionsHashRecord>> nodeTxHashMap = new
HashMap<>();
@@ -147,42 +133,18 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
}
/** {@inheritDoc} */
- @Override protected ComputeJob createJob(
- String name,
- @Nullable String path,
- int incIdx,
- String constId,
- Collection<String> groups,
- boolean check
- ) {
- return new VerifyIncrementalSnapshotJob(name, path, incIdx, constId);
+ @Override protected VerifyIncrementalSnapshotJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
+ return new VerifyIncrementalSnapshotJob(name, args.snapshotPath(),
args.incrementIndex(), consId);
}
/** */
- private static class VerifyIncrementalSnapshotJob extends
ComputeJobAdapter {
+ private static class VerifyIncrementalSnapshotJob extends
AbstractSnapshotVerificationJob {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
- /** Ignite instance. */
- @IgniteInstanceResource
- private IgniteEx ignite;
-
- /** Injected logger. */
- @LoggerResource
- private IgniteLogger log;
-
- /** Snapshot name to validate. */
- private final String snpName;
-
- /** Snapshot directory path. */
- private final String snpPath;
-
/** Incremental snapshot index. */
private final int incIdx;
- /** Consistent ID. */
- private final String consId;
-
/** */
private LongAdder procEntriesCnt;
@@ -190,7 +152,7 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param incIdx Incremental snapshot index.
- * @param consId Consistent ID.
+ * @param consId Consistent id of the related node.
*/
public VerifyIncrementalSnapshotJob(
String snpName,
@@ -198,10 +160,9 @@ public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerific
int incIdx,
String consId
) {
- this.snpName = snpName;
- this.snpPath = snpPath;
+ super(snpName, snpPath, consId, null, true);
+
this.incIdx = incIdx;
- this.consId = consId;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
index ceceb3c785b..07b9946dfcc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
@@ -41,7 +41,7 @@ public class SnapshotHandlerContext {
/** Warning flag of concurrent inconsistent-by-nature streamer updates. */
private final boolean streamerWrn;
- /** If {@code true} check snapshot integrity. */
+ /** If {@code true}, calculates and compares partition hashes. Otherwise,
only basic snapshot validation is launched.*/
private final boolean check;
/**
@@ -51,7 +51,7 @@ public class SnapshotHandlerContext {
* @param locNode Local node.
* @param snpDir The full path to the snapshot files.
* @param streamerWrn {@code True} if concurrent streaming updates
occurred during snapshot operation.
- * @param check If {@code true} check snapshot integrity.
+ * @param check If {@code true}, calculates and compares partition hashes.
Otherwise, only basic snapshot validation is launched.
*/
public SnapshotHandlerContext(
SnapshotMetadata metadata,
@@ -105,7 +105,7 @@ public class SnapshotHandlerContext {
return streamerWrn;
}
- /** @return If {@code true} check snapshot integrity. */
+ /** @return If {@code true}, calculates and compares partition hashes.
Otherwise, only basic snapshot validation is launched. */
public boolean check() {
return check;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
index 5c34511ce27..c2db78f2093 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
@@ -27,14 +27,8 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -44,20 +38,9 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
/** Serial version uid. */
private static final long serialVersionUID = 0L;
- /** Injected ignite logger. */
- @LoggerResource
- private IgniteLogger log;
-
/** {@inheritDoc} */
- @Override protected ComputeJob createJob(
- String name,
- @Nullable String path,
- int incIdx,
- String constId,
- Collection<String> groups,
- boolean check
- ) {
- return new SnapshotHandlerRestoreJob(name, path, constId, groups,
check);
+ @Override protected SnapshotHandlerRestoreJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
+ return new SnapshotHandlerRestoreJob(name, args.snapshotPath(),
consId, args.cacheGroupNames(), args.check());
}
/** {@inheritDoc} */
@@ -100,52 +83,25 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
}
/** Invokes all {@link SnapshotHandlerType#RESTORE} handlers locally. */
- private static class SnapshotHandlerRestoreJob extends ComputeJobAdapter {
+ private static class SnapshotHandlerRestoreJob extends
AbstractSnapshotVerificationJob {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
- /** Ignite instance. */
- @IgniteInstanceResource
- private IgniteEx ignite;
-
- /** Injected logger. */
- @LoggerResource
- private IgniteLogger log;
-
- /** Snapshot name. */
- private final String snpName;
-
- /** String representation of the consistent node ID. */
- private final String consistentId;
-
- /** Cache group names. */
- private final Collection<String> grps;
-
- /** Snapshot directory path. */
- private final String snpPath;
-
- /** If {@code true} check snapshot before restore. */
- private final boolean check;
-
/**
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
- * @param consistentId String representation of the consistent node ID.
+ * @param consId Consistent id of the related node.
* @param grps Cache group names.
* @param check If {@code true} check snapshot before restore.
*/
public SnapshotHandlerRestoreJob(
String snpName,
@Nullable String snpPath,
- String consistentId,
+ String consId,
Collection<String> grps,
boolean check
) {
- this.snpName = snpName;
- this.snpPath = snpPath;
- this.consistentId = consistentId;
- this.grps = grps;
- this.check = check;
+ super(snpName, snpPath, consId, grps, check);
}
/** {@inheritDoc} */
@@ -153,10 +109,10 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
try {
IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
File snpDir = snpMgr.snapshotLocalDir(snpName, snpPath);
- SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir,
consistentId);
+ SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir,
consId);
return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
- new SnapshotHandlerContext(meta, grps, ignite.localNode(),
snpDir, false, check));
+ new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), snpDir, false, check));
}
catch (IgniteCheckedException | IOException e) {
throw new IgniteException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 40fa00fcedf..b08c6dc6619 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -25,18 +25,12 @@ import java.util.Map;
import java.util.Objects;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import
org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTaskV2;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTaskV2.reduce0;
@@ -52,15 +46,8 @@ public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTa
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override protected ComputeJob createJob(
- String name,
- String path,
- int incIdx,
- String constId,
- Collection<String> groups,
- boolean check
- ) {
- return new VerifySnapshotPartitionsJob(name, path, constId, groups,
check);
+ @Override protected VerifySnapshotPartitionsJob createJob(String name,
String consId, SnapshotPartitionsVerifyTaskArg args) {
+ return new VerifySnapshotPartitionsJob(name, args.snapshotPath(),
consId, args.cacheGroupNames(), args.check());
}
/** {@inheritDoc} */
@@ -69,36 +56,13 @@ public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTa
}
/** Job that collects update counters of snapshot partitions on the node
it executes. */
- private static class VerifySnapshotPartitionsJob extends ComputeJobAdapter
{
+ private static class VerifySnapshotPartitionsJob extends
AbstractSnapshotVerificationJob {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
- /** Ignite instance. */
- @IgniteInstanceResource
- private IgniteEx ignite;
-
- /** Injected logger. */
- @LoggerResource
- private IgniteLogger log;
-
- /** Snapshot name to validate. */
- private final String snpName;
-
- /** Snapshot directory path. */
- private final String snpPath;
-
- /** Consistent snapshot metadata file name. */
- private final String consId;
-
- /** Set of cache groups to be checked in the snapshot or {@code empty}
to check everything. */
- private final Collection<String> rqGrps;
-
- /** If {@code true} check snapshot before restore. */
- private final boolean check;
-
/**
* @param snpName Snapshot name to validate.
- * @param consId Consistent snapshot metadata file name.
+ * @param consId Consistent id of the related node.
* @param rqGrps Set of cache groups to be checked in the snapshot or
{@code empty} to check everything.
* @param snpPath Snapshot directory path.
* @param check If {@code true} check snapshot before restore.
@@ -110,11 +74,7 @@ public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTa
Collection<String> rqGrps,
boolean check
) {
- this.snpName = snpName;
- this.consId = consId;
- this.rqGrps = rqGrps;
- this.snpPath = snpPath;
- this.check = check;
+ super(snpName, snpPath, consId, rqGrps, check);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java
index 90624642654..c3c660d94b5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java
@@ -36,13 +36,13 @@ public class SnapshotPartitionsVerifyTaskArg extends
VisorDataTransferObject {
private static final long serialVersionUID = 0L;
/** Cache group names to be verified. */
- private Collection<String> grpNames;
+ @Nullable private Collection<String> grpNames;
/** The map of distribution of snapshot metadata pieces across the
cluster. */
private Map<ClusterNode, List<SnapshotMetadata>> clusterMetas;
/** Snapshot directory path. */
- private String snpPath;
+ @Nullable private String snpPath;
/** If {@code true} check snapshot integrity. */
private boolean check;
@@ -63,7 +63,7 @@ public class SnapshotPartitionsVerifyTaskArg extends
VisorDataTransferObject {
* @param check If {@code true} check snapshot integrity.
*/
public SnapshotPartitionsVerifyTaskArg(
- Collection<String> grpNames,
+ @Nullable Collection<String> grpNames,
Map<ClusterNode, List<SnapshotMetadata>> clusterMetas,
@Nullable String snpPath,
int incIdx,
@@ -79,7 +79,7 @@ public class SnapshotPartitionsVerifyTaskArg extends
VisorDataTransferObject {
/**
* @return Cache group names to be verified.
*/
- public Collection<String> cacheGroupNames() {
+ @Nullable public Collection<String> cacheGroupNames() {
return grpNames;
}
@@ -93,7 +93,7 @@ public class SnapshotPartitionsVerifyTaskArg extends
VisorDataTransferObject {
/**
* @return Snapshot directory path.
*/
- public String snapshotPath() {
+ @Nullable public String snapshotPath() {
return snpPath;
}