This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a4d137402f [feature](workloadgroup) support nereids internal query and all dml query (#28054)
4a4d137402f is described below

commit 4a4d137402fbeee5f7e8d193e15da482b80bca1f
Author: yiguolei <[email protected]>
AuthorDate: Wed Dec 6 21:07:55 2023 +0800

    [feature](workloadgroup) support nereids internal query and all dml query (#28054)
    
    support binding nereids internal queries to a workload group
    support binding INSERT INTO ... SELECT to a workload group
    support binding CREATE TABLE ... AS SELECT to a workload group
    change the token wait timeout to the smaller of the query timeout and the queue timeout
    the query queue should not be tied to the pipeline engine; it can be used everywhere.
---
 .../java/org/apache/doris/qe/CoordInterface.java   |   5 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  31 ++++++
 .../java/org/apache/doris/qe/StmtExecutor.java     | 121 +++++++++------------
 .../doris/resource/workloadgroup/QueryQueue.java   |   2 +-
 .../doris/resource/workloadgroup/QueueToken.java   |  11 +-
 .../workload_manager_p0/test_curd_wlg.groovy       |  39 ++++++-
 6 files changed, 132 insertions(+), 77 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
index 925cd1fd15b..e57d0d261e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
 import org.apache.doris.proto.Types;
 
 public interface CoordInterface {
+
     public void exec() throws Exception;
 
     public RowBatch getNext() throws Exception;
@@ -27,5 +28,9 @@ public interface CoordInterface {
     public int getInstanceTotalNum();
 
     public void cancel(Types.PPlanFragmentCancelReason cancelReason);
+
+    // When call exec or get next data finished, should call this method to 
release
+    // some resource.
+    public default void close() {}
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3789be07546..cc56b6f764a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -68,6 +68,8 @@ import org.apache.doris.proto.Types;
 import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
+import org.apache.doris.resource.workloadgroup.QueryQueue;
+import org.apache.doris.resource.workloadgroup.QueueToken;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.ExecuteEnv;
@@ -269,6 +271,9 @@ public class Coordinator implements CoordInterface {
 
     private final ExecutionProfile executionProfile;
 
+    private QueueToken queueToken = null;
+    private QueryQueue queryQueue = null;
+
     public ExecutionProfile getExecutionProfile() {
         return executionProfile;
     }
@@ -590,6 +595,32 @@ public class Coordinator implements CoordInterface {
     // A call to Exec() must precede all other member function calls.
     @Override
     public void exec() throws Exception {
+        // LoadTask does not have context, not controlled by queue now
+        if (Config.enable_workload_group && Config.enable_query_queue && 
context != null) {
+            queryQueue = 
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
+            if (queryQueue == null) {
+                // This logic is actually useless, because when could not find 
query queue, it will
+                // throw exception during workload group manager.
+                throw new UserException("could not find query queue");
+            }
+            queueToken = queryQueue.getToken();
+            if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() 
* 1000)) {
+                LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + 
queueToken.getOfferResultDetail());
+                queryQueue.returnToken(queueToken);
+                throw new UserException(queueToken.getOfferResultDetail());
+            }
+        }
+        execInternal();
+    }
+
+    @Override
+    public void close() {
+        if (queryQueue != null && queueToken != null) {
+            queryQueue.returnToken(queueToken);
+        }
+    }
+
+    private void execInternal() throws Exception {
         if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) {
             LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: 
{}",
                     DebugUtil.printId(queryId), 
scanNodes.get(0).treeToThrift());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c7658b94e61..7fdbd061696 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -143,8 +143,6 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
-import org.apache.doris.resource.workloadgroup.QueryQueue;
-import org.apache.doris.resource.workloadgroup.QueueToken;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.RpcException;
@@ -618,55 +616,37 @@ public class StmtExecutor {
     private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
         // queue query here
         syncJournalIfNeeded();
-        QueueToken queueToken = null;
-        QueryQueue queryQueue = null;
-        if (!parsedStmt.isExplain() && Config.enable_workload_group && 
Config.enable_query_queue
-                && context.getSessionVariable().getEnablePipelineEngine()) {
-            queryQueue = 
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
-            queueToken = queryQueue.getToken();
-            if (!queueToken.waitSignal()) {
-                LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + 
queueToken.getOfferResultDetail());
-                queryQueue.returnToken(queueToken);
-                throw new UserException(queueToken.getOfferResultDetail());
-            }
-        }
 
         int retryTime = Config.max_query_retry_time;
-        try {
-            for (int i = 0; i < retryTime; i++) {
-                try {
-                    // reset query id for each retry
-                    if (i > 0) {
-                        UUID uuid = UUID.randomUUID();
-                        TUniqueId newQueryId = new 
TUniqueId(uuid.getMostSignificantBits(),
-                                uuid.getLeastSignificantBits());
-                        AuditLog.getQueryAudit().log("Query {} {} times with 
new query id: {}",
-                                DebugUtil.printId(queryId), i, 
DebugUtil.printId(newQueryId));
-                        context.setQueryId(newQueryId);
-                    }
-                    if (context.getConnectType() == 
ConnectType.ARROW_FLIGHT_SQL) {
-                        context.setReturnResultFromLocal(false);
-                    }
-                    handleQueryStmt();
-                    break;
-                } catch (RpcException e) {
-                    if (i == retryTime - 1) {
-                        throw e;
-                    }
-                    if (context.getConnectType().equals(ConnectType.MYSQL) && 
!context.getMysqlChannel().isSend()) {
-                        LOG.warn("retry {} times. stmt: {}", (i + 1), 
parsedStmt.getOrigStmt().originStmt);
-                    } else {
-                        throw e;
-                    }
-                } finally {
-                    if (context.isReturnResultFromLocal()) {
-                        finalizeQuery();
-                    }
+        for (int i = 0; i < retryTime; i++) {
+            try {
+                // reset query id for each retry
+                if (i > 0) {
+                    UUID uuid = UUID.randomUUID();
+                    TUniqueId newQueryId = new 
TUniqueId(uuid.getMostSignificantBits(),
+                            uuid.getLeastSignificantBits());
+                    AuditLog.getQueryAudit().log("Query {} {} times with new 
query id: {}",
+                            DebugUtil.printId(queryId), i, 
DebugUtil.printId(newQueryId));
+                    context.setQueryId(newQueryId);
+                }
+                if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
+                    context.setReturnResultFromLocal(false);
+                }
+                handleQueryStmt();
+                break;
+            } catch (RpcException e) {
+                if (i == retryTime - 1) {
+                    throw e;
+                }
+                if (context.getConnectType().equals(ConnectType.MYSQL) && 
!context.getMysqlChannel().isSend()) {
+                    LOG.warn("retry {} times. stmt: {}", (i + 1), 
parsedStmt.getOrigStmt().originStmt);
+                } else {
+                    throw e;
+                }
+            } finally {
+                if (context.isReturnResultFromLocal()) {
+                    finalizeQuery();
                 }
-            }
-        } finally {
-            if (queueToken != null) {
-                queryQueue.returnToken(queueToken);
             }
         }
     }
@@ -1497,27 +1477,13 @@ public class StmtExecutor {
             coordBase = coord;
         }
 
-        coordBase.exec();
-
-        profile.getSummaryProfile().setQueryScheduleFinishTime();
-        updateProfile(false);
-        if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
-            try {
-                LOG.debug("Start to execute fragment. user: {}, db: {}, sql: 
{}, fragment instance num: {}",
-                        context.getQualifiedUser(), context.getDatabase(),
-                        parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
-                        coordBase.getInstanceTotalNum());
-            } catch (Exception e) {
-                LOG.warn("Fail to print fragment concurrency for Query.", e);
-            }
-        }
-
-        if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-            Preconditions.checkState(!context.isReturnResultFromLocal());
-            profile.getSummaryProfile().setTempStartTime();
+        try {
+            coordBase.exec();
+            profile.getSummaryProfile().setQueryScheduleFinishTime();
+            updateProfile(false);
             if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
                 try {
-                    LOG.debug("Finish to execute fragment. user: {}, db: {}, 
sql: {}, fragment instance num: {}",
+                    LOG.debug("Start to execute fragment. user: {}, db: {}, 
sql: {}, fragment instance num: {}",
                             context.getQualifiedUser(), context.getDatabase(),
                             parsedStmt.getOrigStmt().originStmt.replace("\n", 
" "),
                             coordBase.getInstanceTotalNum());
@@ -1525,10 +1491,22 @@ public class StmtExecutor {
                     LOG.warn("Fail to print fragment concurrency for Query.", 
e);
                 }
             }
-            return;
-        }
 
-        try {
+            if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) 
{
+                Preconditions.checkState(!context.isReturnResultFromLocal());
+                profile.getSummaryProfile().setTempStartTime();
+                if (coordBase.getInstanceTotalNum() > 1 && 
LOG.isDebugEnabled()) {
+                    try {
+                        LOG.debug("Finish to execute fragment. user: {}, db: 
{}, sql: {}, fragment instance num: {}",
+                                context.getQualifiedUser(), 
context.getDatabase(),
+                                
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
+                                coordBase.getInstanceTotalNum());
+                    } catch (Exception e) {
+                        LOG.warn("Fail to print fragment concurrency for 
Query.", e);
+                    }
+                }
+                return;
+            }
             while (true) {
                 // register the fetch result time.
                 profile.getSummaryProfile().setTempStartTime();
@@ -1603,6 +1581,7 @@ public class StmtExecutor {
             coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
             throw e;
         } finally {
+            coordBase.close();
             if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
                 try {
                     LOG.debug("Finish to execute fragment. user: {}, db: {}, 
sql: {}, fragment instance num: {}",
@@ -2038,6 +2017,7 @@ public class StmtExecutor {
                  */
                 throwable = t;
             } finally {
+                coord.close();
                 finalizeQuery();
             }
 
@@ -2741,6 +2721,7 @@ public class StmtExecutor {
                 throw new RuntimeException("Failed to fetch internal SQL 
result. " + Util.getRootCauseMessage(e), e);
             }
         } finally {
+            coord.close();
             AuditLogHelper.logAuditLog(context, originStmt.toString(), 
parsedStmt, getQueryStatisticsForAuditLog(),
                     true);
             QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 5d9a61f4a2b..7ba6353e746 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -126,7 +126,7 @@ public class QueryQueue {
     }
 
     // If the token is acquired and do work success, then call this method to 
release it.
-    public void returnToken(QueueToken token) throws InterruptedException {
+    public void returnToken(QueueToken token) {
         queueLock.lock();
         try {
             // If current token is not in ready to run state, then it is still 
in the queue
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 9126535dc0f..17299d3ea3b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -42,7 +42,7 @@ public class QueueToken {
 
     private TokenState tokenState;
 
-    private long waitTimeout = 0;
+    private long queueWaitTimeout = 0;
 
     private String offerResultDetail;
 
@@ -51,15 +51,15 @@ public class QueueToken {
     private final ReentrantLock tokenLock = new ReentrantLock();
     private final Condition tokenCond = tokenLock.newCondition();
 
-    public QueueToken(TokenState tokenState, long waitTimeout,
+    public QueueToken(TokenState tokenState, long queueWaitTimeout,
             String offerResultDetail) {
         this.tokenId = tokenIdGenerator.addAndGet(1);
         this.tokenState = tokenState;
-        this.waitTimeout = waitTimeout;
+        this.queueWaitTimeout = queueWaitTimeout;
         this.offerResultDetail = offerResultDetail;
     }
 
-    public boolean waitSignal() throws InterruptedException {
+    public boolean waitSignal(long queryTimeoutMillis) throws 
InterruptedException {
         this.tokenLock.lock();
         try {
             if (isTimeout) {
@@ -68,6 +68,9 @@ public class QueueToken {
             if (tokenState == TokenState.READY_TO_RUN) {
                 return true;
             }
+            // If query timeout is less than queue wait timeout, then should 
use
+            // query timeout as wait timeout
+            long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? 
queueWaitTimeout : queryTimeoutMillis;
             tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
             // If wait timeout and is steal not ready to run, then return false
             if (tokenState != TokenState.READY_TO_RUN) {
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy 
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 476aa4c10a9..6ad4697ae33 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -17,8 +17,12 @@
 
 suite("test_crud_wlg") {
     def table_name = "wlg_test_table"
+    def table_name2 = "wlg_test_table2"
+    def table_name3 = "wlg_test_table3"
 
     sql "drop table if exists ${table_name}"
+    sql "drop table if exists ${table_name2}"
+    sql "drop table if exists ${table_name3}"
 
     sql """
         CREATE TABLE IF NOT EXISTS `${table_name}` (
@@ -37,6 +41,23 @@ suite("test_crud_wlg") {
         )
     """
 
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table_name2}` (
+          `siteid` int(11) NOT NULL COMMENT "",
+          `citycode` int(11) NOT NULL COMMENT "",
+          `userid` int(11) NOT NULL COMMENT "",
+          `pv` int(11) NOT NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`siteid`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2"
+        )
+    """
+
     sql """insert into ${table_name} values
         (9,10,11,12),
         (1,2,3,4)
@@ -257,7 +278,21 @@ suite("test_crud_wlg") {
     sql "alter workload group test_group properties ( 'max_queue_size'='0' );"
     Thread.sleep(3000);
     try {
-        sql "select 1;"
+        sql "select * from ${table_name};"
+    } catch (Exception e) {
+        assertTrue(e.getMessage().contains("query waiting queue is full"));
+    }
+
+    // test insert into select will go to queue
+    try {
+        sql "insert into ${table_name2} select * from ${table_name};"
+    } catch (Exception e) {
+        assertTrue(e.getMessage().contains("query waiting queue is full"));
+    }
+
+    // test create table as select will go to queue
+    try {
+        sql "create table ${table_name3} PROPERTIES('replication_num' = '1') 
as select * from ${table_name};"
     } catch (Exception e) {
         assertTrue(e.getMessage().contains("query waiting queue is full"));
     }
@@ -266,7 +301,7 @@ suite("test_crud_wlg") {
     sql "alter workload group test_group properties ( 'queue_timeout'='500' );"
     Thread.sleep(3000);
     try {
-        sql "select 1;"
+        sql "select * from ${table_name};"
     } catch (Exception e) {
         assertTrue(e.getMessage().contains("query wait timeout"));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to