This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 15803e874d1 [fix](query-forward) Fix forward query exception or stuck 
or potential query result loss (#41303)
15803e874d1 is described below

commit 15803e874d11dd3d6d19e585f376da35d8e6f3dc
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Oct 22 17:43:24 2024 +0800

    [fix](query-forward) Fix forward query exception or stuck or potential 
query result loss (#41303)
    
    ## Proposed changes
    
    1. Fix an exception in forwarded queries when no status code is set
    during execution on the master. An EOF may lead to this state.
    
    2. Fix forwarded queries getting stuck because no result packet is sent
    to the MySQL channel. The result packets from the master should be used.
    
    3. Fix potential result loss for forwarded queries when the follower's
    "can read" status changes while the query is being processed. The
    forwarding decision should be made once, before execution.
    
    4. Add assertion for regression test.
---
 .../main/java/org/apache/doris/common/Config.java    |  4 ++++
 .../java/org/apache/doris/qe/ConnectProcessor.java   | 14 ++++++++------
 .../main/java/org/apache/doris/qe/StmtExecutor.java  | 20 ++++++++++++--------
 .../suites/query_p0/test_forward_qeury.groovy        |  5 +++--
 4 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index b5c2bace7ae..fd8e2ad87bc 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3180,4 +3180,8 @@ public class Config extends ConfigBase {
                     + "default is conf/authentication.conf"})
     public static String authentication_config_file_path = 
"/conf/authentication.conf";
 
+    @ConfField(description = {"用于测试,强制将所有的查询forward到master以验证forward query的行为",
+            "For testing purposes, all queries are forcibly forwarded to the 
master to verify"
+                    + "the behavior of forwarding queries."})
+    public static boolean force_forward_all_queries = false;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 02aef153e37..2340aa37aeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -560,11 +560,8 @@ public abstract class ConnectProcessor {
                 && ctx.getState().getStateType() != 
QueryState.MysqlStateType.ERR) {
             ShowResultSet resultSet = executor.getShowResultSet();
             if (resultSet == null) {
-                if (executor.sendProxyQueryResult()) {
-                    packet = getResultPacket();
-                } else {
-                    packet = executor.getOutputPacket();
-                }
+                executor.sendProxyQueryResult();
+                packet = executor.getOutputPacket();
             } else {
                 executor.sendResultSet(resultSet);
                 packet = getResultPacket();
@@ -724,7 +721,12 @@ public abstract class ConnectProcessor {
         if (ctx.getState().getStateType() == MysqlStateType.OK) {
             result.setStatusCode(0);
         } else {
-            result.setStatusCode(ctx.getState().getErrorCode().getCode());
+            ErrorCode errorCode = ctx.getState().getErrorCode();
+            if (errorCode != null) {
+                result.setStatusCode(errorCode.getCode());
+            } else {
+                result.setStatusCode(ErrorCode.ERR_UNKNOWN_ERROR.getCode());
+            }
             result.setErrMessage(ctx.getState().getErrorMessage());
         }
         if (request.isSetTxnLoadInfo()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 16600536aac..234d5f0e610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -283,6 +283,7 @@ public class StmtExecutor {
     private boolean isHandleQueryInFe = false;
     // The profile of this execution
     private final Profile profile;
+    private Boolean isForwardedToMaster = null;
 
     // The result schema if "dry_run_query" is true.
     // Only one column to indicate the real return row numbers.
@@ -423,6 +424,13 @@ public class StmtExecutor {
     }
 
     public boolean isForwardToMaster() {
+        if (isForwardedToMaster == null) {
+            isForwardedToMaster = shouldForwardToMaster();
+        }
+        return isForwardedToMaster;
+    }
+
+    private boolean shouldForwardToMaster() {
         if (Env.getCurrentEnv().isMaster()) {
             return false;
         }
@@ -433,7 +441,7 @@ public class StmtExecutor {
 
         // this is a query stmt, but this non-master FE can not read, forward 
it to master
         if (isQuery() && !Env.getCurrentEnv().isMaster()
-                && (!Env.getCurrentEnv().canRead() || 
debugForwardAllQueries())) {
+                && (!Env.getCurrentEnv().canRead() || debugForwardAllQueries() 
|| Config.force_forward_all_queries)) {
             return true;
         }
 
@@ -446,7 +454,7 @@ public class StmtExecutor {
 
     private boolean debugForwardAllQueries() {
         DebugPoint debugPoint = 
DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
-        return debugPoint != null && debugPoint.param("forwardAllQueries", 
true);
+        return debugPoint != null && debugPoint.param("forwardAllQueries", 
false);
     }
 
     public ByteBuffer getOutputPacket() {
@@ -3618,17 +3626,13 @@ public class StmtExecutor {
         return ((ProxyMysqlChannel) 
context.getMysqlChannel()).getProxyResultBufferList();
     }
 
-    public boolean sendProxyQueryResult() throws IOException {
+    public void sendProxyQueryResult() throws IOException {
         if (masterOpExecutor == null) {
-            return false;
+            return;
         }
         List<ByteBuffer> queryResultBufList = 
masterOpExecutor.getQueryResultBufList();
-        if (queryResultBufList.isEmpty()) {
-            return false;
-        }
         for (ByteBuffer byteBuffer : queryResultBufList) {
             context.getMysqlChannel().sendOnePacket(byteBuffer);
         }
-        return true;
     }
 }
diff --git a/regression-test/suites/query_p0/test_forward_qeury.groovy 
b/regression-test/suites/query_p0/test_forward_qeury.groovy
index 798e8865ca5..d4761c835a2 100644
--- a/regression-test/suites/query_p0/test_forward_qeury.groovy
+++ b/regression-test/suites/query_p0/test_forward_qeury.groovy
@@ -41,8 +41,9 @@ suite("test_forward_query", 'docker') {
 
         sql """ INSERT INTO ${tbl} VALUES(1);"""
 
-        cluster.injectDebugPoints(NodeType.FE, 
['StmtExecutor.forward_all_queries' : [forwardAllQueries:true]])
+        cluster.injectDebugPoints(NodeType.FE, 
['StmtExecutor.forward_all_queries' : [forwardAllQueries:true, execute:1]])
 
-        sql """ SELECT * FROM ${tbl} """
+        def ret = sql """ SELECT * FROM ${tbl} """
+        assertEquals(ret[0][0], 1)
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to