This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 423d0e9a337 IGNITE-24988 Sql. Improve error handling in PartitionScanPublisher (#5669)
423d0e9a337 is described below
commit 423d0e9a33797aa53b13da2c695f6d1be2465d3e
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Wed Apr 23 18:38:26 2025 +0300
IGNITE-24988 Sql. Improve error handling in PartitionScanPublisher (#5669)
---
.../distributed/storage/PartitionScanPublisher.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java
index dd6f8ea6759..5a6dd8534ab 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.java
@@ -150,7 +150,12 @@ public abstract class PartitionScanPublisher<T> implements Publisher<T> {
}
if (shouldRetrieveBatch) {
- serializationFuture = serializationFuture.thenCompose(v -> retrieveAndProcessBatch());
+ serializationFuture = serializationFuture.thenCompose(v -> retrieveAndProcessBatch())
+ .whenComplete((v, err) -> {
+ if (err != null) {
+ completeSubscription(err);
+ }
+ });
}
}
}
@@ -224,12 +229,7 @@ public abstract class PartitionScanPublisher<T> implements Publisher<T> {
return retrieveBatch(scanId, batchSize)
.whenComplete((batch, err) -> inflightBatchRequestTracker.onRequestEnd())
- .thenAccept(batch -> processBatch(batch, batchSize))
- .whenComplete((v, err) -> {
- if (err != null) {
- completeSubscription(err);
- }
- });
+ .thenAccept(batch -> processBatch(batch, batchSize));
}
private void processBatch(Collection<T> batch, int requestedCnt) {
@@ -249,7 +249,12 @@ public abstract class PartitionScanPublisher<T> implements Publisher<T> {
requestedItemsCnt -= batch.size();
if (requestedItemsCnt > 0) {
- serializationFuture = serializationFuture.thenCompose(v -> retrieveAndProcessBatch());
+ serializationFuture = serializationFuture.thenCompose(v -> retrieveAndProcessBatch())
+ .whenComplete((v, err) -> {
+ if (err != null) {
+ completeSubscription(err);
+ }
+ });
}
}
}