This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1876f0bd20f IGNITE-17435 Updated data streamer documentation in case 
of a snapshot process. (#10273)
1876f0bd20f is described below

commit 1876f0bd20f9bf851ed505d029ad349a4b98fe27
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Nov 7 12:06:24 2022 +0300

    IGNITE-17435 Updated data streamer documentation in case of a snapshot 
process. (#10273)
---
 docs/_docs/data-streaming.adoc                     |  95 ++++++++++++--------
 docs/_docs/images/data_streaming.png               | Bin 159011 -> 106187 bytes
 docs/_docs/snapshots/snapshots.adoc                |   2 +
 docs/_docs/sql-reference/operational-commands.adoc |   7 +-
 .../java/org/apache/ignite/IgniteDataStreamer.java |  25 +++++-
 .../java/org/apache/ignite/IgniteSnapshot.java     |   2 +
 .../datastreamer/DataStreamerCacheUpdaters.java    |  11 +--
 .../processors/datastreamer/DataStreamerImpl.java  |  12 +++
 .../datastreamer/DataStreamerImplSelfTest.java     |  97 +++++++++++++++++++++
 .../apache/ignite/testframework/LogListener.java   |   2 +
 10 files changed, 206 insertions(+), 47 deletions(-)

diff --git a/docs/_docs/data-streaming.adoc b/docs/_docs/data-streaming.adoc
index 8736ec50e21..3b0ceaa1c86 100644
--- a/docs/_docs/data-streaming.adoc
+++ b/docs/_docs/data-streaming.adoc
@@ -18,23 +18,19 @@
 
 == Overview
 
-Ignite provides a Data Streaming API that can be used to inject large amounts 
of continuous streams of data into an Ignite cluster.
-The Data Streaming API is designed to be scalable and fault-tolerant, and 
provides _at-least-once_ delivery semantics for the data streamed into Ignite, 
meaning each entry is processed at least once.
-
-Data is streamed into a cache via a <<Data Streamers, data streamer>> 
associated with the cache. Data streamers automatically buffer the data and 
group it into batches for better performance and send it in parallel to 
multiple nodes.
-
-The Data Streaming API provides the following features:
-
-* The data that is added to a data streamer is automatically partitioned and 
distributed between the nodes.
-* You can process the data concurrently in a colocated fashion.
-* Clients can perform concurrent SQL queries on the data as it is being 
streamed in.
+Ignite provides a Data Streaming API that can be used to inject large amounts 
of data into an Ignite cluster.
+The main goal of streaming is efficient, quick data loading. The data that is 
added to the streamer is
+automatically organized and distributed between the nodes in partition-aware 
and parallel manner.
 
 image:images/data_streaming.png[Data Streaming]
 
-== Data Streamers
+The Data Streaming API is designed to be scalable and provides _at-least-once_ 
delivery semantics
+for the data streamed into Ignite, meaning each entry is processed at least 
once.
+
+== Usage
 A data streamer is associated with a specific cache and provides an interface 
for streaming data into the cache.
 
-In a typical scenario, you obtain a data streamer and use one of its methods 
to stream data into the cache, and Ignite takes care of data partitioning and 
colocation by batching data entries according to partitioning rules to avoid 
unnecessary data movement.
+In a typical scenario, you obtain a data streamer and use one of its methods 
to stream data into the cache, and Ignite takes care of the rest.
 
 You can obtain the data streamer for a specific cache as follows:
 [tabs]
@@ -57,62 +53,87 @@ 
include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer1,indent=0]
 tab:C++[unsupported]
 --
 
-== Overwriting Existing Keys
+The streamer can accept data to load by multiple threads.
+
+A good use of streaming is data preloading.
+
+== Limitations
+DataStreamer doesn't guarantee:
+[]
+- By default, data consistency until successfully finished;
+- Immediate data loading. Data can be kept for a while before loading;
+- Data order. Data records may be loaded into a cache in a different order 
compared to load into the streamer;
+- By default, working with external storages.
 
-By default, data streamers do not overwrite existing data and skip entries 
that are already in the cache. You can change that behavior by setting the 
`allowOverwrite` property of the data streamer to `true`.
+If <<overwritting, 'allowOverwrite'>> property is 'false' (default), consider:
+[]
+- You should not have the same keys repeating in the data being streamed;
+- Streamer cancelation or streamer node failure can cause data inconsistency;
+- If loading into a persistent cache, concurrently created snapshot may 
contain inconsistent data and might not be restored entirely.
+
+Most important behavior of Ignite Data Streamer is defined by <<receivers, 
stream receiver>> and
+<<overwritting, 'allowOverwite' setting.>>
+
+== Streamer receivers. [[receivers]]
+
+Ignite DataStreamer is an orchestrator and doesn't write data itself. 
link:{javadoc_base_url}/org/apache/ignite/stream/StreameReceiver.html[StreamerReceiver]
 does. The default receiver is designed for fastest load and fewer network 
requests. With this receiver, streamer focuses on parallel transfer of backup 
and primary records.
+
+You can set your own receiver. See <<transfomer>> and <<visitor>>. The logic 
implemented in a stream receiver is executed on the node where data is to be 
stored.
 
 [tabs]
 --
 tab:Java[]
 [source,java]
 ----
-include::{javaFile}[tag=dataStreamer2,indent=0]
+include::{javaFile}[tag=streamReceiver,indent=0]
 ----
 
 tab:C#/.NET[]
 [source,csharp]
 ----
-include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer2,indent=0]
+include::code-snippets/dotnet/DataStreaming.cs[tag=streamReceiver,indent=0]
 ----
 
 tab:C++[unsupported]
 --
 
