This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bf6675a55b5 IGNITE-25361 Fix StreamerSubscriber user code invoke under lock (#5908)
bf6675a55b5 is described below
commit bf6675a55b5ea9c58d7331206c3b276a9b174678
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed May 28 12:37:43 2025 +0300
IGNITE-25361 Fix StreamerSubscriber user code invoke under lock (#5908)
Fix `requestMore` and `close`:
* Minimize contention
* Do not call user code under lock
* Handle exceptions in user code
---
docs/_docs/developers-guide/clients/java.adoc | 2 +-
.../ignite/internal/client/ClientMetricSource.java | 2 +-
.../internal/streamer/StreamerSubscriber.java | 73 +++++++++++++++-------
.../internal/streamer/StreamerSubscriberTest.java | 12 ++--
4 files changed, 58 insertions(+), 31 deletions(-)
diff --git a/docs/_docs/developers-guide/clients/java.adoc
b/docs/_docs/developers-guide/clients/java.adoc
index 4ffb31ca7b2..9100457ddaf 100644
--- a/docs/_docs/developers-guide/clients/java.adoc
+++ b/docs/_docs/developers-guide/clients/java.adoc
@@ -123,7 +123,7 @@ After that, client metrics will be available to any Java
monitoring tool, for ex
|BytesReceived|The amount of bytes received.
|StreamerBatchesSent|The number of data streamer batches sent.
|StreamerItemsSent|The number of data streamer items sent.
-|StreamerBatchesActive|The number of existing data streamer batches.
+|StreamerBatchesActive|The number of in-flight data streamer batches.
|StreamerItemsQueued|The number of queued data streamer items.
|=======================================================================
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
index 24bde4dfd47..707f0e034aa 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
@@ -490,7 +490,7 @@ public class ClientMetricSource extends
AbstractMetricSource<ClientMetricSource.
"StreamerItemsSent", "Total number of data streamer items
sent");
private final AtomicLongMetric streamerBatchesActive = new
AtomicLongMetric(
- "StreamerBatchesActive", "Total number of existing data
streamer batches");
+ "StreamerBatchesActive", "Total number of in-flight data
streamer batches");
private final AtomicLongMetric streamerItemsQueued = new
AtomicLongMetric(
"StreamerItemsQueued", "Total number of queued data streamer
items (rows)");
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 7139aae0529..46690323115 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -305,20 +305,31 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
return resultSubscription;
}
- private synchronized void close(@Nullable Throwable throwable) {
- if (closed) {
- return;
- }
+ private void close(@Nullable Throwable throwable) {
+ Subscription subscription0;
+ ScheduledFuture<?> flushTask0;
+
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
- closed = true;
+ closed = true;
+ subscription0 = subscription;
+ flushTask0 = flushTask;
+ }
- if (flushTask != null) {
- flushTask.cancel(false);
+ if (flushTask0 != null) {
+ flushTask0.cancel(false);
}
- var sub = subscription;
- if (sub != null) {
- sub.cancel();
+ if (subscription0 != null) {
+ try {
+ // User code: call outside of lock, handle exceptions.
+ subscription0.cancel();
+ } catch (Throwable e) {
+ log.error("Failed to cancel subscription: " + e.getMessage(),
e);
+ }
}
if (throwable == null) {
@@ -356,24 +367,37 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
}
}
- private synchronized void requestMore() {
- if (closed || subscription == null) {
- return;
- }
+ private void requestMore() {
+ int toRequest;
+ Subscription subscription0;
- // This method controls backpressure. We won't get more items than we
requested.
- // The idea is to have perPartitionParallelOperations batches in
flight for every connection.
- var pending = pendingItemCount.get();
- var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize()
* options.perPartitionParallelOperations();
- var inFlight = inFlightItemCount.get();
- var count = desiredInFlight - inFlight - pending;
+ synchronized (this) {
+ if (closed || subscription == null) {
+ return;
+ }
- if (count <= 0) {
- return;
+ // This method controls backpressure. We won't get more items than
we requested.
+ // The idea is to have perPartitionParallelOperations batches in
flight for every connection.
+ var pending = pendingItemCount.get();
+ var desiredInFlight = Math.max(1, buffers.size()) *
options.pageSize() * options.perPartitionParallelOperations();
+ var inFlight = inFlightItemCount.get();
+ toRequest = desiredInFlight - inFlight - pending;
+
+ if (toRequest <= 0) {
+ return;
+ }
+
+ pendingItemCount.addAndGet(toRequest);
+ subscription0 = subscription;
}
- subscription.request(count);
- pendingItemCount.addAndGet(count);
+ try {
+ // User code: call outside of lock, handle exceptions.
+ subscription0.request(toRequest);
+ } catch (Throwable e) {
+ log.error("Failed to request more items: " + e.getMessage(), e);
+ close(e);
+ }
}
private synchronized void initFlushTimer() {
@@ -391,6 +415,7 @@ public class StreamerSubscriber<T, E, V, R, P> implements
Subscriber<E> {
}
private void flushBuffers() {
+ // TODO IGNITE-25509 Data Streamer ignores backpressure in flush timer.
buffers.values().forEach(StreamerBuffer::flush);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
index 7208a784591..ab990684f79 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -158,9 +158,11 @@ class StreamerSubscriberTest extends
BaseIgniteAbstractTest {
var metrics = new Metrics();
- var options = new Options(2, 1, 1000);
+ var options = new Options(2, 3, 1000);
- long expectedBatches = itemsCount / options.batchSize;
+ long expectedActiveBatches =
Math.min(options.perPartitionParallelOperations, itemsCount /
options.batchSize);
+
+ long expectedItemsQueued = Math.min(itemsCount, expectedActiveBatches
* options.batchSize);
var partitionProvider = new StreamerPartitionAwarenessProvider<Long,
String>() {
@Override
@@ -193,9 +195,9 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest
{
publisher.subscribe(subscriber);
- assertThat(metrics.batchesActive.longValue(), is(expectedBatches));
+ assertThat(metrics.batchesActive.longValue(),
is(expectedActiveBatches));
assertThat(metrics.batchesSent.longValue(), is(0L));
- assertThat(metrics.itemsQueued.longValue(), is(itemsCount));
+ assertThat(metrics.itemsQueued.longValue(), is(expectedItemsQueued));
assertThat(metrics.itemsSent.longValue(), is(0L));
sendFuture.complete(null);
@@ -203,7 +205,7 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest
{
assertThat(subscriber.completionFuture(), willCompleteSuccessfully());
assertThat(metrics.batchesActive.longValue(), is(0L));
- assertThat(metrics.batchesSent.longValue(), is(expectedBatches));
+ assertThat(metrics.batchesSent.longValue(), is(itemsCount /
options.batchSize));
assertThat(metrics.itemsQueued.longValue(), is(0L));
assertThat(metrics.itemsSent.longValue(), is(itemsCount));
}