This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 057d1f4d511 [bugfix](wg)refactor query queue for robustness (#37642)
057d1f4d511 is described below
commit 057d1f4d5118136d696cb64d24f75a53aa041b7a
Author: wangbo <[email protected]>
AuthorDate: Thu Jul 11 14:30:46 2024 +0800
[bugfix](wg)refactor query queue for robustness (#37642)
## Proposed changes
To simplify QueryQueue's code for robustness, redefine QueryQueue's
usage in two steps:
1. Use QueryQueue.getToken to get a token; the token's state may then be
either running or queued.
2. Release the QueueToken in coordinator.close: either decrement
currentRunningQueryNum or remove the token from the waiting queue.
We just need to keep these two steps atomic; then whether QueueToken.get
succeeds or throws an exception is not important.
So this PR removes the ```removeToken``` method and just releases the
QueueToken in ```coord.close```.
imported: #35929
---
.../src/main/java/org/apache/doris/qe/Coordinator.java | 12 +++++++++---
.../org/apache/doris/resource/workloadgroup/QueryQueue.java | 13 +++----------
.../org/apache/doris/resource/workloadgroup/QueueToken.java | 3 ---
3 files changed, 12 insertions(+), 16 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 1c742038b50..e17100e3072 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -660,9 +660,7 @@ public class Coordinator implements CoordInterface {
@Override
public void close() {
- for (ScanNode scanNode : scanNodes) {
- scanNode.stop();
- }
+ // NOTE: all close method should be no exception
if (queryQueue != null && queueToken != null) {
try {
queryQueue.releaseAndNotify(queueToken);
@@ -670,6 +668,14 @@ public class Coordinator implements CoordInterface {
LOG.error("error happens when coordinator close ", t);
}
}
+
+ try {
+ for (ScanNode scanNode : scanNodes) {
+ scanNode.stop();
+ }
+ } catch (Throwable t) {
+ LOG.error("error happens when scannode stop ", t);
+ }
}
private void execInternal() throws Exception {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index d36042732bf..07d5939cc4f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -129,9 +129,11 @@ public class QueryQueue {
public void releaseAndNotify(QueueToken releaseToken) {
queueLock.lock();
try {
- //NOTE:token's tokenState need to be locked by queueLock
+ // NOTE:token's tokenState need to be locked by queueLock
if (releaseToken.isReadyToRun()) {
currentRunningQueryNum--;
+ } else {
+ priorityTokenQueue.remove(releaseToken);
}
Preconditions.checkArgument(currentRunningQueryNum >= 0);
while (currentRunningQueryNum < maxConcurrency) {
@@ -165,13 +167,4 @@ public class QueryQueue {
}
}
- public void removeToken(QueueToken queueToken) {
- queueLock.lock();
- try {
- priorityTokenQueue.remove(queueToken);
- } finally {
- queueLock.unlock();
- }
- }
-
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index e9a17ca1ab4..20a46a526f5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -76,13 +76,10 @@ public class QueueToken implements Comparable<QueueToken> {
try {
future.get(waitTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
- queue.removeToken(this);
throw new UserException("query queue timeout, timeout: " +
waitTimeout + " ms ");
} catch (CancellationException e) {
- queue.removeToken(this);
throw new UserException("query is cancelled");
} catch (Throwable t) {
- queue.removeToken(this);
String errMsg = String.format("error happens when query {} queue",
queryId);
LOG.error(errMsg, t);
throw new RuntimeException(errMsg, t);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]