This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 3a952cdb64d [enhancement](stmt-forward) record query result for proxy 
query to avoid EOF (#30536) (#31363)
3a952cdb64d is described below

commit 3a952cdb64da8bd16e289698e0ac711004392118
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Feb 29 11:30:16 2024 +0800

    [enhancement](stmt-forward) record query result for proxy query to avoid 
EOF (#30536) (#31363)
---
 .../org/apache/doris/mysql/ProxyMysqlChannel.java  | 40 ++++++++++++++++++++++
 .../java/org/apache/doris/qe/ConnectContext.java   |  7 ++++
 .../java/org/apache/doris/qe/ConnectProcessor.java | 14 ++++++--
 .../java/org/apache/doris/qe/MasterOpExecutor.java | 10 ++++--
 .../java/org/apache/doris/qe/StmtExecutor.java     | 40 ++++++++++++++++++----
 .../apache/doris/service/FrontendServiceImpl.java  |  2 +-
 .../apache/doris/statistics/AnalysisManager.java   |  2 +-
 gensrc/thrift/FrontendService.thrift               |  1 +
 8 files changed, 102 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
new file mode 100644
index 00000000000..ed491df4b87
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mysql;
+
+import com.google.common.collect.Lists;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An interceptor for proxy query, keeping all packets to be sent to master 
mysql channel.
+ */
+public class ProxyMysqlChannel extends MysqlChannel {
+
+    private final List<ByteBuffer> proxyResultBuffer = Lists.newArrayList();
+
+    @Override
+    public void sendOnePacket(ByteBuffer packet) {
+        proxyResultBuffer.add(packet);
+    }
+
+    public List<ByteBuffer> getProxyResultBufferList() {
+        return proxyResultBuffer;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 6f9527f274b..1e0994ff654 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -45,6 +45,7 @@ import org.apache.doris.mysql.MysqlCapability;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlSslContext;
+import org.apache.doris.mysql.ProxyMysqlChannel;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
@@ -285,12 +286,18 @@ public class ConnectContext {
     }
 
     public ConnectContext(StreamConnection connection) {
+        this(connection, false);
         state = new QueryState();
         returnRows = 0;
+    }
+
+    public ConnectContext(StreamConnection connection, boolean isProxy) {
         serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
         isKilled = false;
         if (connection != null) {
             mysqlChannel = new MysqlChannel(connection, this);
+        } else if (isProxy) {
+            mysqlChannel = new ProxyMysqlChannel();
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 3db88e87f0c..adda35bb5e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -634,7 +634,11 @@ public class ConnectProcessor {
                 && ctx.getState().getStateType() != 
QueryState.MysqlStateType.ERR) {
             ShowResultSet resultSet = executor.getShowResultSet();
             if (resultSet == null) {
-                packet = executor.getOutputPacket();
+                if (executor.sendProxyQueryResult()) {
+                    packet = getResultPacket();
+                } else {
+                    packet = executor.getOutputPacket();
+                }
             } else {
                 executor.sendResultSet(resultSet);
                 packet = getResultPacket();
@@ -809,8 +813,12 @@ public class ConnectProcessor {
             result.setStatusCode(ctx.getState().getErrorCode().getCode());
             result.setErrMessage(ctx.getState().getErrorMessage());
         }
-        if (executor != null && executor.getProxyResultSet() != null) {
-            result.setResultSet(executor.getProxyResultSet().tothrift());
+        if (executor != null) {
+            if (executor.getProxyShowResultSet() != null) {
+                
result.setResultSet(executor.getProxyShowResultSet().tothrift());
+            } else if (!executor.getProxyQueryResultBufList().isEmpty()) {
+                
result.setQueryResultBufList(executor.getProxyQueryResultBufList());
+            }
         }
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 805dcc77486..d8cf98e23f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -39,7 +39,9 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class MasterOpExecutor {
@@ -161,7 +163,7 @@ public class MasterOpExecutor {
 
     private TMasterOpRequest buildStmtForwardParams() {
         TMasterOpRequest params = new TMasterOpRequest();
-        //node ident
+        // node ident
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
         params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
         params.setCluster(ctx.getClusterName());
@@ -196,7 +198,7 @@ public class MasterOpExecutor {
 
     private TMasterOpRequest buildSyncJournalParmas() {
         final TMasterOpRequest params = new TMasterOpRequest();
-        //node ident
+        // node ident
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
         params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
         params.setSyncJournalOnly(true);
@@ -261,6 +263,10 @@ public class MasterOpExecutor {
         }
     }
 
+    public List<ByteBuffer> getQueryResultBufList() {
+        return result.isSetQueryResultBufList() ? 
result.getQueryResultBufList() : Collections.emptyList();
+    }
+
     public void setResult(TMasterOpResult result) {
         this.result = result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 1a7e3524a91..14b77559064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -97,6 +97,8 @@ import org.apache.doris.common.Version;
 import org.apache.doris.common.profile.Profile;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.ProfileManager.ProfileType;
@@ -112,6 +114,7 @@ import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlEofPacket;
 import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.mysql.ProxyMysqlChannel;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.StatementContext;
@@ -214,7 +217,7 @@ public class StmtExecutor {
     private RedirectStatus redirectStatus = null;
     private Planner planner;
     private boolean isProxy;
-    private ShowResultSet proxyResultSet = null;
+    private ShowResultSet proxyShowResultSet = null;
     private Data.PQueryStatistics.Builder statisticsForAuditLog;
     private boolean isCached;
     private String stmtName;
@@ -348,7 +351,7 @@ public class StmtExecutor {
 
         // this is a query stmt, but this non-master FE can not read, forward 
it to master
         if (isQuery() && !Env.getCurrentEnv().isMaster()
-                && !Env.getCurrentEnv().canRead()) {
+                && (!Env.getCurrentEnv().canRead() || 
debugForwardAllQueries())) {
             return true;
         }
 
@@ -359,6 +362,11 @@ public class StmtExecutor {
         }
     }
 
+    private boolean debugForwardAllQueries() {
+        DebugPoint debugPoint = 
DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
+        return debugPoint != null && debugPoint.param("forwardAllQueries", 
true);
+    }
+
     public ByteBuffer getOutputPacket() {
         if (masterOpExecutor == null) {
             return null;
@@ -367,8 +375,8 @@ public class StmtExecutor {
         }
     }
 
-    public ShowResultSet getProxyResultSet() {
-        return proxyResultSet;
+    public ShowResultSet getProxyShowResultSet() {
+        return proxyShowResultSet;
     }
 
     public ShowResultSet getShowResultSet() {
@@ -2147,7 +2155,7 @@ public class StmtExecutor {
             return;
         }
         if (isProxy) {
-            proxyResultSet = resultSet;
+            proxyShowResultSet = resultSet;
             return;
         }
 
@@ -2624,8 +2632,8 @@ public class StmtExecutor {
     }
 
 
-    public void setProxyResultSet(ShowResultSet proxyResultSet) {
-        this.proxyResultSet = proxyResultSet;
+    public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) {
+        this.proxyShowResultSet = proxyShowResultSet;
     }
 
     public ConnectContext getContext() {
@@ -2642,6 +2650,24 @@ public class StmtExecutor {
         }
         return "";
     }
+
+    public List<ByteBuffer> getProxyQueryResultBufList() {
+        return ((ProxyMysqlChannel) 
context.getMysqlChannel()).getProxyResultBufferList();
+    }
+
+    public boolean sendProxyQueryResult() throws IOException {
+        if (masterOpExecutor == null) {
+            return false;
+        }
+        List<ByteBuffer> queryResultBufList = 
masterOpExecutor.getQueryResultBufList();
+        if (queryResultBufList.isEmpty()) {
+            return false;
+        }
+        for (ByteBuffer byteBuffer : queryResultBufList) {
+            context.getMysqlChannel().sendOnePacket(byteBuffer);
+        }
+        return true;
+    }
 }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d1cad65d49d..2e855ed79eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1089,7 +1089,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
         // add this log so that we can track this stmt
         LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), 
params.getClientNodeHost());
-        ConnectContext context = new ConnectContext();
+        ConnectContext context = new ConnectContext(null, true);
         // Set current connected FE to the client address, so that we can know 
where this request come from.
         context.setCurrentConnectedFEIp(params.getClientNodeHost());
         ConnectProcessor processor = new ConnectProcessor(context);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0e179761c35..d7cb5e6613f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -285,7 +285,7 @@ public class AnalysisManager implements Writable {
             if (!proxy) {
                 
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
             } else {
-                
ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
+                
ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet);
             }
         } catch (Throwable t) {
             LOG.warn("Failed to send job id to user", t);
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index fa594221b49..b4649267a9d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -536,6 +536,7 @@ struct TMasterOpResult {
     5: optional string status;
     6: optional i32 statusCode;
     7: optional string errMessage;
+    8: optional list<binary> queryResultBufList;
 }
 
 struct TUpdateExportTaskStatusRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to