This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2c9e07b095c IGNITE-19972 Added metrics of snapshot check (#12330)
2c9e07b095c is described below
commit 2c9e07b095c05dcf865ab7b837d54d00ceece593
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Mar 18 15:13:59 2026 +0300
IGNITE-19972 Added metrics of snapshot check (#12330)
---
.../snapshot/IgniteSnapshotManager.java | 2 +-
.../snapshot/IncrementalSnapshotVerify.java | 27 +-
.../persistence/snapshot/SnapshotCheckProcess.java | 152 ++++++++--
.../persistence/snapshot/SnapshotChecker.java | 47 ++-
.../snapshot/SnapshotHandlerContext.java | 25 +-
.../snapshot/SnapshotHandlerRestoreTask.java | 73 -----
.../snapshot/SnapshotPartitionsVerifyHandler.java | 23 +-
.../main/resources/META-INF/classnames.properties | 2 -
.../snapshot/AbstractSnapshotSelfTest.java | 18 +-
.../snapshot/IgniteClusterSnapshotCheckTest.java | 324 ++++++++++++++++++++-
10 files changed, 561 insertions(+), 132 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index cdd0eb3af30..ed01716a600 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1070,7 +1070,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
);
SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta,
req.groups(), cctx.localNode(), snpOp.snapshotFileTree(),
- snpOp.streamerWarning(), true);
+ snpOp.streamerWarning(), true, null, null);
snpOp.meta(meta);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
index a206415a498..094dceb1820 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerify.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -50,6 +51,7 @@ import
org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
@@ -71,11 +73,26 @@ public class IncrementalSnapshotVerify implements
Supplier<IncrementalSnapshotVe
private LongAdder procEntriesCnt;
/** */
- public IncrementalSnapshotVerify(IgniteEx ignite, IgniteLogger log,
SnapshotFileTree sft, int incrementalIdx) {
+ @Nullable private final Consumer<Integer> totalCnsmr;
+
+ /** */
+ @Nullable private final Consumer<Integer> checkedCnsmr;
+
+ /** */
+ public IncrementalSnapshotVerify(
+ IgniteEx ignite,
+ IgniteLogger log,
+ SnapshotFileTree sft,
+ int incrementalIdx,
+ @Nullable Consumer<Integer> totalCnsmr,
+ @Nullable Consumer<Integer> checkedCnsmr
+ ) {
this.ignite = ignite;
this.log = log;
this.sft = sft;
- this.incIdx = incrementalIdx;
+ incIdx = incrementalIdx;
+ this.totalCnsmr = totalCnsmr;
+ this.checkedCnsmr = checkedCnsmr;
}
/**
@@ -106,11 +123,15 @@ public class IncrementalSnapshotVerify implements
Supplier<IncrementalSnapshotVe
ignite.context().cache().context(), sft, incIdx,
txCaches.keySet()
) {
@Override void totalWalSegments(int segCnt) {
- // No-op.
+ if (totalCnsmr != null)
+ totalCnsmr.accept(segCnt);
}
@Override void processedWalSegments(int segCnt) {
procSegCnt.set(segCnt);
+
+ if (checkedCnsmr != null)
+ checkedCnsmr.accept(segCnt);
}
@Override void initWalEntries(LongAdder entriesCnt) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index 29bd175cb9c..1baacc7d978 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -44,9 +44,12 @@ import
org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.management.cache.PartitionKey;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -60,6 +63,9 @@ import static
org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
/** Distributed process of snapshot checking. */
public class SnapshotCheckProcess {
+ /** */
+ public static final String SNAPSHOT_CHECK_METRIC = "snapshot-check";
+
/** */
private final IgniteLogger log;
@@ -157,6 +163,8 @@ public class SnapshotCheckProcess {
return new GridFinishedFuture<>();
}
finally {
+ unregisterMetrics(ctx.req.snapshotName());
+
if (log.isInfoEnabled())
log.info("Finished snapshot validation [req=" + ctx.req + ']');
}
@@ -314,6 +322,8 @@ public class SnapshotCheckProcess {
if (F.isEmpty(ctx.metas))
return new GridFinishedFuture<>();
+ ctx.totalCounter.set(0);
+
GridFutureAdapter<SnapshotCheckResponse> phaseFut = new
GridFutureAdapter<>();
CompletableFuture<SnapshotCheckResponse> workingFut;
@@ -346,7 +356,11 @@ public class SnapshotCheckProcess {
CompletableFuture<SnapshotCheckResponse> resFut = new
CompletableFuture<>();
CompletableFuture<IncrementalSnapshotVerifyResult> workingFut =
snpChecker.checkIncrementalSnapshot(
- ctx.locFileTree.get(meta.consistentId()),
ctx.req.incrementalIndex());
+ ctx.locFileTree.get(meta.consistentId()),
+ ctx.req.incrementalIndex(),
+ ctx.totalCounter::addAndGet,
+ ctx.checkedCounter::addAndGet
+ );
workingFut.whenComplete((res, err) -> {
if (err != null)
@@ -368,22 +382,29 @@ public class SnapshotCheckProcess {
AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
for (SnapshotMetadata meta : ctx.metas) {
- CompletableFuture<Map<PartitionKey, PartitionHashRecord>> metaFut
= snpChecker.checkPartitions(
- meta,
- ctx.locFileTree.get(meta.consistentId()),
- ctx.req.groups(),
- false,
- ctx.req.fullCheck()
- );
-
- metaFut.whenComplete((res, err) -> {
- if (err != null)
- exceptions.put(meta.consistentId(), err);
- else if (!F.isEmpty(res))
- perMetaResults.put(meta.consistentId(), res);
-
- if (metasProcessed.decrementAndGet() == 0)
- composedFut.complete(new
SnapshotCheckResponse(perMetaResults, exceptions));
+ // Run asynchronously to calculate the metric 'total partitions'
faster.
+ kctx.pools().getSnapshotExecutorService().submit(() -> {
+ CompletableFuture<Map<PartitionKey, PartitionHashRecord>>
metaFut = snpChecker.checkPartitions(
+ meta,
+ ctx.locFileTree.get(meta.consistentId()),
+ ctx.req.groups(),
+ false,
+ ctx.req.fullCheck(),
+ ctx.totalCounter::addAndGet,
+ checkedPartId -> ctx.checkedCounter.incrementAndGet()
+ );
+
+ metaFut.whenComplete((res, err) -> {
+ ctx.checkedSnapshotParts.incrementAndGet();
+
+ if (err != null)
+ exceptions.put(meta.consistentId(), err);
+ else if (!F.isEmpty(res))
+ perMetaResults.put(meta.consistentId(), res);
+
+ if (metasProcessed.decrementAndGet() == 0)
+ composedFut.complete(new
SnapshotCheckResponse(perMetaResults, exceptions));
+ });
});
}
@@ -403,17 +424,28 @@ public class SnapshotCheckProcess {
AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
for (SnapshotMetadata meta : ctx.metas) {
- CompletableFuture<Map<String, SnapshotHandlerResult<Object>>>
metaFut = snpChecker.invokeCustomHandlers(meta,
- ctx.locFileTree.get(meta.consistentId()), ctx.req.groups(),
ctx.req.fullCheck());
-
- metaFut.whenComplete((res, err) -> {
- if (err != null)
- exceptions.put(meta.consistentId(), err);
- else if (!F.isEmpty(res))
- perMetaResults.put(meta.consistentId(), res);
-
- if (metasProcessed.decrementAndGet() == 0)
- composedFut.complete(new
SnapshotCheckResponse(perMetaResults, exceptions));
+ // Run asynchronously to calculate the metric 'total partitions'
faster.
+ kctx.pools().getSnapshotExecutorService().submit(() -> {
+ CompletableFuture<Map<String, SnapshotHandlerResult<Object>>>
metaFut = snpChecker.invokeCustomHandlers(
+ meta,
+ ctx.locFileTree.get(meta.consistentId()),
+ ctx.req.groups(),
+ ctx.req.fullCheck(),
+ ctx.totalCounter::addAndGet,
+ processedPart -> ctx.checkedCounter.incrementAndGet()
+ );
+
+ metaFut.whenComplete((res, err) -> {
+ ctx.checkedSnapshotParts.incrementAndGet();
+
+ if (err != null)
+ exceptions.put(meta.consistentId(), err);
+ else if (!F.isEmpty(res))
+ perMetaResults.put(meta.consistentId(), res);
+
+ if (metasProcessed.decrementAndGet() == 0)
+ composedFut.complete(new
SnapshotCheckResponse(perMetaResults, exceptions));
+ });
});
}
@@ -465,6 +497,8 @@ public class SnapshotCheckProcess {
if (!baseline(kctx.localNodeId()))
return new GridFinishedFuture<>();
+ registerMetrics(ctx);
+
Collection<Integer> grpIds = F.isEmpty(req.groups()) ? null :
F.viewReadOnly(req.groups(), CU::cacheId);
GridFutureAdapter<SnapshotCheckResponse> phaseFut = new
GridFutureAdapter<>();
@@ -548,8 +582,11 @@ public class SnapshotCheckProcess {
phase2PartsHashes.start(reqId, ctx.req);
}
catch (Throwable th) {
- if (ctx != null)
+ if (ctx != null) {
+ unregisterMetrics(ctx.req.snapshotName());
+
contexts.remove(ctx.req.snapshotName());
+ }
if (clusterOpFut != null)
clusterOpFut.onDone(th);
@@ -562,12 +599,13 @@ public class SnapshotCheckProcess {
*
* @return Metadatas to process on current node.
*/
- private @Nullable List<SnapshotMetadata> assingMetas(Map<ClusterNode,
List<SnapshotMetadata>> clusterMetas) {
+ private List<SnapshotMetadata> assingMetas(Map<ClusterNode,
List<SnapshotMetadata>> clusterMetas) {
ClusterNode locNode = kctx.cluster().get().localNode();
List<SnapshotMetadata> locMetas = clusterMetas.get(locNode);
+ // Might be empty due to a cache's node filter.
if (F.isEmpty(locMetas))
- return null;
+ return Collections.emptyList();
Set<String> onlineNodesConstIdsStr = new
HashSet<>(clusterMetas.size());
// The nodes are sorted with lesser order.
@@ -681,6 +719,42 @@ public class SnapshotCheckProcess {
return null;
}
+ /** */
+ private void registerMetrics(SnapshotCheckContext ctx) {
+ MetricRegistryImpl mreg =
kctx.metric().registry(MetricUtils.metricName(SNAPSHOT_CHECK_METRIC,
ctx.req.snapshotName()));
+
+ assert !mreg.iterator().hasNext();
+ assert ctx.req.requestId() != null;
+
+ mreg.register("startTime", U::currentTimeMillis,
+ "The system time of the start of the cluster snapshot check
operation on current node.");
+
+ if (ctx.req.incrementalIndex() > 0) {
+ mreg.register("incrementIndex", ctx.req::incrementalIndex,
+ "The index of incremental snapshot of the snapshot check
operation.");
+ mreg.register("totalWalSegments", ctx.totalCounter::get,
+ "The total number of WAL segments in the incremental snapshot
to check on current node.");
+ mreg.register("processedWalSegments", ctx.checkedCounter::get,
+ "The number of checked WAL segments in the incremental
snapshot on current node.");
+ }
+ else {
+ mreg.register("checkPartitions", ctx.req::fullCheck, "Shows
whether full validation of snapshot partitions is enabled.");
+ mreg.register("totalPartitions", ctx.totalCounter::get, "The total
number of partitions to check on current node.");
+ mreg.register("processedPartitions", ctx.checkedCounter::get, "The
number of checked partitions on current node.");
+
+ // Node can hold and process another nodes' snapshot data.
+ mreg.register("snapshotPartsToProcess", () -> ctx.metas == null ?
-1 : ctx.metas.size(),
+ "Number of parts (nodes data) of snapshot to check on current
node.");
+ mreg.register("processedSnapshotParts",
ctx.checkedSnapshotParts::get,
+ "Number of checked snapshot parts (nodes data) on current
node.");
+ }
+ }
+
+ /** */
+ private void unregisterMetrics(String snpName) {
+ kctx.metric().remove(MetricUtils.metricName(SNAPSHOT_CHECK_METRIC,
snpName));
+ }
+
/** Operation context. */
private static final class SnapshotCheckContext {
/** Request. */
@@ -691,7 +765,7 @@ public class SnapshotCheckProcess {
* Metadatas to process on this node. Also indicates the snapshot
parts to check on this node.
* @see #partitionsHashesFuture(SnapshotCheckContext)
*/
- @Nullable private List<SnapshotMetadata> metas;
+ @Nullable private volatile List<SnapshotMetadata> metas;
/** Map of snapshot pathes per consistent id for {@link #metas}. */
@GridToStringInclude
@@ -700,9 +774,23 @@ public class SnapshotCheckProcess {
/** All the snapshot metadatas. */
@Nullable private Map<ClusterNode, List<SnapshotMetadata>>
clusterMetas;
+ /** Common counter of total work units to process on current node. */
+ @GridToStringExclude
+ private final AtomicInteger totalCounter = new AtomicInteger(-1);
+
+ /** Common counter of checked work units on current node. */
+ @GridToStringExclude
+ private final AtomicInteger checkedCounter = new AtomicInteger(0);
+
+ /** Number of checked snapshot parts (nodes data/consistent ids) on
current node. {@code Null} for incremental snapshots. */
+ @GridToStringExclude
+ @Nullable private final AtomicInteger checkedSnapshotParts;
+
/** Creates operation context. */
private SnapshotCheckContext(SnapshotCheckProcessRequest req) {
this.req = req;
+
+ checkedSnapshotParts = req.incrementalIndex() > 0 ? null : new
AtomicInteger(0);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 9c16c0be0a7..4f81025d0a2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -72,12 +73,14 @@ public class SnapshotChecker {
/** */
public CompletableFuture<IncrementalSnapshotVerifyResult>
checkIncrementalSnapshot(
SnapshotFileTree sft,
- int incIdx
+ int incIdx,
+ @Nullable Consumer<Integer> totalCnsmr,
+ @Nullable Consumer<Integer> checkedCnsmr
) {
assert incIdx > 0;
return CompletableFuture.supplyAsync(
- new IncrementalSnapshotVerify(kctx.grid(), log, sft, incIdx),
+ new IncrementalSnapshotVerify(kctx.grid(), log, sft, incIdx,
totalCnsmr, checkedCnsmr),
executor
);
}
@@ -160,12 +163,31 @@ public class SnapshotChecker {
SnapshotMetadata meta,
SnapshotFileTree sft,
@Nullable Collection<String> grps,
- boolean check
+ boolean check,
+ @Nullable Consumer<Integer> totalCnsmr,
+ @Nullable Consumer<Integer> processedCnsmr
) {
// The handlers use or may use the same snapshot pool. If it is
configured with 1 thread, launching waiting task in
// the same pool might block it.
- return CompletableFuture.supplyAsync(
- new SnapshotHandlerRestoreTask(kctx.grid(), log, sft, grps, check)
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ SnapshotHandlerContext hndCnt = new SnapshotHandlerContext(
+ meta,
+ grps,
+ kctx.cluster().get().localNode(),
+ sft,
+ false,
+ check,
+ totalCnsmr == null ? null : (hndCls, totalCnt) ->
totalCnsmr.accept(totalCnt),
+ processedCnsmr == null ? null : (hndCls, partId) ->
processedCnsmr.accept(partId)
+ );
+
+ return
kctx.cache().context().snapshotMgr().handlers().invokeAll(SnapshotHandlerType.RESTORE,
hndCnt);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
);
}
@@ -175,12 +197,21 @@ public class SnapshotChecker {
SnapshotFileTree sft,
@Nullable Collection<String> grps,
boolean forCreation,
- boolean checkParts
+ boolean checkParts,
+ @Nullable Consumer<Integer> totalCnsmr,
+ @Nullable Consumer<Integer> checkedPartCnsmr
) {
- // Await in the default executor to avoid blocking the snapshot
executor if it has just one thread.
return CompletableFuture.supplyAsync(() -> {
SnapshotHandlerContext hctx = new SnapshotHandlerContext(
- meta, grps, kctx.cluster().get().localNode(), sft, false,
checkParts);
+ meta,
+ grps,
+ kctx.cluster().get().localNode(),
+ sft,
+ false,
+ checkParts,
+ totalCnsmr == null ? null : (hndCls, unitsToWork) ->
totalCnsmr.accept(unitsToWork),
+ checkedPartCnsmr == null ? null : (hndCls, partId) ->
checkedPartCnsmr.accept(partId)
+ );
try {
return new
SnapshotPartitionsVerifyHandler(kctx.cache().context()).invoke(hctx);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
index 6f84c968184..90516ac1374 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collection;
+import java.util.function.BiConsumer;
import org.apache.ignite.cluster.ClusterNode;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.jetbrains.annotations.Nullable;
@@ -44,6 +45,12 @@ public class SnapshotHandlerContext {
/** If {@code true}, calculates and compares partition hashes. Otherwise,
only basic snapshot validation is launched.*/
private final boolean check;
+ /** Consumer of total work units per handler type. */
+ private final BiConsumer<Class<? extends SnapshotHandler<?>>, Integer>
totalCnsmr;
+
+ /** Consumer of processed work unit per handler type. */
+ private final BiConsumer<Class<? extends SnapshotHandler<?>>, Integer>
progressCnsmr;
+
/**
* @param metadata Snapshot metadata.
* @param grps The names of the cache groups on which the operation is
performed.
@@ -52,6 +59,8 @@ public class SnapshotHandlerContext {
* @param sft Snapshot file tree.
* @param streamerWrn {@code True} if concurrent streaming updates
occurred during snapshot operation.
* @param check If {@code true}, calculates and compares partition hashes.
Otherwise, only basic snapshot validation is launched.
+ * @param totalCnsmr Consumer of total work units per handler type.
+ * @param progressCnsmr Consumer of processed work unit per handler type.
*/
public SnapshotHandlerContext(
SnapshotMetadata metadata,
@@ -59,7 +68,9 @@ public class SnapshotHandlerContext {
ClusterNode locNode,
SnapshotFileTree sft,
boolean streamerWrn,
- boolean check
+ boolean check,
+ @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer>
totalCnsmr,
+ @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer>
progressCnsmr
) {
this.metadata = metadata;
this.grps = grps;
@@ -67,6 +78,8 @@ public class SnapshotHandlerContext {
this.sft = sft;
this.streamerWrn = streamerWrn;
this.check = check;
+ this.totalCnsmr = totalCnsmr;
+ this.progressCnsmr = progressCnsmr;
}
/**
@@ -109,4 +122,14 @@ public class SnapshotHandlerContext {
public boolean check() {
return check;
}
+
+ /** @return Consumer of total work units per handler type. */
+ public @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer>
totalConsumer() {
+ return totalCnsmr;
+ }
+
+ /** @return Consumer of processed work unit per handler type. */
+ public @Nullable BiConsumer<Class<? extends SnapshotHandler<?>>, Integer>
progressConsumer() {
+ return progressCnsmr;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
deleted file mode 100644
index dccb0bc7c84..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Supplier;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteEx;
-import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
-
-/**
- * Snapshot restore operation handling task.
- */
-public class SnapshotHandlerRestoreTask implements Supplier<Map<String,
SnapshotHandlerResult<Object>>> {
- /** */
- private final IgniteEx ignite;
-
- /** */
- private final SnapshotFileTree sft;
-
- /** */
- private final Collection<String> rqGrps;
-
- /** */
- private final boolean check;
-
- /** */
- SnapshotHandlerRestoreTask(
- IgniteEx ignite,
- IgniteLogger log,
- SnapshotFileTree sft,
- Collection<String> grps,
- boolean check
- ) {
- this.ignite = ignite;
- this.sft = sft;
- this.rqGrps = grps;
- this.check = check;
- }
-
- /** */
- @Override public Map<String, SnapshotHandlerResult<Object>> get() {
- try {
- IgniteSnapshotManager snpMgr =
ignite.context().cache().context().snapshotMgr();
- SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
-
- return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
- new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(),
sft, false, check));
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException(e);
- }
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 8619b53fc02..597e95cc58c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
@@ -158,6 +159,12 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
continue;
partFiles.add(part);
+
+ if (opCtx.totalConsumer() != null)
+ opCtx.totalConsumer().accept(getClass(), 1);
+
+ if (!opCtx.check() && opCtx.progressConsumer() != null)
+ opCtx.progressConsumer().accept(getClass(), partId);
}
if (parts.isEmpty())
@@ -175,14 +182,16 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
}
if (!opCtx.check()) {
- log.info("Snapshot data integrity check skipped [snpName=" +
meta.snapshotName() + ']');
+ if (log.isInfoEnabled())
+ log.info("Snapshot data integrity check skipped [snpName=" +
meta.snapshotName() + ']');
return Collections.emptyMap();
}
return meta.dump()
? checkDumpFiles(opCtx, partFiles)
- : checkSnapshotFiles(opCtx.snapshotFileTree(), grpDirs, meta,
partFiles, isPunchHoleEnabled(opCtx, grpDirs.keySet()));
+ : checkSnapshotFiles(opCtx.snapshotFileTree(), grpDirs, meta,
partFiles, isPunchHoleEnabled(opCtx, grpDirs.keySet()),
+ opCtx.progressConsumer() == null ? null : partId ->
opCtx.progressConsumer().accept(getClass(), partId));
}
/** */
@@ -191,9 +200,11 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
Map<Integer, List<File>> grpDirs,
SnapshotMetadata meta,
Set<File> partFiles,
- boolean punchHoleEnabled
+ boolean punchHoleEnabled,
+ @Nullable Consumer<Integer> checkedCnsmr
) throws IgniteCheckedException {
- Map<PartitionKey, PartitionHashRecord> res = new ConcurrentHashMap<>();
+ Map<PartitionKey, PartitionHashRecord> res = new
ConcurrentHashMap<>(partFiles.size(), 1.0f);
+
ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() ->
ByteBuffer.allocateDirect(meta.pageSize())
.order(ByteOrder.nativeOrder()));
@@ -313,6 +324,10 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
catch (IOException e) {
throw new IgniteCheckedException(e);
}
+ finally {
+ if (checkedCnsmr != null)
+ checkedCnsmr.accept(partId);
+ }
return null;
}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 5f22afc2a72..cb6e27862f6 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1305,8 +1305,6 @@
org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnap
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessage
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessage
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerRestoreTask
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerRestoreTask$SnapshotHandlerRestoreJob
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 715b771d611..b74dfddcae4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -88,6 +88,7 @@ import
org.apache.ignite.internal.processors.marshaller.MappedName;
import
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -180,19 +181,22 @@ public abstract class AbstractSnapshotSelfTest extends
GridCommonAbstractTest {
/** Parameters. */
@Parameterized.Parameters(name = "encryption={0}, onlyPrimay={1}")
public static Collection<Object[]> params() {
- boolean[] encVals = DISK_PAGE_COMPRESSION !=
DiskPageCompression.DISABLED
- ? new boolean[] {false}
- : new boolean[] {false, true};
-
List<Object[]> res = new ArrayList<>();
- for (boolean enc: encVals)
- for (boolean onlyPrimary: new boolean[] {true, false})
- res.add(new Object[] { enc, onlyPrimary});
+ for (boolean enc : encryptionParameters())
+ for (boolean onlyPrimary : new boolean[] {true, false})
+ res.add(new Object[] {enc, onlyPrimary});
return res;
}
+ /** */
+ protected static Collection<Boolean> encryptionParameters() {
+ return DISK_PAGE_COMPRESSION != DiskPageCompression.DISABLED
+ ? F.asList(false)
+ : F.asList(false, true);
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index 93f175a15a0..306d44224a4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
@@ -47,6 +48,7 @@ import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -76,6 +78,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
@@ -94,21 +97,31 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.spi.metric.IntMetric;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.ListeningTestLogger;
import org.jetbrains.annotations.Nullable;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runners.Parameterized;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_THREAD_POOL_SIZE;
import static
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckProcess.SNAPSHOT_CHECK_METRIC;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CHECK_SNAPSHOT_METAS;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CHECK_SNAPSHOT_PARTS;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.junit.Assume.assumeFalse;
@@ -116,6 +129,9 @@ import static org.junit.Assume.assumeFalse;
* Cluster-wide snapshot check procedure tests.
*/
public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
+ /** Number of partitions of the caches configured via {@code txCacheConfig}; also used to size the test data load. */
+ protected static final int CACHE_PARTS_CNT = 32;
+
/** Map of intermediate compute task results collected prior performing
reduce operation on them. */
private final Map<Class<?>, Map<PartitionKey, List<PartitionHashRecord>>>
jobResults = new ConcurrentHashMap<>();
@@ -125,12 +141,44 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
/** Optional cache name to be created on demand. */
private static final String OPTIONAL_CACHE_NAME = "CacheName";
+ /** Optional listening logger; when not {@code null}, {@code getConfiguration} sets it as the grid logger. */
+ private ListeningTestLogger listeningLog;
+
+ /** Snapshot thread pool size that {@code getConfiguration} applies to the node configuration (test parameter #2). */
+ @Parameterized.Parameter(2)
+ public int snpThrdPoolSz;
+
+ /** @return Test parameters: cartesian product of the encryption parameters, the {@code onlyPrimary} flag and the snapshot thread pool size (default and 1). */
+ @Parameterized.Parameters(name = "encryption={0}, onlyPrimary={1},
snpThrdPoolSz={2}")
+ public static Collection<Object[]> params() {
+ return cartesianProduct(
+ encryptionParameters(),
+ F.asList(false, true),
+ F.asList(DFLT_SNAPSHOT_THREAD_POOL_SIZE, 1)
+ );
+ }
+
/** Cleanup data of task execution results if need. */
@Before
public void beforeCheck() {
jobResults.clear();
}
+ /** {@inheritDoc} Additionally applies the parameterized snapshot thread pool size and, if set, the listening logger. */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg =
super.getConfiguration(igniteInstanceName).setSnapshotThreadPoolSize(snpThrdPoolSz);
+
+ if (listeningLog != null)
+ cfg.setGridLogger(listeningLog);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} Additionally sets a rendezvous affinity function with {@code CACHE_PARTS_CNT} partitions. */
+ @Override protected <K, V> CacheConfiguration<K, V>
txCacheConfig(CacheConfiguration<K, V> ccfg) {
+ return super.txCacheConfig(ccfg).setAffinity(new
RendezvousAffinityFunction(false, CACHE_PARTS_CNT));
+ }
+
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCheck() throws Exception {
@@ -553,6 +601,279 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
assertNotContains(log, b.toString(), "Failed to read page (CRC
validation failed)");
}
+ /** Checks the snapshot check metrics of a full (non-incremental) snapshot with no node stopped. */
+ @Test
+ public void testSnapshotCheckMetricsEntireTopology() throws Exception {
+ doTestSnapshotMetricsAllRuns(true, false);
+ }
+
+ /** Checks the snapshot check metrics of a full (non-incremental) snapshot after node 0 has been stopped. */
+ @Test
+ public void testSnapshotCheckMetricsLesserTopology() throws Exception {
+ doTestSnapshotMetricsAllRuns(false, false);
+ }
+
+ /** Checks the snapshot check metrics of an incremental snapshot with no node stopped; skipped when encryption is enabled. */
+ @Test
+ public void testIncrementalSnapshotCheckMetrics() throws Exception {
+ assumeFalse(encryption);
+
+ doTestSnapshotMetricsAllRuns(true, true);
+ }
+
+ /** Runs the metrics scenario for each allowed combination of initiator, fullCheck, allHandlers and restore; {@code entireTop == false} stops node 0 first, {@code incremental} also creates and checks an incremental snapshot. */
+ private void doTestSnapshotMetricsAllRuns(boolean entireTop, boolean
incremental) throws Exception {
+ assert entireTop || !incremental : "Incremental snapshot supports only
entire topology";
+
+ listeningLog = new ListeningTestLogger(log);
+
+ prepareGridsAndSnapshot(3, 2, 1, false);
+
+ if (incremental) {
+ try (IgniteDataStreamer<Integer, Integer> ds =
grid(0).dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < 100; ++i)
+ ds.addData(i, i);
+ }
+
+ grid(1).snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get();
+ }
+
+ grid(1).destroyCache(DEFAULT_CACHE_NAME);
+
+ awaitPartitionMapExchange();
+
+ if (!entireTop)
+ stopGrid(0);
+
+ // Initiate from a baseline node (1), a non-baseline node (2) and a client (3).
+ for (int initiator : F.asList(1, 2, 3)) {
+ // Check partitions hashes.
+ for (boolean fullCheck : F.asList(true, false)) {
+ // Invoke all the snapshot handlers.
+ for (boolean allHandlers : F.asList(false, true)) {
+ // Restore snapshot instead of just checking.
+ for (boolean restore : F.asList(false, true)) {
+ // The all-handlers-task currently fails on client
because it has no snapshot handlers. Thus,
+ // a 'handlers mismatch configuration' error occurs.
+ // Also, snapshot restoration isn't allowed from
clients.
+ if (initiator > 2 && (allHandlers || restore))
+ continue;
+ // On the restoration, all the snapshot checking
handlers are always invoked for a non-incremental snapshot.
+ // Incremental snapshot doesn't support snapshot
handlers and doesn't check partitions.
+ if (restore && !allHandlers || incremental &&
(allHandlers || fullCheck))
+ continue;
+
+ if (log.isInfoEnabled()) {
+ log.info("Testing snapshot metrics with the
parameters: entireTop=" + entireTop
+ + ", initiator=" + initiator + ", fullCheck=" +
fullCheck + ", allHandlers=" + allHandlers
+ + ", restore=" + restore + ", incremental=" +
incremental);
+ }
+
+ if (entireTop)
+ doTestSnapshotCheckMetricsCertainRun(initiator,
incremental, fullCheck, allHandlers, restore);
+ else
+ doTestSnapshotCheckMetricsCertainRun(initiator,
incremental, fullCheck, allHandlers, restore, 0);
+
+ if (restore) {
+ grid(1).destroyCache(DEFAULT_CACHE_NAME);
+
+ awaitPartitionMapExchange();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /** Starts one snapshot check (or restore) from {@code initiator} and verifies the snapshot check metrics on every running node: their initial values, their values after the partitions phase, and that the registry is emptied on completion. {@code stoppedNodes} are indexes of nodes that are already stopped. */
+ private void doTestSnapshotCheckMetricsCertainRun(
+ int initiator,
+ boolean incremental,
+ boolean fullCheck,
+ boolean allHandlers,
+ boolean restore,
+ int... stoppedNodes
+ ) throws Exception {
+ assert !restore || allHandlers;
+ assert !incremental || (!allHandlers && stoppedNodes.length == 0 &&
!fullCheck);
+
+ Set<Integer> stoppedNodes0 = Collections.emptySet();
+ int coordId = 0;
+
+ if (stoppedNodes.length > 0) {
+ stoppedNodes0 =
IntStream.of(stoppedNodes).boxed().collect(Collectors.toSet());
+
+ for (int i = 0; i < G.allGrids().size() + stoppedNodes0.size(); ++i) {
+ if (stoppedNodes0.contains(i))
+ continue;
+
+ coordId = i;
+
+ break;
+ }
+ }
+
+ assert U.isLocalNodeCoordinator(grid(coordId).context().discovery());
+
+ Set<Object> baseline =
grid(coordId).cluster().currentBaselineTopology().stream().map(BaselineNode::consistentId)
+ .collect(Collectors.toSet());
+
+ IgniteInternalFuture<SnapshotPartitionsVerifyResult> fut1 = null;
+ IgniteFuture<?> fut2 = null;
+
+ try {
+ discoSpi(grid(coordId)).block(msg -> msg instanceof FullMessage
+ && ((FullMessage<?>)msg).type() ==
CHECK_SNAPSHOT_METAS.ordinal());
+
+ long mills = System.currentTimeMillis();
+
+ if (restore) {
+ fut2 = snp(grid(initiator)).restoreSnapshot(SNAPSHOT_NAME,
null, null, incremental ? 1 : 0,
+ fullCheck);
+
+ assertFalse(fut2.isDone());
+ }
+ else {
+ fut1 = snp(grid(initiator)).checkSnapshot(SNAPSHOT_NAME, null,
null, allHandlers, incremental ? 1 : 0,
+ fullCheck);
+
+ assertFalse(fut1.isDone());
+ }
+
+ discoSpi(grid(coordId)).waitBlocked(getTestTimeout());
+
+ for (int i = 0; i < G.allGrids().size() + stoppedNodes0.size(); ++i) {
+ if (stoppedNodes0.contains(i))
+ continue;
+
+ MetricRegistry mreg =
grid(i).context().metric().registry(metricName(SNAPSHOT_CHECK_METRIC,
SNAPSHOT_NAME));
+
+ // Clients and non-baseline nodes don't have data and
don't run snapshots.
+ if (!baseline.contains(grid(i).localNode().consistentId())) {
+ assertFalse(mreg.iterator().hasNext());
+
+ continue;
+ }
+
+ assertTrue(mreg.iterator().hasNext());
+
+ assertTrue(mreg.<LongMetric>findMetric("startTime").value() >
mills);
+
+ if (incremental) {
+ assertEquals(1,
mreg.<IntMetric>findMetric("incrementIndex").value());
+
+ // Full-snapshot metrics aren't expected.
+
assertNull(mreg.<BooleanMetric>findMetric("checkPartitions"));
+
assertNull(mreg.<IntMetric>findMetric("processedPartitions"));
+
assertNull(mreg.<IntMetric>findMetric("snapshotPartsToProcess"));
+
assertNull(mreg.<IntMetric>findMetric("processedSnapshotParts"));
+
assertNull(mreg.<IntMetric>findMetric("totalPartitions"));
+
+ // Initial metrics, aren't set yet.
+ assertEquals(-1,
mreg.<IntMetric>findMetric("totalWalSegments").value());
+ assertEquals(0,
mreg.<IntMetric>findMetric("processedWalSegments").value());
+ }
+ else {
+ assertNull(mreg.findMetric("incrementIndex"));
+ assertEquals(fullCheck,
mreg.<BooleanMetric>findMetric("checkPartitions").value());
+
+ // Incremental snapshot metrics aren't expected.
+ assertNull(mreg.<IntMetric>findMetric("totalWalSegments"));
+
assertNull(mreg.<IntMetric>findMetric("processedWalSegments"));
+
+ // Initial metrics, aren't set yet.
+ assertEquals(0,
mreg.<IntMetric>findMetric("processedPartitions").value());
+ assertEquals(-1,
mreg.<IntMetric>findMetric("snapshotPartsToProcess").value());
+ assertEquals(0,
mreg.<IntMetric>findMetric("processedSnapshotParts").value());
+ assertEquals(-1,
mreg.<IntMetric>findMetric("totalPartitions").value());
+ }
+ }
+
+ discoSpi(grid(coordId)).blockNextAndRelease(msg -> msg instanceof
FullMessage
+ && ((FullMessage<?>)msg).type() ==
CHECK_SNAPSHOT_PARTS.ordinal());
+
+ discoSpi(grid(coordId)).waitBlocked(getTestTimeout());
+
+ int totalSnpPartsDetected = 0;
+ boolean moreThanOneSnpPartProcessed = false;
+
+ for (int i = 0; i < G.allGrids().size() + stoppedNodes0.size(); ++i) {
+ if (stoppedNodes0.contains(i))
+ continue;
+
+ MetricRegistry mreg =
grid(i).context().metric().registry(metricName(SNAPSHOT_CHECK_METRIC,
SNAPSHOT_NAME));
+
+ // Clients and non-baseline nodes don't have data and
don't run snapshots.
+ if (!baseline.contains(grid(i).localNode().consistentId())) {
+ assertFalse(mreg.iterator().hasNext());
+
+ continue;
+ }
+
+ assertTrue(mreg.iterator().hasNext());
+
+ if (incremental) {
+ int walSegmentsDetected =
mreg.<IntMetric>findMetric("totalWalSegments").value();
+
+ assertTrue(walSegmentsDetected > 0);
+ assertEquals(walSegmentsDetected,
mreg.<IntMetric>findMetric("processedWalSegments").value());
+ }
+ else {
+ int snpPartsDetected =
mreg.<IntMetric>findMetric("processedSnapshotParts").value();
+
+ assertTrue(snpPartsDetected > 0);
+
+ totalSnpPartsDetected += snpPartsDetected;
+
+ assertEquals(snpPartsDetected,
mreg.<IntMetric>findMetric("snapshotPartsToProcess").value());
+
+ if (snpPartsDetected > 1)
+ moreThanOneSnpPartProcessed = true;
+
+ SnapshotFileTree sft = snapshotFileTree(grid(i),
SNAPSHOT_NAME);
+
+ long partsFilesCnt = Stream.of(new File(sft.nodeStorage(),
"cache-" + DEFAULT_CACHE_NAME).list())
+ .filter(fn -> fn.endsWith(".bin")).count();
+
+ // Partitions + metastorage.
+
assertEquals(mreg.<IntMetric>findMetric("totalPartitions").value(),
snpPartsDetected * (partsFilesCnt + 1));
+
+
assertEquals(mreg.<IntMetric>findMetric("totalPartitions").value(),
+
mreg.<IntMetric>findMetric("processedPartitions").value());
+ }
+ }
+
+ if (!incremental) {
+ assertEquals(baseline.size(), totalSnpPartsDetected);
+ assertTrue(stoppedNodes0.isEmpty() ||
moreThanOneSnpPartProcessed);
+ }
+ }
+ finally {
+ discoSpi(grid(coordId)).unblock();
+
+ if (fut1 != null) {
+ assert fut2 == null;
+
+ fut1.get(getTestTimeout());
+ }
+ else if (fut2 != null)
+ fut2.get(getTestTimeout());
+ }
+
+ for (int i = 0; i < G.allGrids().size() + stoppedNodes0.size(); ++i) {
+ if (stoppedNodes0.contains(i))
+ continue;
+
+ int i0 = i;
+
+ waitForCondition(() -> {
+ MetricRegistry mreg =
grid(i0).context().metric().registry(metricName(SNAPSHOT_CHECK_METRIC,
SNAPSHOT_NAME));
+
+ return !mreg.iterator().hasNext();
+ }, getTestTimeout());
+ }
+ }
+
/** */
@Test
public void testCheckFromLesserTopology() throws Exception {
@@ -1074,7 +1395,8 @@ public class IgniteClusterSnapshotCheckTest extends
AbstractSnapshotSelfTest {
}
try (IgniteDataStreamer<Integer, Integer> ds =
grid(0).dataStreamer(DEFAULT_CACHE_NAME)) {
- for (int i = 0; i < 100; ++i)
+ // Ensure all the partitions are created: several records per
partition.
+ for (int i = 0; i < CACHE_PARTS_CNT * 4; ++i)
ds.addData(i, i);
}