This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 09970bbf6d IGNITE-23072 Sql. Fix error on concurrent cursor closing (#4327)
09970bbf6d is described below

commit 09970bbf6d83a0684ededd4918f1d3bb31b1b24e
Author: korlov42 <[email protected]>
AuthorDate: Wed Sep 4 17:32:27 2024 +0300

    IGNITE-23072 Sql. Fix error on concurrent cursor closing (#4327)
---
 .../internal/sql/engine/exec/rel/AsyncRootNode.java     | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index e0026a757d..20b42ce50c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -214,14 +214,15 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
     }
 
     private void flush() throws Exception {
-        // flush may be triggered by prefetching, so let's do nothing in this case
-        if (pendingRequests.isEmpty()) {
-            return;
-        }
-
         PendingRequest<OutRowT> currentReq = pendingRequests.peek();
 
-        assert currentReq != null;
+        // There may be no pending requests in two cases:
+        //   1) flush has been triggered by prefetch
+        //   2) concurrent cancellation already cleared the queue
+        // In both cases we should just return immediately.
+        if (currentReq == null) {
+            return;
+        }
 
         taskScheduled.set(false);
 
@@ -232,7 +233,9 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
         boolean hasMoreRows = waiting != -1 || !buff.isEmpty();
 
         if (currentReq.buff.size() == currentReq.requested || !hasMoreRows) {
-            pendingRequests.remove();
+            // use poll() instead of remove() because latter throws exception when queue is empty,
+            // and queue may be cleared concurrently by cancellation
+            pendingRequests.poll();
 
             currentReq.fut.complete(new BatchedResult<>(currentReq.buff, hasMoreRows));
         }

Reply via email to