This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f26c8c2f06b [enhancement](stmt-forward) record query result for proxy 
query to avoid EOF (#30536)
f26c8c2f06b is described below

commit f26c8c2f06be5cb6afe4c04a22255a9aeb7a6d8b
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Feb 8 12:01:07 2024 +0800

    [enhancement](stmt-forward) record query result for proxy query to avoid 
EOF (#30536)
---
 .../org/apache/doris/mysql/ProxyMysqlChannel.java  | 40 +++++++++++++++++
 .../java/org/apache/doris/qe/ConnectContext.java   |  9 +++-
 .../java/org/apache/doris/qe/ConnectProcessor.java | 14 ++++--
 .../java/org/apache/doris/qe/MasterOpExecutor.java | 10 ++++-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 40 ++++++++++++++---
 .../apache/doris/service/FrontendServiceImpl.java  |  2 +-
 .../apache/doris/statistics/AnalysisManager.java   |  2 +-
 gensrc/thrift/FrontendService.thrift               |  1 +
 .../org/apache/doris/regression/suite/Suite.groovy | 14 +++---
 .../doris/regression/suite/SuiteCluster.groovy     |  5 +++
 .../suites/query_p0/test_forward_qeury.groovy      | 52 ++++++++++++++++++++++
 11 files changed, 169 insertions(+), 20 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
new file mode 100644
index 00000000000..ed491df4b87
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mysql;
+
+import com.google.common.collect.Lists;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An interceptor for proxy query, keeping all packets to be sent to master 
mysql channel.
+ */
+public class ProxyMysqlChannel extends MysqlChannel {
+
+    private final List<ByteBuffer> proxyResultBuffer = Lists.newArrayList();
+
+    @Override
+    public void sendOnePacket(ByteBuffer packet) {
+        proxyResultBuffer.add(packet);
+    }
+
+    public List<ByteBuffer> getProxyResultBufferList() {
+        return proxyResultBuffer;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 15e25cfdf3a..6e32a896895 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -47,6 +47,7 @@ import org.apache.doris.mysql.MysqlCapability;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlSslContext;
+import org.apache.doris.mysql.ProxyMysqlChannel;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
@@ -336,14 +337,20 @@ public class ConnectContext {
     }
 
     public ConnectContext() {
-        this((StreamConnection) null);
+        this(null);
     }
 
     public ConnectContext(StreamConnection connection) {
+        this(connection, false);
+    }
+
+    public ConnectContext(StreamConnection connection, boolean isProxy) {
         connectType = ConnectType.MYSQL;
         serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
         if (connection != null) {
             mysqlChannel = new MysqlChannel(connection, this);
+        } else if (isProxy) {
+            mysqlChannel = new ProxyMysqlChannel();
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 442b53669e0..2df6033e1fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -445,7 +445,11 @@ public abstract class ConnectProcessor {
                 && ctx.getState().getStateType() != 
QueryState.MysqlStateType.ERR) {
             ShowResultSet resultSet = executor.getShowResultSet();
             if (resultSet == null) {
-                packet = executor.getOutputPacket();
+                if (executor.sendProxyQueryResult()) {
+                    packet = getResultPacket();
+                } else {
+                    packet = executor.getOutputPacket();
+                }
             } else {
                 executor.sendResultSet(resultSet);
                 packet = getResultPacket();
@@ -594,8 +598,12 @@ public abstract class ConnectProcessor {
             result.setStatusCode(ctx.getState().getErrorCode().getCode());
             result.setErrMessage(ctx.getState().getErrorMessage());
         }
-        if (executor != null && executor.getProxyResultSet() != null) {
-            result.setResultSet(executor.getProxyResultSet().tothrift());
+        if (executor != null) {
+            if (executor.getProxyShowResultSet() != null) {
+                
result.setResultSet(executor.getProxyShowResultSet().tothrift());
+            } else if (!executor.getProxyQueryResultBufList().isEmpty()) {
+                
result.setQueryResultBufList(executor.getProxyQueryResultBufList());
+            }
         }
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 7ed7061fbf5..40c126b732d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -35,6 +35,8 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 public class MasterOpExecutor {
@@ -144,7 +146,7 @@ public class MasterOpExecutor {
 
     private TMasterOpRequest buildStmtForwardParams() {
         TMasterOpRequest params = new TMasterOpRequest();
-        //node ident
+        // node ident
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
         params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
         params.setSql(originStmt.originStmt);
@@ -170,7 +172,7 @@ public class MasterOpExecutor {
 
     private TMasterOpRequest buildSyncJournalParmas() {
         final TMasterOpRequest params = new TMasterOpRequest();
-        //node ident
+        // node ident
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
         params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
         params.setSyncJournalOnly(true);
@@ -235,6 +237,10 @@ public class MasterOpExecutor {
         }
     }
 
+    public List<ByteBuffer> getQueryResultBufList() {
+        return result.isSetQueryResultBufList() ? 
result.getQueryResultBufList() : Collections.emptyList();
+    }
+
     public void setResult(TMasterOpResult result) {
         this.result = result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c579c894b68..5c89a5c0590 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -100,6 +100,8 @@ import org.apache.doris.common.Version;
 import org.apache.doris.common.profile.Profile;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.ProfileManager.ProfileType;
@@ -115,6 +117,7 @@ import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlEofPacket;
 import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.mysql.ProxyMysqlChannel;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.StatementContext;
@@ -224,7 +227,7 @@ public class StmtExecutor {
     private RedirectStatus redirectStatus = null;
     private Planner planner;
     private boolean isProxy;
-    private ShowResultSet proxyResultSet = null;
+    private ShowResultSet proxyShowResultSet = null;
     private Data.PQueryStatistics.Builder statisticsForAuditLog;
     private boolean isCached;
     private String stmtName;
@@ -363,7 +366,7 @@ public class StmtExecutor {
 
         // this is a query stmt, but this non-master FE can not read, forward 
it to master
         if (isQuery() && !Env.getCurrentEnv().isMaster()
-                && !Env.getCurrentEnv().canRead()) {
+                && (!Env.getCurrentEnv().canRead() || 
debugForwardAllQueries())) {
             return true;
         }
 
@@ -374,6 +377,11 @@ public class StmtExecutor {
         }
     }
 
+    private boolean debugForwardAllQueries() {
+        DebugPoint debugPoint = 
DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
+        return debugPoint != null && debugPoint.param("forwardAllQueries", 
true);
+    }
+
     public ByteBuffer getOutputPacket() {
         if (masterOpExecutor == null) {
             return null;
@@ -382,8 +390,8 @@ public class StmtExecutor {
         }
     }
 
-    public ShowResultSet getProxyResultSet() {
-        return proxyResultSet;
+    public ShowResultSet getProxyShowResultSet() {
+        return proxyShowResultSet;
     }
 
     public ShowResultSet getShowResultSet() {
@@ -2358,7 +2366,7 @@ public class StmtExecutor {
             return;
         }
         if (isProxy) {
-            proxyResultSet = resultSet;
+            proxyShowResultSet = resultSet;
             return;
         }
 
@@ -2865,8 +2873,8 @@ public class StmtExecutor {
     }
 
 
-    public void setProxyResultSet(ShowResultSet proxyResultSet) {
-        this.proxyResultSet = proxyResultSet;
+    public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) {
+        this.proxyShowResultSet = proxyShowResultSet;
     }
 
     public ConnectContext getContext() {
@@ -2883,6 +2891,24 @@ public class StmtExecutor {
         }
         return "";
     }
+
+    public List<ByteBuffer> getProxyQueryResultBufList() {
+        return ((ProxyMysqlChannel) 
context.getMysqlChannel()).getProxyResultBufferList();
+    }
+
+    public boolean sendProxyQueryResult() throws IOException {
+        if (masterOpExecutor == null) {
+            return false;
+        }
+        List<ByteBuffer> queryResultBufList = 
masterOpExecutor.getQueryResultBufList();
+        if (queryResultBufList.isEmpty()) {
+            return false;
+        }
+        for (ByteBuffer byteBuffer : queryResultBufList) {
+            context.getMysqlChannel().sendOnePacket(byteBuffer);
+        }
+        return true;
+    }
 }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f80ed0f54ac..16a4bfa8145 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -960,7 +960,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
         // add this log so that we can track this stmt
         LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), 
params.getClientNodeHost());
-        ConnectContext context = new ConnectContext();
+        ConnectContext context = new ConnectContext(null, true);
         // Set current connected FE to the client address, so that we can know 
where this request come from.
         context.setCurrentConnectedFEIp(params.getClientNodeHost());
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 90cb0cd5f6d..a005052cd46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -282,7 +282,7 @@ public class AnalysisManager implements Writable {
             if (!proxy) {
                 
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
             } else {
-                
ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
+                
ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet);
             }
         } catch (Throwable t) {
             LOG.warn("Failed to send job id to user", t);
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index ea0300d23e5..15f424736ef 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -551,6 +551,7 @@ struct TMasterOpResult {
     5: optional string status;
     6: optional i32 statusCode;
     7: optional string errMessage;
+    8: optional list<binary> queryResultBufList;
 }
 
 struct TUpdateExportTaskStatusRequest {
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index b5dde914199..84a84fa7a54 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -221,15 +221,19 @@ class Suite implements GroovyInterceptable {
 
             def user = context.config.jdbcUser
             def password = context.config.jdbcPassword
-            def masterFe = cluster.getMasterFe()
-            for (def i=0; masterFe == null && i<30; i++) {
-                masterFe = cluster.getMasterFe()
+            Frontend fe = null
+            for (def i=0; fe == null && i<30; i++) {
+                if (options.connectToFollower) {
+                    fe = cluster.getOneFollowerFe()
+                } else {
+                    fe = cluster.getMasterFe()
+                }
                 Thread.sleep(1000)
             }
-            assertNotNull(masterFe)
+            assertNotNull(fe)
             def url = String.format(
                     
"jdbc:mysql://%s:%s/?useLocalSessionState=false&allowLoadLocalInfile=false",
-                    masterFe.host, masterFe.queryPort)
+                    fe.host, fe.queryPort)
             def conn = DriverManager.getConnection(url, user, password)
             def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName
             logger.info("try create database if not exists {}", context.dbName)
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 47b642af485..6d4ae4be0ad 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -36,6 +36,7 @@ class ClusterOptions {
     int beNum = 3
     List<String> feConfigs = ['heartbeat_interval_second=5']
     List<String> beConfigs = []
+    boolean connectToFollower = false
 
     // each be disks, a disks format is: disk_type=disk_num[,disk_capacity]
     // here disk_type=HDD or SSD,  disk capacity is in gb unit.
@@ -242,6 +243,10 @@ class SuiteCluster {
         return getFrontends().stream().filter(fe -> 
fe.isMaster).findFirst().orElse(null)
     }
 
+    Frontend getOneFollowerFe() {
+        return getFrontends().stream().filter(fe -> 
!fe.isMaster).findFirst().orElse(null)
+    }
+
     Frontend getFeByIndex(int index) {
         return getFrontends().stream().filter(fe -> fe.index == 
index).findFirst().orElse(null)
     }
diff --git a/regression-test/suites/query_p0/test_forward_qeury.groovy 
b/regression-test/suites/query_p0/test_forward_qeury.groovy
new file mode 100644
index 00000000000..8dbef459d2d
--- /dev/null
+++ b/regression-test/suites/query_p0/test_forward_qeury.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite("test_forward_query") {
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.setFeNum(2)
+    options.connectToFollower = true
+
+    docker(options) {
+        def tbl = "test_forward_query"
+        sql """ DROP TABLE IF EXISTS ${tbl} """
+        sql """
+            CREATE TABLE ${tbl}
+            (
+                k1 int
+            )
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+               "replication_num" = "1"
+            );
+        """
+
+        sql """ INSERT INTO ${tbl} VALUES(1);"""
+
+        cluster.injectDebugPoints(NodeType.FE, 
['StmtExecutor.forward_all_queries' : [forwardAllQueries:true]])
+
+        try {
+            sql """ SELECT * FROM ${tbl} """
+        } catch (Exception ignored) {
+            assertTrue(false)
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to