This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3534e59edc4 IGNITE-27389 SQL Calcite: Reduce contention by QueryBlockingTaskExecutor - Fixes #12589.
3534e59edc4 is described below

commit 3534e59edc4c827fa34104b393ca5a29796faa67
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Dec 24 16:37:29 2025 +0300

    IGNITE-27389 SQL Calcite: Reduce contention by QueryBlockingTaskExecutor - Fixes #12589.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../exec/task/QueryBlockingTaskExecutor.java       |  5 +--
 .../query/calcite/exec/task/QueryTasksQueue.java   | 49 +++++++++++++++-------
 .../calcite/exec/task/QueryTasksQueueTest.java     | 35 +++++++---------
 3 files changed, 53 insertions(+), 36 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
index c4feb74b8e9..fa202f5ca6e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
@@ -33,9 +33,6 @@ import static 
org.apache.ignite.internal.processors.pool.PoolProcessor.THREAD_PO
  * Query task executor based on queue with query blocking.
  */
 public class QueryBlockingTaskExecutor extends AbstractQueryTaskExecutor {
-    /** */
-    private final QueryTasksQueue tasksQueue = new QueryTasksQueue();
-
     /** */
     private IgniteThreadPoolExecutor executor;
 
@@ -57,6 +54,8 @@ public class QueryBlockingTaskExecutor extends 
AbstractQueryTaskExecutor {
     @Override public void onStart(GridKernalContext ctx) {
         super.onStart(ctx);
 
+        QueryTasksQueue tasksQueue = new 
QueryTasksQueue(ctx.config().getQueryThreadPoolSize());
+
         executor = new IgniteThreadPoolExecutor(
             THREAD_PREFIX,
             ctx.igniteInstanceName(),
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
index 6bdc044076a..fbb2893b71b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
@@ -69,11 +69,18 @@ class QueryTasksQueue {
     /** Set of blocked (currently running) queries. */
     private final Set<QueryKey> blockedQrys = new HashSet<>();
 
+    /** All tasks blocked by currently running queries. Can be false-negative. */
+    private boolean allTasksBlocked;
+
+    /** Count of not parked and not currently busy task processing threads. */
+    private int freeThreadsCnt;
+
     /**
      * Creates a {@code LinkedBlockingQueue}.
      */
-    QueryTasksQueue() {
+    QueryTasksQueue(int threadsCnt) {
         last = head = new Node(null);
+        freeThreadsCnt = threadsCnt;
     }
 
     /** Queue size. */
@@ -83,16 +90,22 @@ class QueryTasksQueue {
 
     /** Add a task to the queue. */
     public void addTask(QueryAwareTask task) {
+        Node node = new Node(task);
+
         lock.lock();
 
         try {
             assert last.next == null : "Unexpected last.next: " + last.next;
 
-            last = last.next = new Node(task);
+            last = last.next = node;
 
-            cnt.getAndIncrement();
+            int tasksCnt = cnt.incrementAndGet();
 
-            notEmpty.signal();
+            // Do not wake up new threads if it's enough free threads to process the new task.
+            if (tasksCnt > freeThreadsCnt && !(allTasksBlocked && 
freeThreadsCnt > 0))
+                notEmpty.signal();
+
+            allTasksBlocked = false;
         }
         finally {
             lock.unlock();
@@ -100,17 +113,20 @@ class QueryTasksQueue {
     }
 
     /** Poll task and block query. */
-    public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit) 
throws InterruptedException {
+    public @Nullable QueryAwareTask pollTaskAndBlockQuery(long nanos) throws 
InterruptedException {
         lock.lockInterruptibly();
 
         try {
+            freeThreadsCnt--;
+
             QueryAwareTask res;
 
-            long nanos = unit.toNanos(timeout);
+            while (cnt.get() == 0 || allTasksBlocked || (allTasksBlocked = 
(res = dequeue()) == null)) {
+                if (nanos <= 0L) {
+                    freeThreadsCnt++;
 
-            while (cnt.get() == 0 || (res = dequeue()) == null) {
-                if (nanos <= 0L)
                     return null;
+                }
 
                 nanos = notEmpty.awaitNanos(nanos);
             }
@@ -121,6 +137,11 @@ class QueryTasksQueue {
 
             return res;
         }
+        catch (Throwable e) {
+            freeThreadsCnt++;
+
+            throw e;
+        }
         finally {
             lock.unlock();
         }
@@ -141,8 +162,7 @@ class QueryTasksQueue {
 
                 unlink(pred, cur);
 
-                if (cnt.decrementAndGet() > 0)
-                    notEmpty.signal();
+                cnt.getAndDecrement();
 
                 return res;
             }
@@ -160,8 +180,9 @@ class QueryTasksQueue {
 
             assert removed;
 
-            if (cnt.get() > 0)
-                notEmpty.signal();
+            allTasksBlocked = false;
+
+            freeThreadsCnt++;
         }
         finally {
             lock.unlock();
@@ -290,11 +311,11 @@ class QueryTasksQueue {
             }
 
             @Override public @NotNull Runnable take() throws 
InterruptedException {
-                return pollTaskAndBlockQuery(Long.MAX_VALUE, 
TimeUnit.NANOSECONDS);
+                return pollTaskAndBlockQuery(Long.MAX_VALUE);
             }
 
             @Override public @Nullable Runnable poll(long timeout, @NotNull 
TimeUnit unit) throws InterruptedException {
-                return pollTaskAndBlockQuery(0, TimeUnit.NANOSECONDS);
+                return pollTaskAndBlockQuery(unit.toNanos(timeout));
             }
 
             @Override public Runnable remove() {
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
index 134bf1d15e8..e132d4fd683 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
@@ -31,8 +31,9 @@ public class QueryTasksQueueTest extends 
GridCommonAbstractTest {
     @Test
     public void testQueryBlockingUnblocking() throws Exception {
         long waitTimeout = 10_000L;
+        long waitTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(waitTimeout);
 
-        QueryTasksQueue queue = new QueryTasksQueue();
+        QueryTasksQueue queue = new QueryTasksQueue(1);
         UUID qryId1 = UUID.randomUUID();
         UUID qryId2 = UUID.randomUUID();
         QueryKey qryKey1 = new QueryKey(qryId1, 0);
@@ -46,13 +47,13 @@ public class QueryTasksQueueTest extends 
GridCommonAbstractTest {
         queue.addTask(new TestQueryAwareTask(qryKey1));
         queue.addTask(new TestQueryAwareTask(qryKey3));
 
-        QueryAwareTask task = queue.pollTaskAndBlockQuery(waitTimeout, 
TimeUnit.MILLISECONDS);
+        QueryAwareTask task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
         assertEquals(qryKey1, task.queryKey());
 
-        task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
+        task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
         assertEquals(qryKey2, task.queryKey());
 
-        task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
+        task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
         assertEquals(qryKey3, task.queryKey());
 
         // Test threads parking and unparking.
@@ -60,22 +61,20 @@ public class QueryTasksQueueTest extends 
GridCommonAbstractTest {
 
         Runnable pollAndStoreResult = () -> {
             try {
-                res[0] = queue.pollTaskAndBlockQuery(waitTimeout, 
TimeUnit.MILLISECONDS);
+                res[0] = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
             }
             catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
         };
 
-        // Unparking on unblock query.
+        // Query unblock check.
         Thread thread1 = new Thread(pollAndStoreResult);
 
-        thread1.start();
-
-        assertTrue(GridTestUtils.waitForCondition(() -> thread1.getState() == 
Thread.State.TIMED_WAITING, waitTimeout));
-
         queue.unblockQuery(qryKey2);
 
+        thread1.start();
+
         thread1.join(waitTimeout);
 
         assertFalse(thread1.isAlive());
@@ -104,18 +103,16 @@ public class QueryTasksQueueTest extends 
GridCommonAbstractTest {
         queue.unblockQuery(qryKey2);
         queue.unblockQuery(qryKey3);
 
-        task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
+        task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
         assertEquals(qryKey1, task.queryKey());
 
-        // Unparking on unblock query second time.
+        // Query unblock check second time.
         Thread thread3 = new Thread(pollAndStoreResult);
 
-        thread3.start();
-
-        assertTrue(GridTestUtils.waitForCondition(() -> thread3.getState() == 
Thread.State.TIMED_WAITING, waitTimeout));
-
         queue.unblockQuery(qryKey1);
 
+        thread3.start();
+
         thread3.join(waitTimeout);
 
         assertFalse(thread3.isAlive());
@@ -128,7 +125,7 @@ public class QueryTasksQueueTest extends 
GridCommonAbstractTest {
     /** */
     @Test
     public void testToArray() {
-        QueryTasksQueue queue = new QueryTasksQueue();
+        QueryTasksQueue queue = new QueryTasksQueue(1);
 
         QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
         QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);
@@ -169,7 +166,7 @@ public class QueryTasksQueueTest extends 
GridCommonAbstractTest {
     /** */
     @Test
     public void testDrainTo() {
-        QueryTasksQueue queue = new QueryTasksQueue();
+        QueryTasksQueue queue = new QueryTasksQueue(1);
 
         QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
         QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);
@@ -210,7 +207,7 @@ public class QueryTasksQueueTest extends 
GridCommonAbstractTest {
     /** */
     @Test
     public void testRemove() {
-        QueryTasksQueue queue = new QueryTasksQueue();
+        QueryTasksQueue queue = new QueryTasksQueue(1);
 
         QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
         QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);

Reply via email to