This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bf6675a55b5 IGNITE-25361 Fix StreamerSubscriber user code invoke under 
lock (#5908)
bf6675a55b5 is described below

commit bf6675a55b5ea9c58d7331206c3b276a9b174678
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed May 28 12:37:43 2025 +0300

    IGNITE-25361 Fix StreamerSubscriber user code invoke under lock (#5908)
    
    Fix `requestMore` and `close`:
    * Minimize contention
    * Do not call user code under lock
    * Handle exceptions in user code
---
 docs/_docs/developers-guide/clients/java.adoc      |  2 +-
 .../ignite/internal/client/ClientMetricSource.java |  2 +-
 .../internal/streamer/StreamerSubscriber.java      | 73 +++++++++++++++-------
 .../internal/streamer/StreamerSubscriberTest.java  | 12 ++--
 4 files changed, 58 insertions(+), 31 deletions(-)

diff --git a/docs/_docs/developers-guide/clients/java.adoc 
b/docs/_docs/developers-guide/clients/java.adoc
index 4ffb31ca7b2..9100457ddaf 100644
--- a/docs/_docs/developers-guide/clients/java.adoc
+++ b/docs/_docs/developers-guide/clients/java.adoc
@@ -123,7 +123,7 @@ After that, client metrics will be available to any Java 
monitoring tool, for ex
 |BytesReceived|The amount of bytes received.
 |StreamerBatchesSent|The number of data streamer batches sent.
 |StreamerItemsSent|The number of data streamer items sent.
-|StreamerBatchesActive|The number of existing data streamer batches.
+|StreamerBatchesActive|The number of in-flight data streamer batches.
 |StreamerItemsQueued|The number of queued data streamer items.
 
 |=======================================================================
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
index 24bde4dfd47..707f0e034aa 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
@@ -490,7 +490,7 @@ public class ClientMetricSource extends 
AbstractMetricSource<ClientMetricSource.
                 "StreamerItemsSent", "Total number of data streamer items 
sent");
 
         private final AtomicLongMetric streamerBatchesActive = new 
AtomicLongMetric(
-                "StreamerBatchesActive", "Total number of existing data 
streamer batches");
+                "StreamerBatchesActive", "Total number of in-flight data 
streamer batches");
 
         private final AtomicLongMetric streamerItemsQueued = new 
AtomicLongMetric(
                 "StreamerItemsQueued", "Total number of queued data streamer 
items (rows)");
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 7139aae0529..46690323115 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -305,20 +305,31 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
         return resultSubscription;
     }
 
-    private synchronized void close(@Nullable Throwable throwable) {
-        if (closed) {
-            return;
-        }
+    private void close(@Nullable Throwable throwable) {
+        Subscription subscription0;
+        ScheduledFuture<?> flushTask0;
+
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
 
-        closed = true;
+            closed = true;
+            subscription0 = subscription;
+            flushTask0 = flushTask;
+        }
 
-        if (flushTask != null) {
-            flushTask.cancel(false);
+        if (flushTask0 != null) {
+            flushTask0.cancel(false);
         }
 
-        var sub = subscription;
-        if (sub != null) {
-            sub.cancel();
+        if (subscription0 != null) {
+            try {
+                // User code: call outside of lock, handle exceptions.
+                subscription0.cancel();
+            } catch (Throwable e) {
+                log.error("Failed to cancel subscription: " + e.getMessage(), 
e);
+            }
         }
 
         if (throwable == null) {
@@ -356,24 +367,37 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
         }
     }
 
-    private synchronized void requestMore() {
-        if (closed || subscription == null) {
-            return;
-        }
+    private void requestMore() {
+        int toRequest;
+        Subscription subscription0;
 
-        // This method controls backpressure. We won't get more items than we 
requested.
-        // The idea is to have perPartitionParallelOperations batches in 
flight for every connection.
-        var pending = pendingItemCount.get();
-        var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() 
* options.perPartitionParallelOperations();
-        var inFlight = inFlightItemCount.get();
-        var count = desiredInFlight - inFlight - pending;
+        synchronized (this) {
+            if (closed || subscription == null) {
+                return;
+            }
 
-        if (count <= 0) {
-            return;
+            // This method controls backpressure. We won't get more items than 
we requested.
+            // The idea is to have perPartitionParallelOperations batches in 
flight for every connection.
+            var pending = pendingItemCount.get();
+            var desiredInFlight = Math.max(1, buffers.size()) * 
options.pageSize() * options.perPartitionParallelOperations();
+            var inFlight = inFlightItemCount.get();
+            toRequest = desiredInFlight - inFlight - pending;
+
+            if (toRequest <= 0) {
+                return;
+            }
+
+            pendingItemCount.addAndGet(toRequest);
+            subscription0 = subscription;
         }
 
-        subscription.request(count);
-        pendingItemCount.addAndGet(count);
+        try {
+            // User code: call outside of lock, handle exceptions.
+            subscription0.request(toRequest);
+        } catch (Throwable e) {
+            log.error("Failed to request more items: " + e.getMessage(), e);
+            close(e);
+        }
     }
 
     private synchronized void initFlushTimer() {
@@ -391,6 +415,7 @@ public class StreamerSubscriber<T, E, V, R, P> implements 
Subscriber<E> {
     }
 
     private void flushBuffers() {
+        // TODO IGNITE-25509 Data Streamer ignores backpressure in flush timer.
         buffers.values().forEach(StreamerBuffer::flush);
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
index 7208a784591..ab990684f79 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -158,9 +158,11 @@ class StreamerSubscriberTest extends 
BaseIgniteAbstractTest {
 
         var metrics = new Metrics();
 
-        var options = new Options(2, 1, 1000);
+        var options = new Options(2, 3, 1000);
 
-        long expectedBatches = itemsCount / options.batchSize;
+        long expectedActiveBatches = 
Math.min(options.perPartitionParallelOperations, itemsCount / 
options.batchSize);
+
+        long expectedItemsQueued = Math.min(itemsCount, expectedActiveBatches 
* options.batchSize);
 
         var partitionProvider = new StreamerPartitionAwarenessProvider<Long, 
String>() {
             @Override
@@ -193,9 +195,9 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest 
{
 
         publisher.subscribe(subscriber);
 
-        assertThat(metrics.batchesActive.longValue(), is(expectedBatches));
+        assertThat(metrics.batchesActive.longValue(), 
is(expectedActiveBatches));
         assertThat(metrics.batchesSent.longValue(), is(0L));
-        assertThat(metrics.itemsQueued.longValue(), is(itemsCount));
+        assertThat(metrics.itemsQueued.longValue(), is(expectedItemsQueued));
         assertThat(metrics.itemsSent.longValue(), is(0L));
 
         sendFuture.complete(null);
@@ -203,7 +205,7 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest 
{
         assertThat(subscriber.completionFuture(), willCompleteSuccessfully());
 
         assertThat(metrics.batchesActive.longValue(), is(0L));
-        assertThat(metrics.batchesSent.longValue(), is(expectedBatches));
+        assertThat(metrics.batchesSent.longValue(), is(itemsCount / 
options.batchSize));
         assertThat(metrics.itemsQueued.longValue(), is(0L));
         assertThat(metrics.itemsSent.longValue(), is(itemsCount));
     }

Reply via email to