-NOTE: When `allowOverwrite` is set to `false` (default), the updates are not 
propagated to the link:persistence/external-storage[external storage] (if it is 
used).
+[IMPORTANT]
+====
+The class definitions of the stream receivers to be executed on remote nodes 
must be available on the nodes. This can be achieved in two ways:
 
-== Processing Data
-In cases when you need to execute custom logic before adding new data, you can 
use a stream receiver.
-A stream receiver is used to process the data in a colocated manner before it 
is stored into the cache.
-The logic implemented in a stream receiver is executed on the node where data 
is to be stored.
+* Add the classes to the classpath of the nodes;
+* Enable link:code-deployment/peer-class-loading[peer class loading].
+====
+
+Changing receiver to non-default changes data distribution algorithm. With 
non-default receiver streamer sends data batches only to primary node receiver. 
And primary node needs another requests to send backup writes.
+
+NOTE: A stream receiver does not put data into cache automatically. You need 
to call one of the `put` methods explicitly.
+
+
+== Overwritting data. [[overwritting]]
+
+By default, existing keys aren't overwritten. You can change that behavior by 
setting the `allowOverwrite` property of the data streamer to `true`. Since the 
default receiver does not overwrite data, other provided one is automatically 
chosen. Any non-default receiver is considered as not-overwriting. And 
`allowOverwrite` property says `true`. However, your own receiver may use 
`putIfAbsent` for instance.
+
+NOTE: When `allowOverwrite` is set to `false` (default), the updates are not 
propagated to the link:persistence/external-storage[external storage] (if it is 
used).
 
 [tabs]
 --
 tab:Java[]
 [source,java]
 ----
-include::{javaFile}[tag=streamReceiver,indent=0]
+include::{javaFile}[tag=dataStreamer2,indent=0]
 ----
 
 tab:C#/.NET[]
 [source,csharp]
 ----
-include::code-snippets/dotnet/DataStreaming.cs[tag=streamReceiver,indent=0]
+include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer2,indent=0]
 ----
 
 tab:C++[unsupported]
 --
 
-NOTE: Note that a stream receiver does not put data into the cache 
automatically. You need to call one of the `put(...)` methods explicitly.
-
-[IMPORTANT]
-====
-The class definitions of the stream receivers to be executed on remote nodes 
must be available on the nodes. This can be achieved in two ways:
-
-* Add the classes to the classpath of the nodes;
-* Enable link:code-deployment/peer-class-loading[peer class loading].
-====
-
-=== Stream Transformer
+=== Stream Transformer [[transfomer]]
 A stream transformer is a convenient implementation of a stream receiver, that 
updates the data in the stream.
 Stream transformers take advantage of the colocation feature and update the 
data on the node where it is going to be stored.
 
@@ -135,9 +156,9 @@ 
include::code-snippets/dotnet/DataStreaming.cs[tag=streamTransformer,indent=0]
 tab:C++[unsupported]
 --
 
-=== Stream Visitor
+=== Stream Visitor [[visitor]]
 
-A stream visitor is another implementation of a stream receiver, which visits 
every key-value pair in the stream. The visitor does not update the cache. If a 
pair needs to be stored in the cache, one of the `put(...)` methods must be 
called explicitly.
+A stream visitor is another implementation of a stream receiver, which visits 
every key-value pair in the stream.
 
 In the example below, we have 2 caches: "marketData", and "instruments". We 
