This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3534e59edc4 IGNITE-27389 SQL Calcite: Reduce contention by
QueryBlockingTaskExecutor - Fixes #12589.
3534e59edc4 is described below
commit 3534e59edc4c827fa34104b393ca5a29796faa67
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Dec 24 16:37:29 2025 +0300
IGNITE-27389 SQL Calcite: Reduce contention by QueryBlockingTaskExecutor -
Fixes #12589.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../exec/task/QueryBlockingTaskExecutor.java | 5 +--
.../query/calcite/exec/task/QueryTasksQueue.java | 49 +++++++++++++++-------
.../calcite/exec/task/QueryTasksQueueTest.java | 35 +++++++---------
3 files changed, 53 insertions(+), 36 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
index c4feb74b8e9..fa202f5ca6e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
@@ -33,9 +33,6 @@ import static
org.apache.ignite.internal.processors.pool.PoolProcessor.THREAD_PO
* Query task executor based on queue with query blocking.
*/
public class QueryBlockingTaskExecutor extends AbstractQueryTaskExecutor {
- /** */
- private final QueryTasksQueue tasksQueue = new QueryTasksQueue();
-
/** */
private IgniteThreadPoolExecutor executor;
@@ -57,6 +54,8 @@ public class QueryBlockingTaskExecutor extends
AbstractQueryTaskExecutor {
@Override public void onStart(GridKernalContext ctx) {
super.onStart(ctx);
+ QueryTasksQueue tasksQueue = new
QueryTasksQueue(ctx.config().getQueryThreadPoolSize());
+
executor = new IgniteThreadPoolExecutor(
THREAD_PREFIX,
ctx.igniteInstanceName(),
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
index 6bdc044076a..fbb2893b71b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java
@@ -69,11 +69,18 @@ class QueryTasksQueue {
/** Set of blocked (currently running) queries. */
private final Set<QueryKey> blockedQrys = new HashSet<>();
+ /** All tasks blocked by currently running queries. Can be false-negative.
*/
+ private boolean allTasksBlocked;
+
+ /** Count of not parked and not currently busy task processing threads. */
+ private int freeThreadsCnt;
+
/**
* Creates a {@code LinkedBlockingQueue}.
*/
- QueryTasksQueue() {
+ QueryTasksQueue(int threadsCnt) {
last = head = new Node(null);
+ freeThreadsCnt = threadsCnt;
}
/** Queue size. */
@@ -83,16 +90,22 @@ class QueryTasksQueue {
/** Add a task to the queue. */
public void addTask(QueryAwareTask task) {
+ Node node = new Node(task);
+
lock.lock();
try {
assert last.next == null : "Unexpected last.next: " + last.next;
- last = last.next = new Node(task);
+ last = last.next = node;
- cnt.getAndIncrement();
+ int tasksCnt = cnt.incrementAndGet();
- notEmpty.signal();
+ // Do not wake up new threads if it's enough free threads to
process the new task.
+ if (tasksCnt > freeThreadsCnt && !(allTasksBlocked &&
freeThreadsCnt > 0))
+ notEmpty.signal();
+
+ allTasksBlocked = false;
}
finally {
lock.unlock();
@@ -100,17 +113,20 @@ class QueryTasksQueue {
}
/** Poll task and block query. */
- public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit)
throws InterruptedException {
+ public @Nullable QueryAwareTask pollTaskAndBlockQuery(long nanos) throws
InterruptedException {
lock.lockInterruptibly();
try {
+ freeThreadsCnt--;
+
QueryAwareTask res;
- long nanos = unit.toNanos(timeout);
+ while (cnt.get() == 0 || allTasksBlocked || (allTasksBlocked =
(res = dequeue()) == null)) {
+ if (nanos <= 0L) {
+ freeThreadsCnt++;
- while (cnt.get() == 0 || (res = dequeue()) == null) {
- if (nanos <= 0L)
return null;
+ }
nanos = notEmpty.awaitNanos(nanos);
}
@@ -121,6 +137,11 @@ class QueryTasksQueue {
return res;
}
+ catch (Throwable e) {
+ freeThreadsCnt++;
+
+ throw e;
+ }
finally {
lock.unlock();
}
@@ -141,8 +162,7 @@ class QueryTasksQueue {
unlink(pred, cur);
- if (cnt.decrementAndGet() > 0)
- notEmpty.signal();
+ cnt.getAndDecrement();
return res;
}
@@ -160,8 +180,9 @@ class QueryTasksQueue {
assert removed;
- if (cnt.get() > 0)
- notEmpty.signal();
+ allTasksBlocked = false;
+
+ freeThreadsCnt++;
}
finally {
lock.unlock();
@@ -290,11 +311,11 @@ class QueryTasksQueue {
}
@Override public @NotNull Runnable take() throws
InterruptedException {
- return pollTaskAndBlockQuery(Long.MAX_VALUE,
TimeUnit.NANOSECONDS);
+ return pollTaskAndBlockQuery(Long.MAX_VALUE);
}
@Override public @Nullable Runnable poll(long timeout, @NotNull
TimeUnit unit) throws InterruptedException {
- return pollTaskAndBlockQuery(0, TimeUnit.NANOSECONDS);
+ return pollTaskAndBlockQuery(unit.toNanos(timeout));
}
@Override public Runnable remove() {
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
index 134bf1d15e8..e132d4fd683 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java
@@ -31,8 +31,9 @@ public class QueryTasksQueueTest extends
GridCommonAbstractTest {
@Test
public void testQueryBlockingUnblocking() throws Exception {
long waitTimeout = 10_000L;
+ long waitTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(waitTimeout);
- QueryTasksQueue queue = new QueryTasksQueue();
+ QueryTasksQueue queue = new QueryTasksQueue(1);
UUID qryId1 = UUID.randomUUID();
UUID qryId2 = UUID.randomUUID();
QueryKey qryKey1 = new QueryKey(qryId1, 0);
@@ -46,13 +47,13 @@ public class QueryTasksQueueTest extends
GridCommonAbstractTest {
queue.addTask(new TestQueryAwareTask(qryKey1));
queue.addTask(new TestQueryAwareTask(qryKey3));
- QueryAwareTask task = queue.pollTaskAndBlockQuery(waitTimeout,
TimeUnit.MILLISECONDS);
+ QueryAwareTask task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
assertEquals(qryKey1, task.queryKey());
- task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
+ task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
assertEquals(qryKey2, task.queryKey());
- task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
+ task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
assertEquals(qryKey3, task.queryKey());
// Test threads parking and unparking.
@@ -60,22 +61,20 @@ public class QueryTasksQueueTest extends
GridCommonAbstractTest {
Runnable pollAndStoreResult = () -> {
try {
- res[0] = queue.pollTaskAndBlockQuery(waitTimeout,
TimeUnit.MILLISECONDS);
+ res[0] = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
- // Unparking on unblock query.
+ // Query unblock check.
Thread thread1 = new Thread(pollAndStoreResult);
- thread1.start();
-
- assertTrue(GridTestUtils.waitForCondition(() -> thread1.getState() ==
Thread.State.TIMED_WAITING, waitTimeout));
-
queue.unblockQuery(qryKey2);
+ thread1.start();
+
thread1.join(waitTimeout);
assertFalse(thread1.isAlive());
@@ -104,18 +103,16 @@ public class QueryTasksQueueTest extends
GridCommonAbstractTest {
queue.unblockQuery(qryKey2);
queue.unblockQuery(qryKey3);
- task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
+ task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
assertEquals(qryKey1, task.queryKey());
- // Unparking on unblock query second time.
+ // Query unblock check second time.
Thread thread3 = new Thread(pollAndStoreResult);
- thread3.start();
-
- assertTrue(GridTestUtils.waitForCondition(() -> thread3.getState() ==
Thread.State.TIMED_WAITING, waitTimeout));
-
queue.unblockQuery(qryKey1);
+ thread3.start();
+
thread3.join(waitTimeout);
assertFalse(thread3.isAlive());
@@ -128,7 +125,7 @@ public class QueryTasksQueueTest extends
GridCommonAbstractTest {
/** */
@Test
public void testToArray() {
- QueryTasksQueue queue = new QueryTasksQueue();
+ QueryTasksQueue queue = new QueryTasksQueue(1);
QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);
@@ -169,7 +166,7 @@ public class QueryTasksQueueTest extends
GridCommonAbstractTest {
/** */
@Test
public void testDrainTo() {
- QueryTasksQueue queue = new QueryTasksQueue();
+ QueryTasksQueue queue = new QueryTasksQueue(1);
QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);
@@ -210,7 +207,7 @@ public class QueryTasksQueueTest extends
GridCommonAbstractTest {
/** */
@Test
public void testRemove() {
- QueryTasksQueue queue = new QueryTasksQueue();
+ QueryTasksQueue queue = new QueryTasksQueue(1);
QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);