This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 159526f9d1b IGNITE-19564 Fixed snapshot restore metrics in case of an
error on prepare. (#10735)
159526f9d1b is described below
commit 159526f9d1bfd5851595866c0b73945f2a1ddf82
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed May 31 13:47:33 2023 +0300
IGNITE-19564 Fixed snapshot restore metrics in case of an error on prepare.
(#10735)
---
.../snapshot/SnapshotRestoreProcess.java | 107 +++++++++++----------
.../snapshot/IgniteClusterSnapshotMetricsTest.java | 39 ++++++++
2 files changed, 93 insertions(+), 53 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index d34e4cedceb..08f2c127c48 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -514,13 +514,22 @@ public class SnapshotRestoreProcess {
log.info(OP_FINISHED_MSG + " [reqId=" + reqId + "].");
SnapshotRestoreContext opCtx0 = opCtx;
+ SnapshotRestoreContext lastOpCtx0 = lastOpCtx;
+
+ long endTime = U.currentTimeMillis();
if (opCtx0 != null && reqId.equals(opCtx0.reqId)) {
opCtx = null;
- opCtx0.endTime = U.currentTimeMillis();
+ opCtx0.endTime = endTime;
}
+ // Actual operation context may not be assigned because some previous
checks failed. The last operation context
+ // is assigned before the preparation to keep all operation statuses
including failures of the preceding checks.
+ // @See #prepare(SnapshotOperationRequest).
+ if (lastOpCtx0 != opCtx0 && reqId.equals(lastOpCtx0.reqId))
+ lastOpCtx0.endTime = endTime;
+
synchronized (this) {
ClusterSnapshotFuture fut0 = fut;
@@ -633,7 +642,23 @@ public class SnapshotRestoreProcess {
if (ctx.clientNode())
return new GridFinishedFuture<>();
+ if (log.isInfoEnabled()) {
+ log.info("Starting local snapshot prepare restore operation" +
+ " [reqId=" + req.requestId() +
+ ", snapshot=" + req.snapshotName() +
+ ", caches=" + req.groups() + ']');
+ }
+
+ SnapshotRestoreContext opCtx0 = new SnapshotRestoreContext(req);
+
try {
+ if (opCtx != null) {
+ throw new IgniteCheckedException(OP_REJECT_MSG +
+ "The previous snapshot restore operation was not
completed.");
+ }
+
+ lastOpCtx = opCtx0;
+
DiscoveryDataClusterState state = ctx.state().clusterState();
IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
@@ -662,25 +687,9 @@ public class SnapshotRestoreProcess {
}
}
- if (log.isInfoEnabled()) {
- log.info("Starting local snapshot prepare restore operation" +
- " [reqId=" + req.requestId() +
- ", snapshot=" + req.snapshotName() +
- ", caches=" + req.groups() + ']');
- }
-
List<SnapshotMetadata> locMetas =
snpMgr.readSnapshotMetadatas(req.snapshotName(), req.snapshotPath());
- SnapshotRestoreContext opCtx0 = prepareContext(req, locMetas);
-
- synchronized (this) {
- lastOpCtx = opCtx = opCtx0;
-
- ClusterSnapshotFuture fut0 = fut;
-
- if (fut0 != null)
- opCtx0.errHnd.accept(fut0.interruptEx);
- }
+ enrichContext(opCtx0, req, locMetas);
// Ensure that shared cache groups has no conflicts.
for (StoredCacheData cfg : opCtx0.cfgs.values()) {
@@ -693,9 +702,20 @@ public class SnapshotRestoreProcess {
if (ctx.isStopping())
throw new NodeStoppingException("The node is stopping: " +
ctx.localNodeId());
+ synchronized (this) {
+ ClusterSnapshotFuture fut0 = fut;
+
+ if (fut0 != null)
+ opCtx0.errHnd.accept(fut0.interruptEx);
+
+ opCtx = opCtx0;
+ }
+
return new GridFinishedFuture<>(new
SnapshotRestoreOperationResponse(opCtx0.cfgs.values(), locMetas));
}
- catch (IgniteIllegalStateException | IgniteCheckedException |
RejectedExecutionException e) {
+ catch (Exception e) {
+ opCtx0.errHnd.accept(e);
+
log.error("Unable to restore cache group(s) from the snapshot " +
"[reqId=" + req.requestId() + ", snapshot=" +
req.snapshotName() + ']', e);
@@ -724,31 +744,22 @@ public class SnapshotRestoreProcess {
}
/**
+ * @param curOpCtx Restore operation context to enrich.
* @param req Request to prepare cache group restore from the snapshot.
* @param metas Local snapshot metadatas.
- * @return Snapshot restore operation context.
* @throws IgniteCheckedException If failed.
*/
- private SnapshotRestoreContext prepareContext(
+ private void enrichContext(
+ SnapshotRestoreContext curOpCtx,
SnapshotOperationRequest req,
Collection<SnapshotMetadata> metas
) throws IgniteCheckedException {
- if (opCtx != null) {
- throw new IgniteCheckedException(OP_REJECT_MSG +
- "The previous snapshot restore operation was not completed.");
- }
- GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
-
- // Collection of baseline nodes that must survive and additional
discovery data required for the affinity calculation.
- DiscoCache discoCache = ctx.discovery().discoCache();
+ assert req.requestId().equals(curOpCtx.reqId);
- if (!F.transform(discoCache.aliveBaselineNodes(),
F.node2id()).containsAll(req.nodes()))
- throw new IgniteCheckedException("Restore context cannot be inited
since the required baseline nodes missed: " + discoCache);
-
- DiscoCache discoCache0 = discoCache.copy(discoCache.version(), null);
+ GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
if (F.isEmpty(metas))
- return new SnapshotRestoreContext(req, discoCache0,
Collections.emptyMap());
+ return;
if (F.first(metas).pageSize() != cctx.database().pageSize()) {
throw new IgniteCheckedException("Incompatible memory page size " +
@@ -826,10 +837,8 @@ public class SnapshotRestoreProcess {
}
}
- Map<Integer, StoredCacheData> cfgsById =
+ curOpCtx.cfgs =
cfgsByName.values().stream().collect(Collectors.toMap(v ->
CU.cacheId(v.config().getName()), v -> v));
-
- return new SnapshotRestoreContext(req, discoCache0, cfgsById);
}
/**
@@ -1005,7 +1014,7 @@ public class SnapshotRestoreProcess {
for (StoredCacheData data : opCtx0.cfgs.values()) {
affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
- grp -> calculateAffinity(ctx, data.config(),
opCtx0.discoCache));
+ grp -> calculateAffinity(ctx, data.config(),
ctx.discovery().discoCache()));
}
Map<Integer, Set<PartitionRestoreFuture>> allParts = new
HashMap<>();
@@ -1862,8 +1871,8 @@ public class SnapshotRestoreProcess {
/** Snapshot directory path. */
private final String snpPath;
- /** Baseline discovery cache for node IDs that must be alive to
complete the operation.*/
- private final DiscoCache discoCache;
+ /** IDs of the required nodes. */
+ private final Set<UUID> nodes;
/** Operational node id. */
private final UUID opNodeId;
@@ -1892,7 +1901,7 @@ public class SnapshotRestoreProcess {
/** Compressed groups. */
private final Set<Integer> comprGrps = new HashSet<>();
- /** Cache ID to configuration mapping. */
+ /** Cache ID to configuration mapping. Empty if the context is not
initialized yet. */
private volatile Map<Integer, StoredCacheData> cfgs =
Collections.emptyMap();
/** Graceful shutdown future. */
@@ -1925,37 +1934,29 @@ public class SnapshotRestoreProcess {
snpName = "";
startTime = 0;
opNodeId = null;
- discoCache = null;
+ nodes = null;
snpPath = null;
incIdx = 0;
}
/**
* @param req Request to prepare cache group restore from the snapshot.
- * @param discoCache Baseline discovery cache for node IDs that must
be alive to complete the operation.
- * @param cfgs Cache ID to configuration mapping.
*/
- protected SnapshotRestoreContext(
- SnapshotOperationRequest req,
- DiscoCache discoCache,
- Map<Integer, StoredCacheData> cfgs
- ) {
+ protected SnapshotRestoreContext(SnapshotOperationRequest req) {
reqId = req.requestId();
snpName = req.snapshotName();
snpPath = req.snapshotPath();
opNodeId = req.operationalNodeId();
incIdx = req.incrementIndex();
startTime = U.currentTimeMillis();
-
- this.discoCache = discoCache;
- this.cfgs = cfgs;
+ nodes = req.nodes();
}
/**
* @return Required baseline nodeIds that must be alive to complete
restore operation.
*/
public Collection<UUID> nodes() {
- return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+ return nodes;
}
/** */
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
index 4ba805c6fd0..4041b5a3dc7 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
@@ -232,6 +232,45 @@ public class IgniteClusterSnapshotMetricsTest extends
IgniteClusterSnapshotResto
assertTrue(failFlag.get());
}
+ /** @throws Exception If fails. */
+ @Test
+ public void testUnableToRestoreSnapshotError() throws Exception {
+ Ignite ignite = startGridsWithCache(DEDICATED_CNT, CACHE_KEYS_RANGE,
key -> key, dfltCacheCfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ checkMetricsDefaults();
+
+ try {
+ ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get();
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+
+ String errMsg = "Unable to restore cache group - directory is not
empty. Cache group should be destroyed " +
+ "manually before perform restore operation";
+
+ for (Ignite ig : G.allGrids()) {
+ DynamicMBean mReg = metricRegistry(ig.name(), null,
SNAPSHOT_RESTORE_METRICS);
+
+ assertTrue("Wrong 'endTime' metric on " + ig.name(),
+ GridTestUtils.waitForCondition(() -> getNumMetric("endTime",
mReg) > 0, TIMEOUT));
+
+ assertEquals("Wrong 'totalPartitions' metric on" + ig.name(), -1,
+ getNumMetric("totalPartitions", mReg));
+
+ assertEquals("Wrong 'processedPartitions' metric on " + ig.name(),
0,
+ getNumMetric("processedPartitions", mReg));
+
+ assertEquals("Wrong 'snapshotName' metric on " + ig.name(),
SNAPSHOT_NAME,
+ mReg.getAttribute("snapshotName"));
+
+ assertTrue("Wrong 'error' metric on " + ig.name(),
+ mReg.getAttribute("error").toString().contains(errMsg));
+ }
+ }
+
/** @throws Exception If fails. */
@Test
public void testCreateSnapshotProgress() throws Exception {