This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-23299 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit eebb0b2abc5fda7a28151419fb00d69750314363 Author: amashenkov <[email protected]> AuthorDate: Fri Sep 27 11:16:40 2024 +0300 Fix inflights tracking for RO transactions on cursor close when transaction is rolling back. --- .../internal/table/distributed/storage/PartitionScanPublisher.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java index fbe90bc9b2..6196098a45 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java @@ -179,12 +179,11 @@ public abstract class PartitionScanPublisher<T> implements Publisher<T> { inflightBatchRequestTracker.onRequestBegin(); - retrieveBatch(scanId, n).thenAccept(binaryRows -> { + CompletableFuture<Collection<T>> retriveBatchFuture = retrieveBatch(scanId, n); + retriveBatchFuture.whenComplete((batch, err) -> inflightBatchRequestTracker.onRequestEnd()).thenAccept(binaryRows -> { assert binaryRows != null; assert binaryRows.size() <= n : "Rows more then requested " + binaryRows.size() + " " + n; - inflightBatchRequestTracker.onRequestEnd(); - binaryRows.forEach(subscriber::onNext); if (binaryRows.size() < n) { @@ -197,8 +196,6 @@ public abstract class PartitionScanPublisher<T> implements Publisher<T> { } } }).exceptionally(t -> { - inflightBatchRequestTracker.onRequestEnd(); - cancel(t, false); return null;
