This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 13420e4efe [Fix](planner) Fix query queue can not limit maxConcurrency #23418
13420e4efe is described below

commit 13420e4efeb135ded5ab73b2734b3a243cf348e1
Author: wangbo <[email protected]>
AuthorDate: Sat Aug 26 17:31:44 2023 +0800

    [Fix](planner) Fix query queue can not limit maxConcurrency #23418
    
    2 Fix concurrent can not limit
---
 .../java/org/apache/doris/qe/StmtExecutor.java     | 13 +++++-----
 .../doris/resource/workloadgroup/QueryQueue.java   | 28 +++++++++++++++++-----
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index d1da7f2b7d..6da5c07c30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -211,9 +211,6 @@ public class StmtExecutor {
     private OriginStatement originStmt;
     private StatementBase parsedStmt;
     private Analyzer analyzer;
-    private QueryQueue queryQueue = null;
-    // by default, false means no query queued, then no need to poll when 
query finish
-    private QueueOfferToken offerRet = new QueueOfferToken(false);
     private ProfileType profileType = ProfileType.QUERY;
     private volatile Coordinator coord = null;
     private MasterOpExecutor masterOpExecutor = null;
@@ -579,17 +576,19 @@ public class StmtExecutor {
     private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
         // queue query here
         syncJournalIfNeeded();
+        QueueOfferToken offerRet = null;
+        QueryQueue queryQueue = null;
         if (!parsedStmt.isExplain() && Config.enable_workload_group && 
Config.enable_query_queue
                 && context.getSessionVariable().getEnablePipelineEngine()) {
-            this.queryQueue = 
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
+            queryQueue = 
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
             try {
-                this.offerRet = queryQueue.offer();
+                offerRet = queryQueue.offer();
             } catch (InterruptedException e) {
                 // this Exception means try lock/await failed, so no need to 
handle offer result
                 LOG.error("error happens when offer queue, query id=" + 
DebugUtil.printId(queryId) + " ", e);
                 throw new RuntimeException("interrupted Exception happens when 
queue query");
             }
-            if (!offerRet.isOfferSuccess()) {
+            if (offerRet != null && !offerRet.isOfferSuccess()) {
                 String retMsg = "queue failed, reason=" + 
offerRet.getOfferResultDetail();
                 LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + 
retMsg);
                 throw new UserException(retMsg);
@@ -629,7 +628,7 @@ public class StmtExecutor {
                 }
             }
         } finally {
-            if (offerRet.isOfferSuccess()) {
+            if (offerRet != null && offerRet.isOfferSuccess()) {
                 queryQueue.poll();
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 8f364fc4a0..dc7e672973 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -59,11 +59,17 @@ public class QueryQueue {
         // we should catch the case when it happens
         queueLock.tryLock(5, TimeUnit.SECONDS);
         try {
-            // currentRunningQueryNum may bigger than maxRunningQueryNum
-            // because maxRunningQueryNum can be altered
-            if (currentRunningQueryNum >= maxConcurrency) {
+            if (LOG.isDebugEnabled()) {
+                LOG.info(this.debugString());
+            }
+
+            while (true) {
+                if (currentRunningQueryNum < maxConcurrency) {
+                    break;
+                }
+                // currentRunningQueryNum may bigger than maxRunningQueryNum
+                // because maxRunningQueryNum can be altered
                 if (currentWaitingQueryNum >= maxQueueSize) {
-                    LOG.debug(this.debugString());
                     return new QueueOfferToken(false, "query waiting queue is 
full, queue length=" + maxQueueSize);
                 }
 
@@ -75,13 +81,15 @@ public class QueryQueue {
                     currentWaitingQueryNum--;
                 }
                 if (!ret) {
-                    LOG.debug(this.debugString());
                     return new QueueOfferToken(false, "query wait timeout " + 
queueTimeout + " ms");
                 }
             }
             currentRunningQueryNum++;
             return new QueueOfferToken(true, "offer success");
         } finally {
+            if (LOG.isDebugEnabled()) {
+                LOG.info(this.debugString());
+            }
             queueLock.unlock();
         }
     }
@@ -92,8 +100,13 @@ public class QueryQueue {
             currentRunningQueryNum--;
             Preconditions.checkArgument(currentRunningQueryNum >= 0);
             // maybe only when currentWaitingQueryNum != 0 need to signal
-            queueLockCond.signal();
+            if (currentRunningQueryNum < maxConcurrency) {
+                queueLockCond.signal();
+            }
         } finally {
+            if (LOG.isDebugEnabled()) {
+                LOG.info(this.debugString());
+            }
             queueLock.unlock();
         }
     }
@@ -106,6 +119,9 @@ public class QueryQueue {
                 this.maxQueueSize = maxQueueSize;
                 this.queueTimeout = queryWaitTimeout;
             } finally {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(this.debugString());
+                }
                 queueLock.unlock();
             }
         } catch (InterruptedException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to