This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 39b5b461e3c IGNITE-17369 Added a snapshot exception if a data streamer 
is loading data with no consistency guarantee. (#10286)
39b5b461e3c is described below

commit 39b5b461e3cf1a6f6f21afe455016e6406335baf
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Nov 7 12:01:23 2022 +0300

    IGNITE-17369 Added a snapshot exception if a data streamer is loading data 
with no consistency guarantee. (#10286)
---
 .../apache/ignite/util/GridCommandHandlerTest.java |  67 ++++++
 .../snapshot/DataStreamerUpdatesHandler.java       |  59 ++++++
 .../snapshot/IgniteSnapshotManager.java            |  55 ++++-
 .../persistence/snapshot/SnapshotHandler.java      |   4 +-
 .../snapshot/SnapshotHandlerContext.java           |  16 +-
 .../snapshot/SnapshotHandlerRestoreTask.java       |   4 +-
 .../snapshot/SnapshotHandlerWarningException.java  |  34 +++
 .../persistence/snapshot/SnapshotMetadata.java     |   3 +-
 .../snapshot/SnapshotOperationRequest.java         |  40 ++++
 .../snapshot/SnapshotPartitionsVerifyTask.java     |   2 +-
 .../processors/datastreamer/DataStreamerImpl.java  |  12 ++
 .../IgniteClusterSnapshotStreamerTest.java         | 230 +++++++++++++++++++++
 .../ignite/testsuites/IgniteSnapshotTestSuite.java |   4 +-
 13 files changed, 515 insertions(+), 15 deletions(-)

diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index fe1be09f520..358f143c69c 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -95,7 +95,13 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGro
 import 
org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandler;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandler;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerWarningException;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import 
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
@@ -124,6 +130,9 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.metric.LongMetric;
@@ -3061,6 +3070,64 @@ public class GridCommandHandlerTest extends 
GridCommandHandlerClusterPerMethodAb
         doClusterSnapshotCreate(true);
     }
 
+    /**
+     * Test that 'not OK' status of snapshot operation is set if the operation 
produces a warning.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClusterCreateSnapshotWarning() throws Exception {
+        IgniteConfiguration cfg = 
getConfiguration(getTestIgniteInstanceName(0));
+        cfg.getConnectorConfiguration().setHost("localhost");
+
+        cfg.setPluginProviders(new AbstractTestPluginProvider() {
+            /** {@inheritDoc} */
+            @Override public void initExtensions(PluginContext ctx, 
ExtensionRegistry registry) {
+                super.initExtensions(ctx, registry);
+
+                // Simulates a warning occurring at snapshot creation.
+                registry.registerExtension(SnapshotHandler.class, new 
SnapshotHandler<Void>() {
+                    /** {@inheritDoc} */
+                    @Override public SnapshotHandlerType type() {
+                        return SnapshotHandlerType.CREATE;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void complete(String name,
+                        Collection<SnapshotHandlerResult<Void>> results) 
throws Exception {
+                        throw new 
SnapshotHandlerWarningException(DataStreamerUpdatesHandler.WRN_MSG);
+                    }
+
+                    /** {@inheritDoc} */
+                    @Nullable @Override public Void 
invoke(SnapshotHandlerContext ctx) {
+                        return null;
+                    }
+                });
+            }
+
+            /** {@inheritDoc} */
+            @Override public String name() {
+                return "SnapshotWarningSimulationPlugin";
+            }
+        });
+
+        IgniteEx ig = startGrid(cfg);
+        ig.cluster().state(ACTIVE);
+        createCacheAndPreload(ig, 100);
+
+        injectTestSystemOut();
+
+        CommandHandler hnd = new CommandHandler();
+
+        List<String> args = new ArrayList<>(F.asList("--snapshot", "create", 
"testDsSnp", "--sync"));
+
+        int code = execute(hnd, args);
+
+        assertEquals(EXIT_CODE_UNEXPECTED_ERROR, code);
+
+        assertContains(log, testOut.toString(), 
DataStreamerUpdatesHandler.WRN_MSG);
+    }
+
     /**
      * @param syncMode Execute operation synchrnously.
      * @throws Exception If failed.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
new file mode 100644
index 00000000000..a99ce2ce820
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType.CREATE;
+
+/**
+ * A snapshot handler that monitors and warns of inconsistent by nature 
updates from DataStreamer which can issue
+ * data inconsistency in snapshot.
+ */
+public class DataStreamerUpdatesHandler implements SnapshotHandler<Boolean> {
+    /** */
+    public static final String WRN_MSG = "DataStreamer with property 
'allowOverwrite' set to `false` was working " +
+        "during the snapshot creation. Such streaming updates are inconsistent 
by nature and should be successfully " +
+        "finished before data usage. Snapshot might not be entirely restored. 
However, you would be able to restore " +
+        "the caches which were not streamed into.";
+
+    /** {@inheritDoc} */
+    @Override public SnapshotHandlerType type() {
+        return CREATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean invoke(SnapshotHandlerContext ctx) {
+        return ctx.streamerWarning();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void complete(String name, 
Collection<SnapshotHandlerResult<Boolean>> results)
+        throws SnapshotHandlerWarningException {
+        Collection<UUID> nodes = F.viewReadOnly(results, r -> r.node().id(), 
SnapshotHandlerResult::data);
+
+        if (!nodes.isEmpty()) {
+            throw new SnapshotHandlerWarningException(WRN_MSG + " Updates from 
DataStreamer detected on the nodes: " +
+                
nodes.stream().map(UUID::toString).collect(Collectors.joining(", ")));
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e19e5c38a75..a08d8d372c4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -848,7 +848,8 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
 
                 log.info("Snapshot metafile has been created: " + 
smf.getAbsolutePath());
 
-                SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, 
req.groups(), cctx.localNode(), snpDir);
+                SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, 
req.groups(), cctx.localNode(),
+                    snpDir, req.streamerWarning());
 
                 return new 
SnapshotOperationResponse(handlers.invokeAll(SnapshotHandlerType.CREATE, ctx));
             }
@@ -951,7 +952,8 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
 
             handlers().execSvc.submit(() -> {
                 try {
-                    handlers.completeAll(SnapshotHandlerType.CREATE, 
req.snapshotName(), clusterHndResults, req.nodes());
+                    handlers.completeAll(SnapshotHandlerType.CREATE, 
req.snapshotName(), clusterHndResults, req.nodes(),
+                        req::warnings);
 
                     resultFut.onDone();
                 }
@@ -1015,7 +1017,15 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         synchronized (snpOpMux) {
             if (clusterSnpFut != null) {
                 if (endFail.isEmpty() && snpReq.error() == null) {
-                    clusterSnpFut.onDone();
+                    if (!F.isEmpty(snpReq.warnings())) {
+                        IgniteException wrn = new IgniteException("Snapshot 
task '" + snpReq.snapshotName() +
+                            "' completed with the warnings:" + U.nl() + '\t' + 
String.join(U.nl() + '\t',
+                            snpReq.warnings()));
+
+                        clusterSnpFut.onDone(wrn);
+                    }
+                    else
+                        clusterSnpFut.onDone();
 
                     if (log.isInfoEnabled())
                         log.info(SNAPSHOT_FINISHED_MSG + snpReq);
@@ -1045,6 +1055,16 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         }
     }
 
+    /**
+     * Sets the streamer warning flag to current snapshot process if it is 
active.
+     */
+    public void streamerWarning() {
+        SnapshotOperationRequest snpTask = currentCreateRequest();
+
+        if (snpTask != null)
+            snpTask.streamerWarning(true);
+    }
+
     /** @return Current create snapshot request. {@code Null} if there is no 
create snapshot operation in progress. */
     @Nullable public SnapshotOperationRequest currentCreateRequest() {
         return clusterSnpReq;
@@ -2207,8 +2227,10 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
             this.execSvc = execSvc;
 
             // Register system default snapshot integrity check that is used 
before the restore operation.
-            SnapshotHandler<?> sysCheck = new 
SnapshotPartitionsVerifyHandler(ctx.cache().context());
-            handlers.put(sysCheck.type(), new 
ArrayList<>(F.asList((SnapshotHandler<Object>)sysCheck)));
+            registerHandler(new 
SnapshotPartitionsVerifyHandler(ctx.cache().context()));
+
+            // Register system default DataStreamer updates check.
+            registerHandler(new DataStreamerUpdatesHandler());
 
             // Register custom handlers.
             SnapshotHandler<Object>[] extHnds = 
(SnapshotHandler<Object>[])ctx.plugins().extensions(SnapshotHandler.class);
@@ -2217,7 +2239,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 return;
 
             for (SnapshotHandler<Object> extHnd : extHnds)
-                handlers.computeIfAbsent(extHnd.type(), v -> new 
ArrayList<>()).add(extHnd);
+                registerHandler(extHnd);
         }
 
         /**
@@ -2253,6 +2275,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
          * @param snpName Snapshot name.
          * @param res Results from all nodes and handlers with the specified 
type.
          * @param reqNodes Node IDs on which the handlers were executed.
+         * @param wrnsHnd A handler of snapshot operation warnings.
          * @throws Exception If failed.
          */
         @SuppressWarnings({"rawtypes", "unchecked"})
@@ -2260,7 +2283,8 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
             SnapshotHandlerType type,
             String snpName,
             Map<String, List<SnapshotHandlerResult<?>>> res,
-            Collection<UUID> reqNodes
+            Collection<UUID> reqNodes,
+            Consumer<List<String>> wrnsHnd
         ) throws Exception {
             if (res.isEmpty())
                 return;
@@ -2274,6 +2298,8 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                     ", rmtHnds=" + res.keySet() + "].");
             }
 
+            List<String> wrns = new ArrayList<>();
+
             for (SnapshotHandler hnd : hnds) {
                 List<SnapshotHandlerResult<?>> nodesRes = 
res.get(hnd.getClass().getName());
 
@@ -2288,8 +2314,16 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                         "The current operation will be aborted [missing=" + 
missing + "].");
                 }
 
-                hnd.complete(snpName, nodesRes);
+                try {
+                    hnd.complete(snpName, nodesRes);
+                }
+                catch (SnapshotHandlerWarningException e) {
+                    wrns.add(e.getMessage());
+                }
             }
+
+            if (!F.isEmpty(wrns))
+                wrnsHnd.accept(wrns);
         }
 
         /**
@@ -2308,6 +2342,11 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 return new SnapshotHandlerResult<>(null, e, ctx.localNode());
             }
         }
+
+        /** */
+        private void registerHandler(SnapshotHandler hnd) {
+            handlers.computeIfAbsent(hnd.type(), v -> new 
ArrayList<>()).add(hnd);
+        }
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
index 6357c7d43e8..93b767b018d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
@@ -61,10 +61,12 @@ public interface SnapshotHandler<T> extends Extension {
      *
      * @param name Snapshot name.
      * @param results Results from all nodes.
+     * @throws SnapshotHandlerWarningException If a warning of snapshot 
operation occurred.
      * @throws Exception If the snapshot operation needs to be aborted.
      * @see SnapshotHandlerResult
      */
-    public default void complete(String name, 
Collection<SnapshotHandlerResult<T>> results) throws Exception {
+    public default void complete(String name, 
Collection<SnapshotHandlerResult<T>> results)
+        throws SnapshotHandlerWarningException, Exception {
         for (SnapshotHandlerResult<T> res : results) {
             if (res.error() == null)
                 continue;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
index ffe18ec3c4e..37cdb529b97 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
@@ -38,17 +38,24 @@ public class SnapshotHandlerContext {
     /** Local node. */
     private final ClusterNode locNode;
 
+    /** Warning flag of concurrent inconsistent-by-nature streamer updates. */
+    private final boolean streamerWrn;
+
     /**
      * @param metadata Snapshot metadata.
      * @param grps The names of the cache groups on which the operation is 
performed.
+     * {@code False} otherwise. Always {@code false} for snapshot restoration.
      * @param locNode Local node.
      * @param snpDir The full path to the snapshot files.
+     * @param streamerWrn {@code True} if concurrent streaming updates occurred 
during snapshot operation.
      */
-    public SnapshotHandlerContext(SnapshotMetadata metadata, @Nullable 
Collection<String> grps, ClusterNode locNode, File snpDir) {
+    public SnapshotHandlerContext(SnapshotMetadata metadata, @Nullable 
Collection<String> grps, ClusterNode locNode,
+        File snpDir, boolean streamerWrn) {
         this.metadata = metadata;
         this.grps = grps;
         this.locNode = locNode;
         this.snpDir = snpDir;
+        this.streamerWrn = streamerWrn;
     }
 
     /**
@@ -79,4 +86,11 @@ public class SnapshotHandlerContext {
     public ClusterNode localNode() {
         return locNode;
     }
+
+    /**
+     * @return {@code True} if concurrent streaming updates occurred during 
snapshot operation. {@code False} otherwise.
+     */
+    public boolean streamerWarning() {
+        return streamerWrn;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
index 947085ab87f..afc72f98f46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
@@ -85,7 +85,7 @@ public class SnapshotHandlerRestoreTask extends 
AbstractSnapshotVerificationTask
 
         try {
             
ignite.context().cache().context().snapshotMgr().handlers().completeAll(
-                SnapshotHandlerType.RESTORE, snapshotName, clusterResults, 
execNodes);
+                SnapshotHandlerType.RESTORE, snapshotName, clusterResults, 
execNodes, wrns -> {});
         }
         catch (Exception e) {
             log.warning("The snapshot operation will be aborted due to a 
handler error [snapshot=" + snapshotName + "].", e);
@@ -142,7 +142,7 @@ public class SnapshotHandlerRestoreTask extends 
AbstractSnapshotVerificationTask
                 SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir, 
consistentId);
 
                 return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
-                    new SnapshotHandlerContext(meta, grps, ignite.localNode(), 
snpDir));
+                    new SnapshotHandlerContext(meta, grps, ignite.localNode(), 
snpDir, false));
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
new file mode 100644
index 00000000000..dec861c2071
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Snapshot operation warning. Warnings do not interrupt snapshot process but 
raise exception at the end to make the
+ * operation status 'not OK' if no other error occurred.
+ */
+public class SnapshotHandlerWarningException extends IgniteCheckedException {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public SnapshotHandlerWarningException(String wrnMsg) {
+        super(wrnMsg);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index 009c0b8849b..65bd3131f8e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -253,7 +253,8 @@ public class SnapshotMetadata implements Serializable {
             snpName.equals(meta.snpName) &&
             consId.equals(meta.consId) &&
             Objects.equals(grpIds, meta.grpIds) &&
-            Objects.equals(bltNodes, meta.bltNodes);
+            Objects.equals(bltNodes, meta.bltNodes) &&
+            Arrays.equals(masterKeyDigest, meta.masterKeyDigest);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index 14c72f661ea..8adcea9e5f9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
@@ -58,6 +59,17 @@ public class SnapshotOperationRequest implements 
Serializable {
     /** Exception occurred during snapshot operation processing. */
     private volatile Throwable err;
 
+    /**
+     * Snapshot operation warnings. Warnings do not interrupt snapshot process 
but raise exception at the end to make
+     * the operation status 'not OK' if no other error occurred.
+     */
+    private volatile List<String> warnings;
+
+    /**
+     * Warning flag of concurrent inconsistent-by-nature streamer updates.
+     */
+    private transient volatile boolean streamerWrn;
+
     /** Flag indicating that the {@link DistributedProcessType#START_SNAPSHOT} 
phase has completed. */
     private transient volatile boolean startStageEnded;
 
@@ -164,6 +176,34 @@ public class SnapshotOperationRequest implements 
Serializable {
         this.startStageEnded = startStageEnded;
     }
 
+    /**
+     * @return Warnings of snapshot operation.
+     */
+    public List<String> warnings() {
+        return warnings;
+    }
+
+    /**
+     * @param warnings Warnings of snapshot operation.
+     */
+    public void warnings(List<String> warnings) {
+        this.warnings = warnings;
+    }
+
+    /**
+     * {@code True} If the streamer warning flag is set. {@code False} 
otherwise.
+     */
+    public boolean streamerWarning() {
+        return streamerWrn;
+    }
+
+    /**
+     * Sets the streamer warning flag.
+     */
+    public boolean streamerWarning(boolean val) {
+        return streamerWrn = val;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SnapshotOperationRequest.class, this);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 05f5856b903..d083e9afd1d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -117,7 +117,7 @@ public class SnapshotPartitionsVerifyTask extends 
AbstractSnapshotVerificationTa
                 SnapshotMetadata meta = 
cctx.snapshotMgr().readSnapshotMetadata(snpDir, consId);
 
                 return new SnapshotPartitionsVerifyHandler(cctx)
-                    .invoke(new SnapshotHandlerContext(meta, rqGrps, 
ignite.localNode(), snpDir));
+                    .invoke(new SnapshotHandlerContext(meta, rqGrps, 
ignite.localNode(), snpDir, false));
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 1dca24a1647..a20fb307094 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -2255,6 +2255,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             Collection<Integer> ignoredParts = new HashSet<>();
 
             try {
+                snapshotWarning(cctx);
+
                 for (Entry<KeyCacheObject, CacheObject> e : entries) {
                     cctx.shared().database().checkpointReadLock();
 
@@ -2364,6 +2366,16 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 }
             }
         }
+
+        /**
+         * Sets the streamer warning flag to current snapshot process if it is 
active.
+         *
+         * @param cctx Cache context.
+         */
+        private static void snapshotWarning(GridCacheContext<?, ?> cctx) {
+            if (cctx.group().persistenceEnabled())
+                
cctx.kernalContext().cache().context().snapshotMgr().streamerWarning();
+        }
     }
 
     /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
new file mode 100644
index 00000000000..c784f3b6f2f
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Tests that the snapshot is consistent or that the snapshot process produces a proper warning.
+ */
+public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest {
+    /** */
+    private static final String INMEM_DATA_REGION = "inMemDr";
+
+    /** */
+    private IgniteSnapshotManager snpMgr;
+
+    /** */
+    private IgniteEx client;
+
+    /** {@inheritDoc} */
+    @Override public void beforeTestSnapshot() throws Exception {
+        super.beforeTestSnapshot();
+
+        persistence = true;
+
+        dfltCacheCfg.setBackups(2);
+
+        startGrids(3);
+
+        grid(0).cluster().state(ACTIVE);
+
+        client = startClientGrid(G.allGrids().size());
+
+        snpMgr = snp(grid(0));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // In-memory data region.
+        DataRegionConfiguration inMemDr = new DataRegionConfiguration();
+        inMemDr.setPersistenceEnabled(false);
+        inMemDr.setMaxSize(100L * 1024L * 1024L);
+        inMemDr.setInitialSize(inMemDr.getMaxSize());
+        inMemDr.setName(INMEM_DATA_REGION);
+        inMemDr.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
+        cfg.getDataStorageConfiguration().setDataRegionConfigurations(inMemDr);
+
+        return cfg;
+    }
+
+    /**
+     * Tests snapshot consistency when streamer starts before snapshot. 
Default receiver.
+     */
+    @Test
+    public void testStreamerWhileSnapshotDefault() throws Exception {
+        doTestDataStreamerWhileSnapshot( false);
+    }
+
+    /**
+     * Tests snapshot consistency when streamer starts before snapshot. 
Overwriting receiver.
+     */
+    @Test
+    public void testStreamerWhileSnapshotOverwriting() throws Exception {
+        doTestDataStreamerWhileSnapshot( true);
+    }
+
+    /**
+     * Tests that a cache not affected by the streamer is restorable from the snapshot.
+     */
+    @Test
+    public void testOtherCacheRestores() throws Exception {
+        String cname = "cache2";
+
+        grid(0).createCache(new 
CacheConfiguration<>(dfltCacheCfg).setName(cname));
+
+        try (IgniteDataStreamer<Integer, Integer> ds = 
grid(0).dataStreamer(cname)) {
+            for (int i = 0; i < 100; ++i)
+                ds.addData(i, i);
+        }
+
+        AtomicBoolean stopLoad = new AtomicBoolean();
+
+        IgniteInternalFuture<?> loadFut = runLoad(grid(0), false, stopLoad);
+
+        try {
+            assertThrows(null, () -> 
snpMgr.createSnapshot(SNAPSHOT_NAME).get(), IgniteException.class,
+                DataStreamerUpdatesHandler.WRN_MSG);
+        }
+        finally {
+            stopLoad.set(true);
+            loadFut.get();
+        }
+
+        grid(0).destroyCache(cname);
+        grid(0).destroyCache(dfltCacheCfg.getName());
+
+        snpMgr.restoreSnapshot(SNAPSHOT_NAME, 
Collections.singletonList(cname)).get();
+
+        for (int i = 0; i < 100; ++i)
+            assertEquals(i, grid(0).cache(cname).get(i));
+    }
+
+    /**
+     * Tests that streaming into an in-memory cache doesn't affect the snapshot.
+     */
+    @Test
+    public void testStreamingIntoInMememoryDoesntAffectSnapshot() throws 
Exception {
+        String cache2Name = "cache2";
+        int loadCnt = 1000;
+
+        grid(0).createCache(new 
CacheConfiguration<>(dfltCacheCfg).setName(cache2Name));
+
+        try (IgniteDataStreamer<Object, Object> ds = 
grid(0).dataStreamer(cache2Name)) {
+            for (int i = 0; i < loadCnt; ++i)
+                ds.addData(i, i);
+        }
+
+        grid(0).destroyCache(dfltCacheCfg.getName());
+        dfltCacheCfg.setDataRegionName(INMEM_DATA_REGION);
+        dfltCacheCfg.setEncryptionEnabled(false);
+        grid(0).createCache(dfltCacheCfg);
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> loadFut = runLoad(grid(2), false, stop);
+
+        try {
+            snpMgr.createSnapshot(SNAPSHOT_NAME).get();
+        }
+        finally {
+            stop.set(true);
+            loadFut.get();
+        }
+
+        grid(0).destroyCache(cache2Name);
+
+        snpMgr.restoreSnapshot(SNAPSHOT_NAME, null).get();
+
+        for (int i = 0; i < loadCnt; ++i)
+            assertEquals(i, grid(0).cache(cache2Name).get(i));
+    }
+
+    /**
+     * Tests that the snapshot process throws a warning if required.
+     *
+     * @param allowOverwrite 'allowOverwrite' setting.
+     */
+    private void doTestDataStreamerWhileSnapshot(boolean allowOverwrite) 
throws Exception {
+        AtomicBoolean stopLoading = new AtomicBoolean(false);
+
+        IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite, 
stopLoading);
+
+        try {
+            if (allowOverwrite)
+                grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+            else {
+                assertThrows(null, () -> 
snpMgr.createSnapshot(SNAPSHOT_NAME).get(),
+                    IgniteException.class, DataStreamerUpdatesHandler.WRN_MSG);
+            }
+        }
+        finally {
+            stopLoading.set(true);
+            loadFut.get();
+        }
+    }
+
+    /**
+     * Runs DataStreamer asynchronously. Waits while streamer pre-loads some 
amount of data.
+     *
+     * @param ldr Loader node.
+     * @param allowOverwrite 'allowOverwrite' setting.
+     * @param stop Stop load flag.
+     */
+    private IgniteInternalFuture<?> runLoad(Ignite ldr, boolean 
allowOverwrite, AtomicBoolean stop)
+        throws InterruptedException {
+        CountDownLatch preload = new CountDownLatch(10_000);
+
+        IgniteInternalFuture<?> res = GridTestUtils.runAsync(() -> {
+            try (IgniteDataStreamer<Integer, Object> ds = 
ldr.dataStreamer(dfltCacheCfg.getName())) {
+                ds.allowOverwrite(allowOverwrite);
+
+                int idx = 0;
+
+                while (!stop.get()) {
+                    ds.addData(++idx, idx);
+
+                    preload.countDown();
+                }
+            }
+        }, "load-thread");
+
+        preload.await();
+
+        return res;
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index 7618ff25521..fccc3574389 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -22,6 +22,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCl
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotHandlerTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotStreamerTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWalRecordTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
@@ -46,7 +47,8 @@ import org.junit.runners.Suite;
     IgniteSnapshotRestoreFromRemoteTest.class,
     PlainSnapshotTest.class,
     EncryptedSnapshotTest.class,
-    IgniteClusterSnapshotWalRecordTest.class
+    IgniteClusterSnapshotWalRecordTest.class,
+    IgniteClusterSnapshotStreamerTest.class
 })
 public class IgniteSnapshotTestSuite {
 }


Reply via email to