This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 3a952cdb64d [enhancement](stmt-forward) record query result for proxy query to avoid EOF (#30536) (#31363)
3a952cdb64d is described below
commit 3a952cdb64da8bd16e289698e0ac711004392118
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Feb 29 11:30:16 2024 +0800
[enhancement](stmt-forward) record query result for proxy query to avoid EOF (#30536) (#31363)
---
.../org/apache/doris/mysql/ProxyMysqlChannel.java | 40 ++++++++++++++++++++++
.../java/org/apache/doris/qe/ConnectContext.java | 7 ++++
.../java/org/apache/doris/qe/ConnectProcessor.java | 14 ++++++--
.../java/org/apache/doris/qe/MasterOpExecutor.java | 10 ++++--
.../java/org/apache/doris/qe/StmtExecutor.java | 40 ++++++++++++++++++----
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
.../apache/doris/statistics/AnalysisManager.java | 2 +-
gensrc/thrift/FrontendService.thrift | 1 +
8 files changed, 102 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
new file mode 100644
index 00000000000..ed491df4b87
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mysql;
+
+import com.google.common.collect.Lists;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An interceptor for proxy query, keeping all packets to be sent to master mysql channel.
+ */
+public class ProxyMysqlChannel extends MysqlChannel {
+
+ private final List<ByteBuffer> proxyResultBuffer = Lists.newArrayList();
+
+ @Override
+ public void sendOnePacket(ByteBuffer packet) {
+ proxyResultBuffer.add(packet);
+ }
+
+ public List<ByteBuffer> getProxyResultBufferList() {
+ return proxyResultBuffer;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 6f9527f274b..1e0994ff654 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -45,6 +45,7 @@ import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlSslContext;
+import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
@@ -285,12 +286,18 @@ public class ConnectContext {
}
public ConnectContext(StreamConnection connection) {
+ this(connection, false);
state = new QueryState();
returnRows = 0;
+ }
+
+ public ConnectContext(StreamConnection connection, boolean isProxy) {
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
isKilled = false;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection, this);
+ } else if (isProxy) {
+ mysqlChannel = new ProxyMysqlChannel();
} else {
mysqlChannel = new DummyMysqlChannel();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 3db88e87f0c..adda35bb5e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -634,7 +634,11 @@ public class ConnectProcessor {
&& ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) {
ShowResultSet resultSet = executor.getShowResultSet();
if (resultSet == null) {
- packet = executor.getOutputPacket();
+ if (executor.sendProxyQueryResult()) {
+ packet = getResultPacket();
+ } else {
+ packet = executor.getOutputPacket();
+ }
} else {
executor.sendResultSet(resultSet);
packet = getResultPacket();
@@ -809,8 +813,12 @@ public class ConnectProcessor {
result.setStatusCode(ctx.getState().getErrorCode().getCode());
result.setErrMessage(ctx.getState().getErrorMessage());
}
- if (executor != null && executor.getProxyResultSet() != null) {
- result.setResultSet(executor.getProxyResultSet().tothrift());
+ if (executor != null) {
+ if (executor.getProxyShowResultSet() != null) {
+ result.setResultSet(executor.getProxyShowResultSet().tothrift());
+ } else if (!executor.getProxyQueryResultBufList().isEmpty()) {
+ result.setQueryResultBufList(executor.getProxyQueryResultBufList());
+ }
}
return result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 805dcc77486..d8cf98e23f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -39,7 +39,9 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class MasterOpExecutor {
@@ -161,7 +163,7 @@ public class MasterOpExecutor {
private TMasterOpRequest buildStmtForwardParams() {
TMasterOpRequest params = new TMasterOpRequest();
- //node ident
+ // node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setCluster(ctx.getClusterName());
@@ -196,7 +198,7 @@ public class MasterOpExecutor {
private TMasterOpRequest buildSyncJournalParmas() {
final TMasterOpRequest params = new TMasterOpRequest();
- //node ident
+ // node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSyncJournalOnly(true);
@@ -261,6 +263,10 @@ public class MasterOpExecutor {
}
}
+ public List<ByteBuffer> getQueryResultBufList() {
+ return result.isSetQueryResultBufList() ? result.getQueryResultBufList() : Collections.emptyList();
+ }
+
public void setResult(TMasterOpResult result) {
this.result = result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 1a7e3524a91..14b77559064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -97,6 +97,8 @@ import org.apache.doris.common.Version;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.ProfileManager.ProfileType;
@@ -112,6 +114,7 @@ import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
@@ -214,7 +217,7 @@ public class StmtExecutor {
private RedirectStatus redirectStatus = null;
private Planner planner;
private boolean isProxy;
- private ShowResultSet proxyResultSet = null;
+ private ShowResultSet proxyShowResultSet = null;
private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
private String stmtName;
@@ -348,7 +351,7 @@ public class StmtExecutor {
// this is a query stmt, but this non-master FE can not read, forward it to master
if (isQuery() && !Env.getCurrentEnv().isMaster()
- && !Env.getCurrentEnv().canRead()) {
+ && (!Env.getCurrentEnv().canRead() || debugForwardAllQueries())) {
return true;
}
@@ -359,6 +362,11 @@ public class StmtExecutor {
}
}
+ private boolean debugForwardAllQueries() {
+ DebugPoint debugPoint = DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
+ return debugPoint != null && debugPoint.param("forwardAllQueries", true);
+ }
+
public ByteBuffer getOutputPacket() {
if (masterOpExecutor == null) {
return null;
@@ -367,8 +375,8 @@ public class StmtExecutor {
}
}
- public ShowResultSet getProxyResultSet() {
- return proxyResultSet;
+ public ShowResultSet getProxyShowResultSet() {
+ return proxyShowResultSet;
}
public ShowResultSet getShowResultSet() {
@@ -2147,7 +2155,7 @@ public class StmtExecutor {
return;
}
if (isProxy) {
- proxyResultSet = resultSet;
+ proxyShowResultSet = resultSet;
return;
}
@@ -2624,8 +2632,8 @@ public class StmtExecutor {
}
- public void setProxyResultSet(ShowResultSet proxyResultSet) {
- this.proxyResultSet = proxyResultSet;
+ public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) {
+ this.proxyShowResultSet = proxyShowResultSet;
}
public ConnectContext getContext() {
@@ -2642,6 +2650,24 @@ public class StmtExecutor {
}
return "";
}
+
+ public List<ByteBuffer> getProxyQueryResultBufList() {
+ return ((ProxyMysqlChannel) context.getMysqlChannel()).getProxyResultBufferList();
+ }
+
+ public boolean sendProxyQueryResult() throws IOException {
+ if (masterOpExecutor == null) {
+ return false;
+ }
+ List<ByteBuffer> queryResultBufList = masterOpExecutor.getQueryResultBufList();
+ if (queryResultBufList.isEmpty()) {
+ return false;
+ }
+ for (ByteBuffer byteBuffer : queryResultBufList) {
+ context.getMysqlChannel().sendOnePacket(byteBuffer);
+ }
+ return true;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d1cad65d49d..2e855ed79eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1089,7 +1089,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// add this log so that we can track this stmt
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost());
- ConnectContext context = new ConnectContext();
+ ConnectContext context = new ConnectContext(null, true);
// Set current connected FE to the client address, so that we can know where this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());
ConnectProcessor processor = new ConnectProcessor(context);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0e179761c35..d7cb5e6613f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -285,7 +285,7 @@ public class AnalysisManager implements Writable {
if (!proxy) {
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
} else {
- ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
+ ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet);
}
} catch (Throwable t) {
LOG.warn("Failed to send job id to user", t);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index fa594221b49..b4649267a9d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -536,6 +536,7 @@ struct TMasterOpResult {
5: optional string status;
6: optional i32 statusCode;
7: optional string errMessage;
+ 8: optional list<binary> queryResultBufList;
}
struct TUpdateExportTaskStatusRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]