This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 13420e4efe [Fix](planner) Fix query queue can not limit maxConcurrency
#23418
13420e4efe is described below
commit 13420e4efeb135ded5ab73b2734b3a243cf348e1
Author: wangbo <[email protected]>
AuthorDate: Sat Aug 26 17:31:44 2023 +0800
[Fix](planner) Fix query queue can not limit maxConcurrency #23418
2 Fix concurrent can not limit
---
.../java/org/apache/doris/qe/StmtExecutor.java | 13 +++++-----
.../doris/resource/workloadgroup/QueryQueue.java | 28 +++++++++++++++++-----
2 files changed, 28 insertions(+), 13 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index d1da7f2b7d..6da5c07c30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -211,9 +211,6 @@ public class StmtExecutor {
private OriginStatement originStmt;
private StatementBase parsedStmt;
private Analyzer analyzer;
- private QueryQueue queryQueue = null;
- // by default, false means no query queued, then no need to poll when
query finish
- private QueueOfferToken offerRet = new QueueOfferToken(false);
private ProfileType profileType = ProfileType.QUERY;
private volatile Coordinator coord = null;
private MasterOpExecutor masterOpExecutor = null;
@@ -579,17 +576,19 @@ public class StmtExecutor {
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
// queue query here
syncJournalIfNeeded();
+ QueueOfferToken offerRet = null;
+ QueryQueue queryQueue = null;
if (!parsedStmt.isExplain() && Config.enable_workload_group &&
Config.enable_query_queue
&& context.getSessionVariable().getEnablePipelineEngine()) {
- this.queryQueue =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
+ queryQueue =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
try {
- this.offerRet = queryQueue.offer();
+ offerRet = queryQueue.offer();
} catch (InterruptedException e) {
// this Exception means try lock/await failed, so no need to
handle offer result
LOG.error("error happens when offer queue, query id=" +
DebugUtil.printId(queryId) + " ", e);
throw new RuntimeException("interrupted Exception happens when
queue query");
}
- if (!offerRet.isOfferSuccess()) {
+ if (offerRet != null && !offerRet.isOfferSuccess()) {
String retMsg = "queue failed, reason=" +
offerRet.getOfferResultDetail();
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " +
retMsg);
throw new UserException(retMsg);
@@ -629,7 +628,7 @@ public class StmtExecutor {
}
}
} finally {
- if (offerRet.isOfferSuccess()) {
+ if (offerRet != null && offerRet.isOfferSuccess()) {
queryQueue.poll();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 8f364fc4a0..dc7e672973 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -59,11 +59,17 @@ public class QueryQueue {
// we should catch the case when it happens
queueLock.tryLock(5, TimeUnit.SECONDS);
try {
- // currentRunningQueryNum may bigger than maxRunningQueryNum
- // because maxRunningQueryNum can be altered
- if (currentRunningQueryNum >= maxConcurrency) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info(this.debugString());
+ }
+
+ while (true) {
+ if (currentRunningQueryNum < maxConcurrency) {
+ break;
+ }
+ // currentRunningQueryNum may bigger than maxRunningQueryNum
+ // because maxRunningQueryNum can be altered
if (currentWaitingQueryNum >= maxQueueSize) {
- LOG.debug(this.debugString());
return new QueueOfferToken(false, "query waiting queue is
full, queue length=" + maxQueueSize);
}
@@ -75,13 +81,15 @@ public class QueryQueue {
currentWaitingQueryNum--;
}
if (!ret) {
- LOG.debug(this.debugString());
return new QueueOfferToken(false, "query wait timeout " +
queueTimeout + " ms");
}
}
currentRunningQueryNum++;
return new QueueOfferToken(true, "offer success");
} finally {
+ if (LOG.isDebugEnabled()) {
+ LOG.info(this.debugString());
+ }
queueLock.unlock();
}
}
@@ -92,8 +100,13 @@ public class QueryQueue {
currentRunningQueryNum--;
Preconditions.checkArgument(currentRunningQueryNum >= 0);
// maybe only when currentWaitingQueryNum != 0 need to signal
- queueLockCond.signal();
+ if (currentRunningQueryNum < maxConcurrency) {
+ queueLockCond.signal();
+ }
} finally {
+ if (LOG.isDebugEnabled()) {
+ LOG.info(this.debugString());
+ }
queueLock.unlock();
}
}
@@ -106,6 +119,9 @@ public class QueryQueue {
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queryWaitTimeout;
} finally {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this.debugString());
+ }
queueLock.unlock();
}
} catch (InterruptedException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]