receive market data ticks and put them into the streamer for the "marketData" 
cache. The stream visitor for the "marketData" streamer is invoked on the 
cluster member mapped to the particular market symbol. Upon receiving 
individual market ticks it updates the "instrument" cache with the latest 
market price.
 
@@ -160,7 +181,7 @@ tab:C++[unsupported]
 --
 
 == Configuring Data Streamer Thread Pool Size
-The data streamer thread pool is dedicated to process messages coming from the 
data streamers.
+The data streamer thread pool is dedicated to process batches coming from the 
data streamers.
 
 The default pool size is `max(8, total number of cores)`.
 Use `IgniteConfiguration.setDataStreamerThreadPoolSize(...)` to change the 
pool size.
diff --git a/docs/_docs/images/data_streaming.png 
b/docs/_docs/images/data_streaming.png
index c407447435c..27a01208992 100644
Binary files a/docs/_docs/images/data_streaming.png and 
b/docs/_docs/images/data_streaming.png differ
diff --git a/docs/_docs/snapshots/snapshots.adoc 
b/docs/_docs/snapshots/snapshots.adoc
index 7a4e71e5696..b9d21c20c73 100644
--- a/docs/_docs/snapshots/snapshots.adoc
+++ b/docs/_docs/snapshots/snapshots.adoc
@@ -321,6 +321,8 @@ The snapshot procedure has some limitations that you should 
be aware of before u
 * You can have only one snapshotting operation running at a time.
 * The snapshot operation is prohibited during a master key change and/or cache 
group key change.
 * The snapshot procedure is interrupted if a server node leaves the cluster.
+* Concurrent updates from 
link:../data-streaming.adoc#_limitations[DataStreamer] with default setting 
'allowOverwrite'
+(false) into a persistent cache can cause that cache data stored inconsistent.
 
 If any of these limitations prevent you from using Apache Ignite, then select 
alternate snapshotting implementations for
 Ignite provided by enterprise vendors.
diff --git a/docs/_docs/sql-reference/operational-commands.adoc 
b/docs/_docs/sql-reference/operational-commands.adoc
index d1f46411c37..308bcf6ca9f 100644
--- a/docs/_docs/sql-reference/operational-commands.adoc
+++ b/docs/_docs/sql-reference/operational-commands.adoc
@@ -64,7 +64,12 @@ SET STREAMING [OFF|ON];
 Using the `SET` command, you can stream data in bulk into a SQL table in your 
cluster. When streaming is enabled, the JDBC/ODBC driver will pack your 
commands in batches and send them to the server (Ignite cluster). On the server 
side, the batch is converted into a stream of cache update commands which are 
distributed asynchronously between server nodes. Performing this asynchronously 
increases peak throughput because at any given time all cluster nodes are busy 
with data loading.
 
 === Usage
