This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5f332ddc80c IGNITE-24140 Fix data streamer failedItems (#5788)
5f332ddc80c is described below
commit 5f332ddc80c8b29ad5313a4ea17e7982de682f96
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu May 8 18:23:35 2025 +0300
IGNITE-24140 Fix data streamer failedItems (#5788)
Add the item to the buffer under the lock to ensure that a given item is either
streamed or ends up in failedItems.
Fixes regression from IGNITE-25301 (#5771)
---
.../ignite/internal/streamer/StreamerSubscriber.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 4d228d9beb9..7139aae0529 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -168,12 +168,6 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
/** {@inheritDoc} */
@Override
public void onNext(E item) {
- synchronized (this) {
- if (closed) {
- throw new IllegalStateException("Streamer is closed, can't add
items.");
- }
- }
-
pendingItemCount.decrementAndGet();
T key = keyFunc.apply(item);
@@ -183,7 +177,14 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
partition,
p -> new StreamerBuffer<>(options.pageSize(), items ->
enlistBatch(p, items)));
- buf.add(item);
+ synchronized (this) {
+ if (closed) {
+ throw new IllegalStateException("Streamer is closed, can't add
items.");
+ }
+
+ buf.add(item);
+ }
+
this.metrics.streamerItemsQueuedAdd(1);
requestMore();