This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a4e662b5f IGNITE-23666 Fix data streamer failed items (#4722)
1a4e662b5f is described below

commit 1a4e662b5f06930404406a3d460f3ddf60a6f2a6
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Nov 14 12:47:19 2024 +0200

    IGNITE-23666 Fix data streamer failed items (#4722)
---
 .../src/test/java/org/apache/ignite/client/DataStreamerTest.java      | 2 +-
 .../main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
index bca9099035..9b6849de7b 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
@@ -636,7 +636,7 @@ public class DataStreamerTest extends AbstractClientTableTest {
         assertInstanceOf(CompletionException.class, subscriberErr.getCause());
         assertInstanceOf(ArithmeticException.class, subscriberErr.getCause().getCause());
 
-        assertEquals(3, ((DataStreamerException) subscriberErr).failedItems().size());
+        assertEquals(0, ((DataStreamerException) subscriberErr).failedItems().size());
     }
 
     @Test
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
index 20d9cb1cd7..5d88f88a0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
@@ -77,6 +77,10 @@ class StreamerBuffer<T> {
     }
 
     synchronized void forEach(Consumer<T> consumer) {
+        if (closed) {
+            return;
+        }
+
         buf.forEach(consumer);
     }
 }

Reply via email to