-To stream data into your cluster, prepare a file with the `SET STREAMING ON` 
command followed by `INSERT` commands for data that needs to be loaded. For 
example:
+To stream data into your cluster, prepare a file with the `SET STREAMING ON` 
command followed by `INSERT` commands for data that needs to be loaded.
+
+[NOTE]
+====
+Setting 'STREAMING ON' uses 
link:../data-streaming.adoc#_limitations[DataStreamer] which doesn't guarantee 
by default data consistency until successfully finished.
+====
 
 [source,sql]
 ----
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 838b5042f45..323bcb06c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -42,9 +42,23 @@ import org.jetbrains.annotations.Nullable;
  * This way batches can be applied within transaction(s) on target node.
  * See {@link #receiver(StreamReceiver)} for details.
  * <p>
- * Note that streamer will stream data concurrently by multiple internal 
threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the streamer.
+ * Data streamer doesn’t guarantee:
+ * <ul>
+ *  <li>Data order. Data records may be loaded into a cache in a different 
order compared to putting into the
+ *  streamer;</li>
+ *  <li>Immediate data loading. Data can be kept for a while before 
loading;</li>
+ *  <li>By default, data consistency until successfully finished;</li>
+ *  <li>By default, working with external storages.</li>
+ * </ul>
+ * <p>
+ * If {@link #allowOverwrite()} setting is {@code false} (default), consider:
+ * <ul>
+ *  <li>You should not have the same keys repeating in the data being 
streamed;</li>
+ *  <li>Streamer cancelation or streamer node failure can cause data 
inconsistency;</li>
+ *  <li>If loading into a persistent cache, concurrently created snapshot may 
contain inconsistent data and might not
+ *  be restored entirely.</li>
+ * </ul>
+ * Most important behaviour of data streamer is defined by {@link 
StreamReceiver} and {@link #allowOverwrite()} property.
  * <p>
  * Also note that {@code IgniteDataStreamer} is not the only way to add data 
into cache.
  * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, 
Object...)}
@@ -138,7 +152,8 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
      * <p>
      * This flag is disabled by default (default is {@code false}).
      *
-     * @return {@code True} if overwriting is allowed, {@code false} 
otherwise..
+     * @return {@code True} if overwriting is allowed or if receiver is 
changed by {@link #receiver(StreamReceiver)}.
+     * {@code False} otherwise.
      */
     public boolean allowOverwrite();
 
@@ -318,6 +333,8 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
 
     /**
      * Sets custom stream receiver to this data streamer.
+     * <p>
+     * Disables {@link #allowOverwrite(boolean)} and sets {@link 
#allowOverwrite()} returning {@code true}.
      *
      * @param rcvr Stream receiver.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
index 5945805b076..c18c9afe003 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -31,6 +31,8 @@ import org.jetbrains.annotations.Nullable;
  * grantee data consistency between them.</li>
  * <li>Snapshot must be resorted manually on the switched off cluster by 
copying data
  * to the working directory on each cluster node.</li>
+ * <li>Concurrent updates from {@link IgniteDataStreamer} with default {@link 
IgniteDataStreamer#allowOverwrite()}
+ * setting (false) into a persistent cache can cause that cache data stored 
inconsistent.</li>
  * </ul>
  */
 public interface IgniteSnapshot {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
index 5bc9a6f3eff..79d0fd097e9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
@@ -44,8 +44,8 @@ public class DataStreamerCacheUpdaters {
     private static final StreamReceiver BATCHED_SORTED = new BatchedSorted();
 
     /**
-     * Updates cache using independent {@link IgniteCache#put(Object, 
Object)}and
-     * {@link IgniteCache#remove(Object)} operations. Thus it is safe from 
deadlocks but performance
+     * Updates cache using independent {@link IgniteCache#put(Object, Object)} 
and
+     * {@link IgniteCache#remove(Object)} operations. Thus, it is safe from 
deadlocks but performance
      * is not the best.
      *
      * @return Single updater.
@@ -55,8 +55,8 @@ public class DataStreamerCacheUpdaters {
     }
 
     /**
-     * Updates cache using batched methods {@link IgniteCache#putAll(Map)}and
-     * {@link IgniteCache#removeAll()}. Can cause deadlocks if the same keys 
are getting
+     * Updates cache using batched methods {@link IgniteCache#putAll(Map)} and
+     * {@link IgniteCache#removeAll()}. Can cause deadlocks with transactional 
caches if the same keys are getting
      * updated concurrently. Performance is generally better than in {@link 
#individual()}.
      *
      * @return Batched updater.
@@ -68,7 +68,8 @@ public class DataStreamerCacheUpdaters {
     /**
      * Updates cache using batched methods {@link IgniteCache#putAll(Map)} and
      * {@link IgniteCache#removeAll(Set)}. Keys are sorted in natural order 
and if all updates
-     * use the same rule deadlock can not happen. Performance is generally 
better than in {@link #individual()}.
+     * use the same rule deadlock with transactional caches can not happen. 
Performance is generally better than in
+     * {@link #individual()}.
      *
      * @return Batched sorted updater.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a20fb307094..b98adfef6ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -129,6 +129,12 @@ import static 
org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, 
Delayed {
+    /** */
+    public static final String WRN_INCONSISTENT_UPDATES = "The Data Streamer 
loads data with 'allowOverwrite' set " +
+        "to false. It doesn't guarantee data consistency until successfully 
finishes. Streamer cancelation or " +
+        "streamer node failure can cause data inconsistency. Concurrently 
created snapshot may contain inconsistent " +
+        "data and might not be restored entirely.";
+
     /** Per thread buffer size. */
     private int bufLdrSzPerThread = DFLT_PER_THREAD_BUFFER_SIZE;
 
@@ -286,6 +292,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     /** */
     private final AtomicBoolean remapOwning = new AtomicBoolean();
 
+    /** Flag to warn into the log only once if streamer is inconsistent until 
successfully finished. */
+    private final AtomicBoolean inconsistencyWarned = new AtomicBoolean();
+
     /**
      * @param ctx Grid kernal context.
      * @param cacheName Cache name.
@@ -642,6 +651,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
         lock(false);
 
+        if (rcvr instanceof IsolatedUpdater && 
inconsistencyWarned.compareAndSet(false, true))
+            log.warning(WRN_INCONSISTENT_UPDATES);
+
         try {
             long threadId = Thread.currentThread().getId();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index e4d4cf9fe90..b970bf0d0cf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheServerNotFoundException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -50,7 +51,10 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.stream.StreamReceiver;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.logging.log4j.core.appender.WriterAppender;
 import org.junit.Ignore;
@@ -86,6 +90,10 @@ public class DataStreamerImplSelfTest extends 
GridCommonAbstractTest {
         super.afterTest();
 
         stopAllGrids();
+
+        // Unbinds the log listeners from single static log instance.
+        U.<AtomicReference<IgniteLogger>>field(DataStreamerImpl.class, 
"logRef").set(null);
+        GridTestUtils.setFieldValue(null, DataStreamerImpl.class, "log", null);
     }
 
     /** {@inheritDoc} */
@@ -131,6 +139,95 @@ public class DataStreamerImplSelfTest extends 
GridCommonAbstractTest {
             assertTrue(fut.isDone());
     }
 
+    /**
+     * Test inconsistency log warning of the streamer. Default receiver goes 
first and is set again after a consistent
+     * receiver. The warning must appear only once.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testInconsistencyWarningDefaultReceiverFirst() throws 
Exception {
+        doTestInconsistencyWarning(true, null, 
DataStreamerCacheUpdaters.batched(), null);
+    }
+
+    /**
+     * Test inconsistency log warning of the streamer when default receiver 
that is set after a consistent receiver. The
+     * warning must appear only once.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testInconsistencyWarningDefaultReceiverFollows() throws 
Exception {
+        doTestInconsistencyWarning(true, DataStreamerCacheUpdaters.batched(), 
null,
+            DataStreamerCacheUpdaters.individual(), null);
+    }
+
+    /**
+     * Test inconsistency log warning of the streamer. Must not appear with 
consistent receivers.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testInconsistencyWarningOtherReceivers() throws Exception {
+        doTestInconsistencyWarning(false, DataStreamerCacheUpdaters.batched(),
+            DataStreamerCacheUpdaters.individual(), 
DataStreamerCacheUpdaters.batchedSorted());
+    }
+
+    /**
+     * Test inconsistency log warning of the streamer.
+     *
+     * @param mustWarn  {@code True}, if just one warning expected. {@code 
False} if no warning expected at all.
+     * @param receivers The streamer receivers to load with the same streamer 
instance. Must not be empty. {@code Null}
+     *        for the default receiver.
+     * @throws Exception If failed.
+     */
+    private void doTestInconsistencyWarning(boolean mustWarn,
+        StreamReceiver<Integer, Integer>... receivers) throws Exception {
+        assert receivers.length > 0;
+
+        LogListener lsnr = 
LogListener.matches(DataStreamerImpl.WRN_INCONSISTENT_UPDATES)
+            .times(mustWarn ? 1 : 0).build();
+
+        startGrids(2);
+
+        IgniteConfiguration cfg = 
getConfiguration(getTestIgniteInstanceName(G.allGrids().size()));
+
+        ListeningTestLogger log = new ListeningTestLogger(cfg.getGridLogger());
+        log.registerListener(lsnr);
+        cfg.setGridLogger(log);
+
+        IgniteEx ldr = startClientGrid(cfg);
+
+        CacheConfiguration<?, ?> ccfg = defaultCacheConfiguration();
+        ldr.getOrCreateCache(ccfg);
+
+        try (IgniteDataStreamer<Integer, Integer> ds = 
ldr.dataStreamer(ccfg.getName())) {
+            for (StreamReceiver<Integer, Integer> rcvr : receivers) {
+                // Resets default receiver.
+                if (rcvr == null)
+                    ds.allowOverwrite(false);
+                else
+                    ds.receiver(rcvr);
+
+                // Put some amount of data.
+                AtomicInteger loadCnt = new AtomicInteger(KEYS_COUNT);
+
+                GridTestUtils.runMultiThreaded(() -> {
+                    int v;
+
+                    while (loadCnt.get() > 0) {
+                        v = loadCnt.getAndDecrement();
+
+                        if (v >= 0)
+                            ds.addData(v, v);
+                    }
+                }, 3, "testDsLoader");
+            }
+        }
+
+        assertTrue(lsnr.check());
+    }
+
     /**
      * @throws Exception If failed.
      */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
index 64616db1525..3c62e12163e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
@@ -329,6 +329,8 @@ public abstract class LogListener implements 
Consumer<String> {
         private LogMessageListener(@NotNull Function<String, Integer> func, 
@NotNull ValueRange exp) {
             this.func = func;
             this.exp = exp;
+
+            matches.set(0);
         }
 
         /** {@inheritDoc} */

Reply via email to