This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1876f0bd20f IGNITE-17435 Updated data streamer documentation in case
of a snapshot process. (#10273)
1876f0bd20f is described below
commit 1876f0bd20f9bf851ed505d029ad349a4b98fe27
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Nov 7 12:06:24 2022 +0300
IGNITE-17435 Updated data streamer documentation in case of a snapshot
process. (#10273)
---
docs/_docs/data-streaming.adoc | 95 ++++++++++++--------
docs/_docs/images/data_streaming.png | Bin 159011 -> 106187 bytes
docs/_docs/snapshots/snapshots.adoc | 2 +
docs/_docs/sql-reference/operational-commands.adoc | 7 +-
.../java/org/apache/ignite/IgniteDataStreamer.java | 25 +++++-
.../java/org/apache/ignite/IgniteSnapshot.java | 2 +
.../datastreamer/DataStreamerCacheUpdaters.java | 11 +--
.../processors/datastreamer/DataStreamerImpl.java | 12 +++
.../datastreamer/DataStreamerImplSelfTest.java | 97 +++++++++++++++++++++
.../apache/ignite/testframework/LogListener.java | 2 +
10 files changed, 206 insertions(+), 47 deletions(-)
diff --git a/docs/_docs/data-streaming.adoc b/docs/_docs/data-streaming.adoc
index 8736ec50e21..3b0ceaa1c86 100644
--- a/docs/_docs/data-streaming.adoc
+++ b/docs/_docs/data-streaming.adoc
@@ -18,23 +18,19 @@
== Overview
-Ignite provides a Data Streaming API that can be used to inject large amounts
of continuous streams of data into an Ignite cluster.
-The Data Streaming API is designed to be scalable and fault-tolerant, and
provides _at-least-once_ delivery semantics for the data streamed into Ignite,
meaning each entry is processed at least once.
-
-Data is streamed into a cache via a <<Data Streamers, data streamer>>
associated with the cache. Data streamers automatically buffer the data and
group it into batches for better performance and send it in parallel to
multiple nodes.
-
-The Data Streaming API provides the following features:
-
-* The data that is added to a data streamer is automatically partitioned and
distributed between the nodes.
-* You can process the data concurrently in a colocated fashion.
-* Clients can perform concurrent SQL queries on the data as it is being
streamed in.
+Ignite provides a Data Streaming API that can be used to inject large amounts
of data into an Ignite cluster.
+The main goal of streaming is efficient, quick data loading. The data that is
added to the streamer is
+automatically organized and distributed between the nodes in partition-aware
and parallel manner.
image:images/data_streaming.png[Data Streaming]
-== Data Streamers
+The Data Streaming API is designed to be scalable and provides _at-least-once_
delivery semantics
+for the data streamed into Ignite, meaning each entry is processed at least
once.
+
+== Usage
A data streamer is associated with a specific cache and provides an interface
for streaming data into the cache.
-In a typical scenario, you obtain a data streamer and use one of its methods
to stream data into the cache, and Ignite takes care of data partitioning and
colocation by batching data entries according to partitioning rules to avoid
unnecessary data movement.
+In a typical scenario, you obtain a data streamer and use one of its methods
to stream data into the cache, and Ignite takes care of the rest.
You can obtain the data streamer for a specific cache as follows:
[tabs]
@@ -57,62 +53,87 @@
include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer1,indent=0]
tab:C++[unsupported]
--
-== Overwriting Existing Keys
+The streamer can accept data to load by multiple threads.
+
+A good use of streaming is data preloading.
+
+== Limitations
+DataStreamer doesn't guarantee:
+[]
+- By default, data consistency until successfully finished;
+- Immediate data loading. Data can be kept for a while before loading;
+- Data order. Data records may be loaded into a cache in a different order
compared to load into the streamer;
+- By default, working with external storages.
-By default, data streamers do not overwrite existing data and skip entries
that are already in the cache. You can change that behavior by setting the
`allowOverwrite` property of the data streamer to `true`.
+If <<overwritting, 'allowOverwrite'>> property is 'false' (default), consider:
+[]
+- You should not have the same keys repeating in the data being streamed;
+- Streamer cancelation or streamer node failure can cause data inconsistency;
+- If loading into a persistent cache, concurrently created snapshot may
contain inconsistent data and might not be restored entirely.
+
+Most important behavior of Ignite Data Streamer is defined by <<receivers,
stream receiver>> and
+<<overwritting, 'allowOverwite' setting.>>
+
+== Streamer receivers. [[receivers]]
+
+Ignite DataStreamer is an orchestrator and doesn't write data itself.
link:{javadoc_base_url}/org/apache/ignite/stream/StreameReceiver.html[StreamerReceiver]
does. The default receiver is designed for fastest load and fewer network
requests. With this receiver, streamer focuses on parallel transfer of backup
and primary records.
+
+You can set your own receiver. See <<transfomer>> and <<visitor>>. The logic
implemented in a stream receiver is executed on the node where data is to be
stored.
[tabs]
--
tab:Java[]
[source,java]
----
-include::{javaFile}[tag=dataStreamer2,indent=0]
+include::{javaFile}[tag=streamReceiver,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
-include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer2,indent=0]
+include::code-snippets/dotnet/DataStreaming.cs[tag=streamReceiver,indent=0]
----
tab:C++[unsupported]
--
-NOTE: When `allowOverwrite` is set to `false` (default), the updates are not
propagated to the link:persistence/external-storage[external storage] (if it is
used).
+[IMPORTANT]
+====
+The class definitions of the stream receivers to be executed on remote nodes
must be available on the nodes. This can be achieved in two ways:
-== Processing Data
-In cases when you need to execute custom logic before adding new data, you can
use a stream receiver.
-A stream receiver is used to process the data in a colocated manner before it
is stored into the cache.
-The logic implemented in a stream receiver is executed on the node where data
is to be stored.
+* Add the classes to the classpath of the nodes;
+* Enable link:code-deployment/peer-class-loading[peer class loading].
+====
+
+Changing receiver to non-default changes data distribution algorithm. With
non-default receiver streamer sends data batches only to primary node receiver.
And primary node needs another requests to send backup writes.
+
+NOTE: A stream receiver does not put data into cache automatically. You need
to call one of the `put` methods explicitly.
+
+
+== Overwritting data. [[overwritting]]
+
+By default, existing keys aren't overwritten. You can change that behavior by
setting the `allowOverwrite` property of the data streamer to `true`. Since the
default receiver does not overwrite data, other provided one is automatically
chosen. Any non-default receiver is considered as not-overwriting. And
`allowOverwrite` property says `true`. However, your own receiver may use
`putIfAbsent` for instance.
+
+NOTE: When `allowOverwrite` is set to `false` (default), the updates are not
propagated to the link:persistence/external-storage[external storage] (if it is
used).
[tabs]
--
tab:Java[]
[source,java]
----
-include::{javaFile}[tag=streamReceiver,indent=0]
+include::{javaFile}[tag=dataStreamer2,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
-include::code-snippets/dotnet/DataStreaming.cs[tag=streamReceiver,indent=0]
+include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer2,indent=0]
----
tab:C++[unsupported]
--
-NOTE: Note that a stream receiver does not put data into the cache
automatically. You need to call one of the `put(...)` methods explicitly.
-
-[IMPORTANT]
-====
-The class definitions of the stream receivers to be executed on remote nodes
must be available on the nodes. This can be achieved in two ways:
-
-* Add the classes to the classpath of the nodes;
-* Enable link:code-deployment/peer-class-loading[peer class loading].
-====
-
-=== Stream Transformer
+=== Stream Transformer [[transfomer]]
A stream transformer is a convenient implementation of a stream receiver, that
updates the data in the stream.
Stream transformers take advantage of the colocation feature and update the
data on the node where it is going to be stored.
@@ -135,9 +156,9 @@
include::code-snippets/dotnet/DataStreaming.cs[tag=streamTransformer,indent=0]
tab:C++[unsupported]
--
-=== Stream Visitor
+=== Stream Visitor [[visitor]]
-A stream visitor is another implementation of a stream receiver, which visits
every key-value pair in the stream. The visitor does not update the cache. If a
pair needs to be stored in the cache, one of the `put(...)` methods must be
called explicitly.
+A stream visitor is another implementation of a stream receiver, which visits
every key-value pair in the stream.
In the example below, we have 2 caches: "marketData", and "instruments". We
receive market data ticks and put them into the streamer for the "marketData"
cache. The stream visitor for the "marketData" streamer is invoked on the
cluster member mapped to the particular market symbol. Upon receiving
individual market ticks it updates the "instrument" cache with the latest
market price.
@@ -160,7 +181,7 @@ tab:C++[unsupported]
--
== Configuring Data Streamer Thread Pool Size
-The data streamer thread pool is dedicated to process messages coming from the
data streamers.
+The data streamer thread pool is dedicated to process batches coming from the
data streamers.
The default pool size is `max(8, total number of cores)`.
Use `IgniteConfiguration.setDataStreamerThreadPoolSize(...)` to change the
pool size.
diff --git a/docs/_docs/images/data_streaming.png
b/docs/_docs/images/data_streaming.png
index c407447435c..27a01208992 100644
Binary files a/docs/_docs/images/data_streaming.png and
b/docs/_docs/images/data_streaming.png differ
diff --git a/docs/_docs/snapshots/snapshots.adoc
b/docs/_docs/snapshots/snapshots.adoc
index 7a4e71e5696..b9d21c20c73 100644
--- a/docs/_docs/snapshots/snapshots.adoc
+++ b/docs/_docs/snapshots/snapshots.adoc
@@ -321,6 +321,8 @@ The snapshot procedure has some limitations that you should
be aware of before u
* You can have only one snapshotting operation running at a time.
* The snapshot operation is prohibited during a master key change and/or cache
group key change.
* The snapshot procedure is interrupted if a server node leaves the cluster.
+* Concurrent updates from
link:../data-streaming.adoc#_limitations[DataStreamer] with default setting
'allowOverwrite'
+(false) into a persistent cache can cause that cache data stored inconsistent.
If any of these limitations prevent you from using Apache Ignite, then select
alternate snapshotting implementations for
Ignite provided by enterprise vendors.
diff --git a/docs/_docs/sql-reference/operational-commands.adoc
b/docs/_docs/sql-reference/operational-commands.adoc
index d1f46411c37..308bcf6ca9f 100644
--- a/docs/_docs/sql-reference/operational-commands.adoc
+++ b/docs/_docs/sql-reference/operational-commands.adoc
@@ -64,7 +64,12 @@ SET STREAMING [OFF|ON];
Using the `SET` command, you can stream data in bulk into a SQL table in your
cluster. When streaming is enabled, the JDBC/ODBC driver will pack your
commands in batches and send them to the server (Ignite cluster). On the server
side, the batch is converted into a stream of cache update commands which are
distributed asynchronously between server nodes. Performing this asynchronously
increases peak throughput because at any given time all cluster nodes are busy
with data loading.
=== Usage
-To stream data into your cluster, prepare a file with the `SET STREAMING ON`
command followed by `INSERT` commands for data that needs to be loaded. For
example:
+To stream data into your cluster, prepare a file with the `SET STREAMING ON`
command followed by `INSERT` commands for data that needs to be loaded.
+
+[NOTE]
+====
+Setting 'STREAMING ON' uses
link:../data-streaming.adoc#_limitations[DataStreamer] which doesn't guarantee
by default data consistency until successfully finished.
+====
[source,sql]
----
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 838b5042f45..323bcb06c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -42,9 +42,23 @@ import org.jetbrains.annotations.Nullable;
* This way batches can be applied within transaction(s) on target node.
* See {@link #receiver(StreamReceiver)} for details.
* <p>
- * Note that streamer will stream data concurrently by multiple internal
threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the streamer.
+ * Data streamer doesn’t guarantee:
+ * <ul>
+ * <li>Data order. Data records may be loaded into a cache in a different
order compared to putting into the
+ * streamer;</li>
+ * <li>Immediate data loading. Data can be kept for a while before
loading;</li>
+ * <li>By default, data consistency until successfully finished;</li>
+ * <li>By default, working with external storages.</li>
+ * </ul>
+ * <p>
+ * If {@link #allowOverwrite()} setting is {@code false} (default), consider:
+ * <ul>
+ * <li>You should not have the same keys repeating in the data being
streamed;</li>
+ * <li>Streamer cancelation or streamer node failure can cause data
inconsistency;</li>
+ * <li>If loading into a persistent cache, concurrently created snapshot may
contain inconsistent data and might not
+ * be restored entirely.</li>
+ * </ul>
+ * Most important behaviour of data streamer is defined by {@link
StreamReceiver} and {@link #allowOverwrite()} property.
* <p>
* Also note that {@code IgniteDataStreamer} is not the only way to add data
into cache.
* Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate,
Object...)}
@@ -138,7 +152,8 @@ public interface IgniteDataStreamer<K, V> extends
AutoCloseable {
* <p>
* This flag is disabled by default (default is {@code false}).
*
- * @return {@code True} if overwriting is allowed, {@code false}
otherwise..
+ * @return {@code True} if overwriting is allowed or if receiver is
changed by {@link #receiver(StreamReceiver)}.
+ * {@code False} otherwise.
*/
public boolean allowOverwrite();
@@ -318,6 +333,8 @@ public interface IgniteDataStreamer<K, V> extends
AutoCloseable {
/**
* Sets custom stream receiver to this data streamer.
+ * <p>
+ * Disables {@link #allowOverwrite(boolean)} and sets {@link
#allowOverwrite()} returning {@code true}.
*
* @param rcvr Stream receiver.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
index 5945805b076..c18c9afe003 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -31,6 +31,8 @@ import org.jetbrains.annotations.Nullable;
* grantee data consistency between them.</li>
* <li>Snapshot must be resorted manually on the switched off cluster by
copying data
* to the working directory on each cluster node.</li>
+ * <li>Concurrent updates from {@link IgniteDataStreamer} with default {@link
IgniteDataStreamer#allowOverwrite()}
+ * setting (false) into a persistent cache can cause that cache data stored
inconsistent.</li>
* </ul>
*/
public interface IgniteSnapshot {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
index 5bc9a6f3eff..79d0fd097e9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
@@ -44,8 +44,8 @@ public class DataStreamerCacheUpdaters {
private static final StreamReceiver BATCHED_SORTED = new BatchedSorted();
/**
- * Updates cache using independent {@link IgniteCache#put(Object,
Object)}and
- * {@link IgniteCache#remove(Object)} operations. Thus it is safe from
deadlocks but performance
+ * Updates cache using independent {@link IgniteCache#put(Object, Object)}
and
+ * {@link IgniteCache#remove(Object)} operations. Thus, it is safe from
deadlocks but performance
* is not the best.
*
* @return Single updater.
@@ -55,8 +55,8 @@ public class DataStreamerCacheUpdaters {
}
/**
- * Updates cache using batched methods {@link IgniteCache#putAll(Map)}and
- * {@link IgniteCache#removeAll()}. Can cause deadlocks if the same keys
are getting
+ * Updates cache using batched methods {@link IgniteCache#putAll(Map)} and
+ * {@link IgniteCache#removeAll()}. Can cause deadlocks with transactional
caches if the same keys are getting
* updated concurrently. Performance is generally better than in {@link
#individual()}.
*
* @return Batched updater.
@@ -68,7 +68,8 @@ public class DataStreamerCacheUpdaters {
/**
* Updates cache using batched methods {@link IgniteCache#putAll(Map)} and
* {@link IgniteCache#removeAll(Set)}. Keys are sorted in natural order
and if all updates
- * use the same rule deadlock can not happen. Performance is generally
better than in {@link #individual()}.
+ * use the same rule deadlock with transactional caches can not happen.
Performance is generally better than in
+ * {@link #individual()}.
*
* @return Batched sorted updater.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a20fb307094..b98adfef6ea 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -129,6 +129,12 @@ import static
org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
*/
@SuppressWarnings("unchecked")
public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>,
Delayed {
+ /** */
+ public static final String WRN_INCONSISTENT_UPDATES = "The Data Streamer
loads data with 'allowOverwrite' set " +
+ "to false. It doesn't guarantee data consistency until successfully
finishes. Streamer cancelation or " +
+ "streamer node failure can cause data inconsistency. Concurrently
created snapshot may contain inconsistent " +
+ "data and might not be restored entirely.";
+
/** Per thread buffer size. */
private int bufLdrSzPerThread = DFLT_PER_THREAD_BUFFER_SIZE;
@@ -286,6 +292,9 @@ public class DataStreamerImpl<K, V> implements
IgniteDataStreamer<K, V>, Delayed
/** */
private final AtomicBoolean remapOwning = new AtomicBoolean();
+ /** Flag to warn into the log only once if streamer is inconsistent until
successfully finished. */
+ private final AtomicBoolean inconsistencyWarned = new AtomicBoolean();
+
/**
* @param ctx Grid kernal context.
* @param cacheName Cache name.
@@ -642,6 +651,9 @@ public class DataStreamerImpl<K, V> implements
IgniteDataStreamer<K, V>, Delayed
lock(false);
+ if (rcvr instanceof IsolatedUpdater &&
inconsistencyWarned.compareAndSet(false, true))
+ log.warning(WRN_INCONSISTENT_UPDATES);
+
try {
long threadId = Thread.currentThread().getId();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index e4d4cf9fe90..b970bf0d0cf 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -50,7 +51,10 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.logging.log4j.core.appender.WriterAppender;
import org.junit.Ignore;
@@ -86,6 +90,10 @@ public class DataStreamerImplSelfTest extends
GridCommonAbstractTest {
super.afterTest();
stopAllGrids();
+
+ // Unbinds the log listeners from single static log instance.
+ U.<AtomicReference<IgniteLogger>>field(DataStreamerImpl.class,
"logRef").set(null);
+ GridTestUtils.setFieldValue(null, DataStreamerImpl.class, "log", null);
}
/** {@inheritDoc} */
@@ -131,6 +139,95 @@ public class DataStreamerImplSelfTest extends
GridCommonAbstractTest {
assertTrue(fut.isDone());
}
+ /**
+ * Test inconsistency log warning of the streamer. Default receiver goes
first and is set again after a consistent
+ * receiver. The warning must appear only once.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testInconsistencyWarningDefaultReceiverFirst() throws
Exception {
+ doTestInconsistencyWarning(true, null,
DataStreamerCacheUpdaters.batched(), null);
+ }
+
+ /**
+ * Test inconsistency log warning of the streamer when default receiver
that is set after a consistent receiver. The
+ * warning must appear only once.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testInconsistencyWarningDefaultReceiverFollows() throws
Exception {
+ doTestInconsistencyWarning(true, DataStreamerCacheUpdaters.batched(),
null,
+ DataStreamerCacheUpdaters.individual(), null);
+ }
+
+ /**
+ * Test inconsistency log warning of the streamer. Must not appear with
consistent receivers.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testInconsistencyWarningOtherReceivers() throws Exception {
+ doTestInconsistencyWarning(false, DataStreamerCacheUpdaters.batched(),
+ DataStreamerCacheUpdaters.individual(),
DataStreamerCacheUpdaters.batchedSorted());
+ }
+
+ /**
+ * Test inconsistency log warning of the streamer.
+ *
+ * @param mustWarn {@code True}, if just one warning expected. {@code
False} if no warning expected at all.
+ * @param receivers The streamer receivers to load with the same streamer
instance. Must not be empty. {@code Null}
+ * for the default receiver.
+ * @throws Exception If failed.
+ */
+ private void doTestInconsistencyWarning(boolean mustWarn,
+ StreamReceiver<Integer, Integer>... receivers) throws Exception {
+ assert receivers.length > 0;
+
+ LogListener lsnr =
LogListener.matches(DataStreamerImpl.WRN_INCONSISTENT_UPDATES)
+ .times(mustWarn ? 1 : 0).build();
+
+ startGrids(2);
+
+ IgniteConfiguration cfg =
getConfiguration(getTestIgniteInstanceName(G.allGrids().size()));
+
+ ListeningTestLogger log = new ListeningTestLogger(cfg.getGridLogger());
+ log.registerListener(lsnr);
+ cfg.setGridLogger(log);
+
+ IgniteEx ldr = startClientGrid(cfg);
+
+ CacheConfiguration<?, ?> ccfg = defaultCacheConfiguration();
+ ldr.getOrCreateCache(ccfg);
+
+ try (IgniteDataStreamer<Integer, Integer> ds =
ldr.dataStreamer(ccfg.getName())) {
+ for (StreamReceiver<Integer, Integer> rcvr : receivers) {
+ // Resets default receiver.
+ if (rcvr == null)
+ ds.allowOverwrite(false);
+ else
+ ds.receiver(rcvr);
+
+ // Put some amount of data.
+ AtomicInteger loadCnt = new AtomicInteger(KEYS_COUNT);
+
+ GridTestUtils.runMultiThreaded(() -> {
+ int v;
+
+ while (loadCnt.get() > 0) {
+ v = loadCnt.getAndDecrement();
+
+ if (v >= 0)
+ ds.addData(v, v);
+ }
+ }, 3, "testDsLoader");
+ }
+ }
+
+ assertTrue(lsnr.check());
+ }
+
/**
* @throws Exception If failed.
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
index 64616db1525..3c62e12163e 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
@@ -329,6 +329,8 @@ public abstract class LogListener implements
Consumer<String> {
private LogMessageListener(@NotNull Function<String, Integer> func,
@NotNull ValueRange exp) {
this.func = func;
this.exp = exp;
+
+ matches.set(0);
}
/** {@inheritDoc} */