This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 39b5b461e3c IGNITE-17369 Added a snapshot exception if a data streamer
is loading data with no consistency guarantee. (#10286)
39b5b461e3c is described below
commit 39b5b461e3cf1a6f6f21afe455016e6406335baf
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Nov 7 12:01:23 2022 +0300
IGNITE-17369 Added a snapshot exception if a data streamer is loading data
with no consistency guarantee. (#10286)
---
.../apache/ignite/util/GridCommandHandlerTest.java | 67 ++++++
.../snapshot/DataStreamerUpdatesHandler.java | 59 ++++++
.../snapshot/IgniteSnapshotManager.java | 55 ++++-
.../persistence/snapshot/SnapshotHandler.java | 4 +-
.../snapshot/SnapshotHandlerContext.java | 16 +-
.../snapshot/SnapshotHandlerRestoreTask.java | 4 +-
.../snapshot/SnapshotHandlerWarningException.java | 34 +++
.../persistence/snapshot/SnapshotMetadata.java | 3 +-
.../snapshot/SnapshotOperationRequest.java | 40 ++++
.../snapshot/SnapshotPartitionsVerifyTask.java | 2 +-
.../processors/datastreamer/DataStreamerImpl.java | 12 ++
.../IgniteClusterSnapshotStreamerTest.java | 230 +++++++++++++++++++++
.../ignite/testsuites/IgniteSnapshotTestSuite.java | 4 +-
13 files changed, 515 insertions(+), 15 deletions(-)
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index fe1be09f520..358f143c69c 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -95,7 +95,13 @@ import
org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGro
import
org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandler;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandler;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerContext;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerWarningException;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
@@ -124,6 +130,9 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.metric.LongMetric;
@@ -3061,6 +3070,64 @@ public class GridCommandHandlerTest extends
GridCommandHandlerClusterPerMethodAb
doClusterSnapshotCreate(true);
}
+ /**
+ * Test that 'not OK' status of snapshot operation is set if the operation
produces a warning.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testClusterCreateSnapshotWarning() throws Exception {
+ IgniteConfiguration cfg =
getConfiguration(getTestIgniteInstanceName(0));
+ cfg.getConnectorConfiguration().setHost("localhost");
+
+ cfg.setPluginProviders(new AbstractTestPluginProvider() {
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx,
ExtensionRegistry registry) {
+ super.initExtensions(ctx, registry);
+
+ // Simulates warning occurs at snapshot creation.
+ registry.registerExtension(SnapshotHandler.class, new
SnapshotHandler<Void>() {
+ /** {@inheritDoc} */
+ @Override public SnapshotHandlerType type() {
+ return SnapshotHandlerType.CREATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void complete(String name,
+ Collection<SnapshotHandlerResult<Void>> results)
throws Exception {
+ throw new
SnapshotHandlerWarningException(DataStreamerUpdatesHandler.WRN_MSG);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Void
invoke(SnapshotHandlerContext ctx) {
+ return null;
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "SnapshotWarningSimulationPlugin";
+ }
+ });
+
+ IgniteEx ig = startGrid(cfg);
+ ig.cluster().state(ACTIVE);
+ createCacheAndPreload(ig, 100);
+
+ injectTestSystemOut();
+
+ CommandHandler hnd = new CommandHandler();
+
+ List<String> args = new ArrayList<>(F.asList("--snapshot", "create",
"testDsSnp", "--sync"));
+
+ int code = execute(hnd, args);
+
+ assertEquals(EXIT_CODE_UNEXPECTED_ERROR, code);
+
+ assertContains(log, testOut.toString(),
DataStreamerUpdatesHandler.WRN_MSG);
+ }
+
/**
* @param syncMode Execute operation synchrnously.
* @throws Exception If failed.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
new file mode 100644
index 00000000000..a99ce2ce820
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType.CREATE;
+
+/**
+ * A snapshot haldler that monitors and warns of inconsistent by nature
updates from DataStreamer which can issue
+ * data inconsistency in snapshot.
+ */
+public class DataStreamerUpdatesHandler implements SnapshotHandler<Boolean> {
+ /** */
+ public static final String WRN_MSG = "DataStreamer with property
'allowOverwrite' set to `false` was working " +
+ "during the snapshot creation. Such streaming updates are inconsistent
by nature and should be successfully " +
+ "finished before data usage. Snapshot might not be entirely restored.
However, you would be able to restore " +
+ "the caches which were not streamed into.";
+
+ /** {@inheritDoc} */
+ @Override public SnapshotHandlerType type() {
+ return CREATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean invoke(SnapshotHandlerContext ctx) {
+ return ctx.streamerWarning();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void complete(String name,
Collection<SnapshotHandlerResult<Boolean>> results)
+ throws SnapshotHandlerWarningException {
+ Collection<UUID> nodes = F.viewReadOnly(results, r -> r.node().id(),
SnapshotHandlerResult::data);
+
+ if (!nodes.isEmpty()) {
+ throw new SnapshotHandlerWarningException(WRN_MSG + " Updates from
DataStreamer detected on the nodes: " +
+
nodes.stream().map(UUID::toString).collect(Collectors.joining(", ")));
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e19e5c38a75..a08d8d372c4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -848,7 +848,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
log.info("Snapshot metafile has been created: " +
smf.getAbsolutePath());
- SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta,
req.groups(), cctx.localNode(), snpDir);
+ SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta,
req.groups(), cctx.localNode(),
+ snpDir, req.streamerWarning());
return new
SnapshotOperationResponse(handlers.invokeAll(SnapshotHandlerType.CREATE, ctx));
}
@@ -951,7 +952,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
handlers().execSvc.submit(() -> {
try {
- handlers.completeAll(SnapshotHandlerType.CREATE,
req.snapshotName(), clusterHndResults, req.nodes());
+ handlers.completeAll(SnapshotHandlerType.CREATE,
req.snapshotName(), clusterHndResults, req.nodes(),
+ req::warnings);
resultFut.onDone();
}
@@ -1015,7 +1017,15 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
synchronized (snpOpMux) {
if (clusterSnpFut != null) {
if (endFail.isEmpty() && snpReq.error() == null) {
- clusterSnpFut.onDone();
+ if (!F.isEmpty(snpReq.warnings())) {
+ IgniteException wrn = new IgniteException("Snapshot
task '" + snpReq.snapshotName() +
+ "' completed with the warnings:" + U.nl() + '\t' +
String.join(U.nl() + '\t',
+ snpReq.warnings()));
+
+ clusterSnpFut.onDone(wrn);
+ }
+ else
+ clusterSnpFut.onDone();
if (log.isInfoEnabled())
log.info(SNAPSHOT_FINISHED_MSG + snpReq);
@@ -1045,6 +1055,16 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
}
+ /**
+ * Sets the streamer warning flag to current snapshot process if it is
active.
+ */
+ public void streamerWarning() {
+ SnapshotOperationRequest snpTask = currentCreateRequest();
+
+ if (snpTask != null)
+ snpTask.streamerWarning(true);
+ }
+
/** @return Current create snapshot request. {@code Null} if there is no
create snapshot operation in progress. */
@Nullable public SnapshotOperationRequest currentCreateRequest() {
return clusterSnpReq;
@@ -2207,8 +2227,10 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
this.execSvc = execSvc;
// Register system default snapshot integrity check that is used
before the restore operation.
- SnapshotHandler<?> sysCheck = new
SnapshotPartitionsVerifyHandler(ctx.cache().context());
- handlers.put(sysCheck.type(), new
ArrayList<>(F.asList((SnapshotHandler<Object>)sysCheck)));
+ registerHandler(new
SnapshotPartitionsVerifyHandler(ctx.cache().context()));
+
+ // Register system default DataStreamer updates check.
+ registerHandler(new DataStreamerUpdatesHandler());
// Register custom handlers.
SnapshotHandler<Object>[] extHnds =
(SnapshotHandler<Object>[])ctx.plugins().extensions(SnapshotHandler.class);
@@ -2217,7 +2239,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return;
for (SnapshotHandler<Object> extHnd : extHnds)
- handlers.computeIfAbsent(extHnd.type(), v -> new
ArrayList<>()).add(extHnd);
+ registerHandler(extHnd);
}
/**
@@ -2253,6 +2275,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @param snpName Snapshot name.
* @param res Results from all nodes and handlers with the specified
type.
* @param reqNodes Node IDs on which the handlers were executed.
+ * @param wrnsHnd A handler of snapshot operation warnings.
* @throws Exception If failed.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -2260,7 +2283,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
SnapshotHandlerType type,
String snpName,
Map<String, List<SnapshotHandlerResult<?>>> res,
- Collection<UUID> reqNodes
+ Collection<UUID> reqNodes,
+ Consumer<List<String>> wrnsHnd
) throws Exception {
if (res.isEmpty())
return;
@@ -2274,6 +2298,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
", rmtHnds=" + res.keySet() + "].");
}
+ List<String> wrns = new ArrayList<>();
+
for (SnapshotHandler hnd : hnds) {
List<SnapshotHandlerResult<?>> nodesRes =
res.get(hnd.getClass().getName());
@@ -2288,8 +2314,16 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
"The current operation will be aborted [missing=" +
missing + "].");
}
- hnd.complete(snpName, nodesRes);
+ try {
+ hnd.complete(snpName, nodesRes);
+ }
+ catch (SnapshotHandlerWarningException e) {
+ wrns.add(e.getMessage());
+ }
}
+
+ if (!F.isEmpty(wrns))
+ wrnsHnd.accept(wrns);
}
/**
@@ -2308,6 +2342,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return new SnapshotHandlerResult<>(null, e, ctx.localNode());
}
}
+
+ /** */
+ private void registerHandler(SnapshotHandler hnd) {
+ handlers.computeIfAbsent(hnd.type(), v -> new
ArrayList<>()).add(hnd);
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
index 6357c7d43e8..93b767b018d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
@@ -61,10 +61,12 @@ public interface SnapshotHandler<T> extends Extension {
*
* @param name Snapshot name.
* @param results Results from all nodes.
+ * @throws SnapshotHandlerWarningException If a warning of snapshot
operation occured.
* @throws Exception If the snapshot operation needs to be aborted.
* @see SnapshotHandlerResult
*/
- public default void complete(String name,
Collection<SnapshotHandlerResult<T>> results) throws Exception {
+ public default void complete(String name,
Collection<SnapshotHandlerResult<T>> results)
+ throws SnapshotHandlerWarningException, Exception {
for (SnapshotHandlerResult<T> res : results) {
if (res.error() == null)
continue;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
index ffe18ec3c4e..37cdb529b97 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
@@ -38,17 +38,24 @@ public class SnapshotHandlerContext {
/** Local node. */
private final ClusterNode locNode;
+ /** Warning flag of concurrent inconsistent-by-nature streamer updates. */
+ private final boolean streamerWrn;
+
/**
* @param metadata Snapshot metadata.
* @param grps The names of the cache groups on which the operation is
performed.
+ * {@code False} otherwise. Always {@code false} for snapshot restoration.
* @param locNode Local node.
* @param snpDir The full path to the snapshot files.
+ * @param streamerWrn {@code True} if concurrent streaming updates occured
during snapshot operation.
*/
- public SnapshotHandlerContext(SnapshotMetadata metadata, @Nullable
Collection<String> grps, ClusterNode locNode, File snpDir) {
+ public SnapshotHandlerContext(SnapshotMetadata metadata, @Nullable
Collection<String> grps, ClusterNode locNode,
+ File snpDir, boolean streamerWrn) {
this.metadata = metadata;
this.grps = grps;
this.locNode = locNode;
this.snpDir = snpDir;
+ this.streamerWrn = streamerWrn;
}
/**
@@ -79,4 +86,11 @@ public class SnapshotHandlerContext {
public ClusterNode localNode() {
return locNode;
}
+
+ /**
+ * @return {@code True} if concurrent streaming updates occured during
snapshot operation. {@code False} otherwise.
+ */
+ public boolean streamerWarning() {
+ return streamerWrn;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
index 947085ab87f..afc72f98f46 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
@@ -85,7 +85,7 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
try {
ignite.context().cache().context().snapshotMgr().handlers().completeAll(
- SnapshotHandlerType.RESTORE, snapshotName, clusterResults,
execNodes);
+ SnapshotHandlerType.RESTORE, snapshotName, clusterResults,
execNodes, wrns -> {});
}
catch (Exception e) {
log.warning("The snapshot operation will be aborted due to a
handler error [snapshot=" + snapshotName + "].", e);
@@ -142,7 +142,7 @@ public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask
SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir,
consistentId);
return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
- new SnapshotHandlerContext(meta, grps, ignite.localNode(),
snpDir));
+ new SnapshotHandlerContext(meta, grps, ignite.localNode(),
snpDir, false));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
new file mode 100644
index 00000000000..dec861c2071
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Snapshot operation warning. Warnings do not interrupt snapshot process but
raise exception at the end to make the
+ * operation status 'not OK' if no other error occured.
+ */
+public class SnapshotHandlerWarningException extends IgniteCheckedException {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public SnapshotHandlerWarningException(String wrnMsg) {
+ super(wrnMsg);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index 009c0b8849b..65bd3131f8e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -253,7 +253,8 @@ public class SnapshotMetadata implements Serializable {
snpName.equals(meta.snpName) &&
consId.equals(meta.consId) &&
Objects.equals(grpIds, meta.grpIds) &&
- Objects.equals(bltNodes, meta.bltNodes);
+ Objects.equals(bltNodes, meta.bltNodes) &&
+ Arrays.equals(masterKeyDigest, meta.masterKeyDigest);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index 14c72f661ea..8adcea9e5f9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.Serializable;
import java.util.Collection;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
@@ -58,6 +59,17 @@ public class SnapshotOperationRequest implements
Serializable {
/** Exception occurred during snapshot operation processing. */
private volatile Throwable err;
+ /**
+ * Snapshot operation warnings. Warnings do not interrupt snapshot process
but raise exception at the end to make
+ * the operation status 'not OK' if no other error occured.
+ */
+ private volatile List<String> warnings;
+
+ /**
+ * Warning flag of concurrent inconsistent-by-nature streamer updates.
+ */
+ private transient volatile boolean streamerWrn;
+
/** Flag indicating that the {@link DistributedProcessType#START_SNAPSHOT}
phase has completed. */
private transient volatile boolean startStageEnded;
@@ -164,6 +176,34 @@ public class SnapshotOperationRequest implements
Serializable {
this.startStageEnded = startStageEnded;
}
+ /**
+ * @return Warnings of snapshot operation.
+ */
+ public List<String> warnings() {
+ return warnings;
+ }
+
+ /**
+ * @param warnings Warnings of snapshot operation.
+ */
+ public void warnings(List<String> warnings) {
+ this.warnings = warnings;
+ }
+
+ /**
+ * {@code True} If the streamer warning flag is set. {@code False}
otherwise.
+ */
+ public boolean streamerWarning() {
+ return streamerWrn;
+ }
+
+ /**
+ * Sets the streamer warning flag.
+ */
+ public boolean streamerWarning(boolean val) {
+ return streamerWrn = val;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SnapshotOperationRequest.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 05f5856b903..d083e9afd1d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -117,7 +117,7 @@ public class SnapshotPartitionsVerifyTask extends
AbstractSnapshotVerificationTa
SnapshotMetadata meta =
cctx.snapshotMgr().readSnapshotMetadata(snpDir, consId);
return new SnapshotPartitionsVerifyHandler(cctx)
- .invoke(new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), snpDir));
+ .invoke(new SnapshotHandlerContext(meta, rqGrps,
ignite.localNode(), snpDir, false));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 1dca24a1647..a20fb307094 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -2255,6 +2255,8 @@ public class DataStreamerImpl<K, V> implements
IgniteDataStreamer<K, V>, Delayed
Collection<Integer> ignoredParts = new HashSet<>();
try {
+ snapshotWarning(cctx);
+
for (Entry<KeyCacheObject, CacheObject> e : entries) {
cctx.shared().database().checkpointReadLock();
@@ -2364,6 +2366,16 @@ public class DataStreamerImpl<K, V> implements
IgniteDataStreamer<K, V>, Delayed
}
}
}
+
+ /**
+ * Sets the streamer warning flag to current snapshot process if it is
active.
+ *
+ * @param cctx Cache context.
+ */
+ private static void snapshotWarning(GridCacheContext<?, ?> cctx) {
+ if (cctx.group().persistenceEnabled())
+
cctx.kernalContext().cache().context().snapshotMgr().streamerWarning();
+ }
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
new file mode 100644
index 00000000000..c784f3b6f2f
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Tests snapshot is consistent or snapshot process produces proper warning.
+ */
+public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest {
+ /** */
+ private static final String INMEM_DATA_REGION = "inMemDr";
+
+ /** */
+ private IgniteSnapshotManager snpMgr;
+
+ /** */
+ private IgniteEx client;
+
+ /** {@inheritDoc} */
+ @Override public void beforeTestSnapshot() throws Exception {
+ super.beforeTestSnapshot();
+
+ persistence = true;
+
+ dfltCacheCfg.setBackups(2);
+
+ startGrids(3);
+
+ grid(0).cluster().state(ACTIVE);
+
+ client = startClientGrid(G.allGrids().size());
+
+ snpMgr = snp(grid(0));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ // In-memory data region.
+ DataRegionConfiguration inMemDr = new DataRegionConfiguration();
+ inMemDr.setPersistenceEnabled(false);
+ inMemDr.setMaxSize(100L * 1024L * 1024L);
+ inMemDr.setInitialSize(inMemDr.getMaxSize());
+ inMemDr.setName(INMEM_DATA_REGION);
+ inMemDr.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
+ cfg.getDataStorageConfiguration().setDataRegionConfigurations(inMemDr);
+
+ return cfg;
+ }
+
+ /**
+ * Tests snapshot consistency wnen streamer starts before snapshot.
Default receiver.
+ */
+ @Test
+ public void testStreamerWhileSnapshotDefault() throws Exception {
+ doTestDataStreamerWhileSnapshot( false);
+ }
+
+ /**
+ * Tests snapshot consistency wnen streamer starts before snapshot.
Overwriting receiver.
+ */
+ @Test
+ public void testStreamerWhileSnapshotOverwriting() throws Exception {
+ doTestDataStreamerWhileSnapshot( true);
+ }
+
+ /**
+ * Tests not affected by streamer cache is restorable from snapshot.
+ */
+ @Test
+ public void testOtherCacheRestores() throws Exception {
+ String cname = "cache2";
+
+ grid(0).createCache(new
CacheConfiguration<>(dfltCacheCfg).setName(cname));
+
+ try (IgniteDataStreamer<Integer, Integer> ds =
grid(0).dataStreamer(cname)) {
+ for (int i = 0; i < 100; ++i)
+ ds.addData(i, i);
+ }
+
+ AtomicBoolean stopLoad = new AtomicBoolean();
+
+ IgniteInternalFuture<?> loadFut = runLoad(grid(0), false, stopLoad);
+
+ try {
+ assertThrows(null, () ->
snpMgr.createSnapshot(SNAPSHOT_NAME).get(), IgniteException.class,
+ DataStreamerUpdatesHandler.WRN_MSG);
+ }
+ finally {
+ stopLoad.set(true);
+ loadFut.get();
+ }
+
+ grid(0).destroyCache(cname);
+ grid(0).destroyCache(dfltCacheCfg.getName());
+
+ snpMgr.restoreSnapshot(SNAPSHOT_NAME,
Collections.singletonList(cname)).get();
+
+ for (int i = 0; i < 100; ++i)
+ assertEquals(i, grid(0).cache(cname).get(i));
+ }
+
+ /**
+ * Tests streaming into in-memory cache doesn't affect snapshot.
+ */
+ @Test
+ public void testStreamingIntoInMememoryDoesntAffectSnapshot() throws
Exception {
+ String cache2Name = "cache2";
+ int loadCnt = 1000;
+
+ grid(0).createCache(new
CacheConfiguration<>(dfltCacheCfg).setName(cache2Name));
+
+ try (IgniteDataStreamer<Object, Object> ds =
grid(0).dataStreamer(cache2Name)) {
+ for (int i = 0; i < loadCnt; ++i)
+ ds.addData(i, i);
+ }
+
+ grid(0).destroyCache(dfltCacheCfg.getName());
+ dfltCacheCfg.setDataRegionName(INMEM_DATA_REGION);
+ dfltCacheCfg.setEncryptionEnabled(false);
+ grid(0).createCache(dfltCacheCfg);
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> loadFut = runLoad(grid(2), false, stop);
+
+ try {
+ snpMgr.createSnapshot(SNAPSHOT_NAME).get();
+ }
+ finally {
+ stop.set(true);
+ loadFut.get();
+ }
+
+ grid(0).destroyCache(cache2Name);
+
+ snpMgr.restoreSnapshot(SNAPSHOT_NAME, null).get();
+
+ for (int i = 0; i < loadCnt; ++i)
+ assertEquals(i, grid(0).cache(cache2Name).get(i));
+ }
+
+ /**
+ * Tests snapshot process throws warning if required.
+ *
+ * @param allowOverwrite 'allowOverwrite' setting.
+ */
+ private void doTestDataStreamerWhileSnapshot(boolean allowOverwrite)
throws Exception {
+ AtomicBoolean stopLoading = new AtomicBoolean(false);
+
+ IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite,
stopLoading);
+
+ try {
+ if (allowOverwrite)
+ grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+ else {
+ assertThrows(null, () ->
snpMgr.createSnapshot(SNAPSHOT_NAME).get(),
+ IgniteException.class, DataStreamerUpdatesHandler.WRN_MSG);
+ }
+ }
+ finally {
+ stopLoading.set(true);
+ loadFut.get();
+ }
+ }
+
+ /**
+ * Runs DataStreamer asynchronously. Waits while streamer pre-loads some
amount of data.
+ *
+ * @param ldr Loader node.
+ * @param allowOverwrite 'allowOverwrite' setting.
+ * @param stop Stop load flag.
+ */
+ private IgniteInternalFuture<?> runLoad(Ignite ldr, boolean
allowOverwrite, AtomicBoolean stop)
+ throws InterruptedException {
+ CountDownLatch preload = new CountDownLatch(10_000);
+
+ IgniteInternalFuture<?> res = GridTestUtils.runAsync(() -> {
+ try (IgniteDataStreamer<Integer, Object> ds =
ldr.dataStreamer(dfltCacheCfg.getName())) {
+ ds.allowOverwrite(allowOverwrite);
+
+ int idx = 0;
+
+ while (!stop.get()) {
+ ds.addData(++idx, idx);
+
+ preload.countDown();
+ }
+ }
+ }, "load-thread");
+
+ preload.await();
+
+ return res;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index 7618ff25521..fccc3574389 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -22,6 +22,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCl
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotHandlerTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreSelfTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotStreamerTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWalRecordTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
@@ -46,7 +47,8 @@ import org.junit.runners.Suite;
IgniteSnapshotRestoreFromRemoteTest.class,
PlainSnapshotTest.class,
EncryptedSnapshotTest.class,
- IgniteClusterSnapshotWalRecordTest.class
+ IgniteClusterSnapshotWalRecordTest.class,
+ IgniteClusterSnapshotStreamerTest.class
})
public class IgniteSnapshotTestSuite {
}