This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 09970bbf6d IGNITE-23072 Sql. Fix error on concurrent cursor closing
(#4327)
09970bbf6d is described below
commit 09970bbf6d83a0684ededd4918f1d3bb31b1b24e
Author: korlov42 <[email protected]>
AuthorDate: Wed Sep 4 17:32:27 2024 +0300
IGNITE-23072 Sql. Fix error on concurrent cursor closing (#4327)
---
.../internal/sql/engine/exec/rel/AsyncRootNode.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index e0026a757d..20b42ce50c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -214,14 +214,15 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
}
private void flush() throws Exception {
- // flush may be triggered by prefetching, so let's do nothing in this
case
- if (pendingRequests.isEmpty()) {
- return;
- }
-
PendingRequest<OutRowT> currentReq = pendingRequests.peek();
- assert currentReq != null;
+ // There may be no pending requests in two cases:
+ // 1) flush has been triggered by prefetch
+ // 2) concurrent cancellation already cleared the queue
+ // In both cases we should just return immediately.
+ if (currentReq == null) {
+ return;
+ }
taskScheduled.set(false);
@@ -232,7 +233,9 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
boolean hasMoreRows = waiting != -1 || !buff.isEmpty();
if (currentReq.buff.size() == currentReq.requested || !hasMoreRows) {
- pendingRequests.remove();
+ // use poll() instead of remove() because latter throws exception
when queue is empty,
+ // and queue may be cleared concurrently by cancellation
+ pendingRequests.poll();
currentReq.fut.complete(new BatchedResult<>(currentReq.buff,
hasMoreRows));
}