This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 423d0e9a337 IGNITE-24988 Sql. Improve error handling in 
PartitionScanPublisher (#5669)
423d0e9a337 is described below

commit 423d0e9a33797aa53b13da2c695f6d1be2465d3e
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Wed Apr 23 18:38:26 2025 +0300

    IGNITE-24988 Sql. Improve error handling in PartitionScanPublisher (#5669)
---
 .../distributed/storage/PartitionScanPublisher.java | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java
index dd6f8ea6759..5a6dd8534ab 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java
@@ -150,7 +150,12 @@ public abstract class PartitionScanPublisher<T> implements 
Publisher<T> {
                 }
 
                 if (shouldRetrieveBatch) {
-                    serializationFuture = serializationFuture.thenCompose(v -> 
retrieveAndProcessBatch());
+                    serializationFuture = serializationFuture.thenCompose(v -> 
retrieveAndProcessBatch())
+                            .whenComplete((v, err) -> {
+                                if (err != null) {
+                                    completeSubscription(err);
+                                }
+                            });
                 }
             }
         }
@@ -224,12 +229,7 @@ public abstract class PartitionScanPublisher<T> implements 
Publisher<T> {
 
             return retrieveBatch(scanId, batchSize)
                     .whenComplete((batch, err) -> 
inflightBatchRequestTracker.onRequestEnd())
-                    .thenAccept(batch -> processBatch(batch, batchSize))
-                    .whenComplete((v, err) -> {
-                        if (err != null) {
-                            completeSubscription(err);
-                        }
-                    });
+                    .thenAccept(batch -> processBatch(batch, batchSize));
         }
 
         private void processBatch(Collection<T> batch, int requestedCnt) {
@@ -249,7 +249,12 @@ public abstract class PartitionScanPublisher<T> implements 
Publisher<T> {
                     requestedItemsCnt -= batch.size();
 
                     if (requestedItemsCnt > 0) {
-                        serializationFuture = 
serializationFuture.thenCompose(v -> retrieveAndProcessBatch());
+                        serializationFuture = 
serializationFuture.thenCompose(v -> retrieveAndProcessBatch())
+                                .whenComplete((v, err) -> {
+                                    if (err != null) {
+                                        completeSubscription(err);
+                                    }
+                                });
                     }
                 }
             }

Reply via email to