This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f332ddc80c IGNITE-24140 Fix data streamer failedItems (#5788)
5f332ddc80c is described below

commit 5f332ddc80c8b29ad5313a4ea17e7982de682f96
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu May 8 18:23:35 2025 +0300

    IGNITE-24140 Fix data streamer failedItems (#5788)
    
    Add the item to the buffer under the same lock as the closed check, to
    ensure that a given item is either streamed or ends up in failedItems.
    
    Fixes regression from IGNITE-25301 (#5771)
---
 .../ignite/internal/streamer/StreamerSubscriber.java      | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 4d228d9beb9..7139aae0529 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -168,12 +168,6 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
     /** {@inheritDoc} */
     @Override
     public void onNext(E item) {
-        synchronized (this) {
-            if (closed) {
-                throw new IllegalStateException("Streamer is closed, can't add 
items.");
-            }
-        }
-
         pendingItemCount.decrementAndGet();
 
         T key = keyFunc.apply(item);
@@ -183,7 +177,14 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
                 partition,
                 p -> new StreamerBuffer<>(options.pageSize(), items -> 
enlistBatch(p, items)));
 
-        buf.add(item);
+        synchronized (this) {
+            if (closed) {
+                throw new IllegalStateException("Streamer is closed, can't add 
items.");
+            }
+
+            buf.add(item);
+        }
+
         this.metrics.streamerItemsQueuedAdd(1);
 
         requestMore();

Reply via email to