This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f26c8c2f06b [enhancement](stmt-forward) record query result for proxy
query to avoid EOF (#30536)
f26c8c2f06b is described below
commit f26c8c2f06be5cb6afe4c04a22255a9aeb7a6d8b
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Feb 8 12:01:07 2024 +0800
[enhancement](stmt-forward) record query result for proxy query to avoid
EOF (#30536)
---
.../org/apache/doris/mysql/ProxyMysqlChannel.java | 40 +++++++++++++++++
.../java/org/apache/doris/qe/ConnectContext.java | 9 +++-
.../java/org/apache/doris/qe/ConnectProcessor.java | 14 ++++--
.../java/org/apache/doris/qe/MasterOpExecutor.java | 10 ++++-
.../java/org/apache/doris/qe/StmtExecutor.java | 40 ++++++++++++++---
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
.../apache/doris/statistics/AnalysisManager.java | 2 +-
gensrc/thrift/FrontendService.thrift | 1 +
.../org/apache/doris/regression/suite/Suite.groovy | 14 +++---
.../doris/regression/suite/SuiteCluster.groovy | 5 +++
.../suites/query_p0/test_forward_qeury.groovy | 52 ++++++++++++++++++++++
11 files changed, 169 insertions(+), 20 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
new file mode 100644
index 00000000000..ed491df4b87
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mysql;
+
+import com.google.common.collect.Lists;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An interceptor for proxy query, keeping all packets to be sent to master mysql channel.
+ */
+public class ProxyMysqlChannel extends MysqlChannel {
+
+ private final List<ByteBuffer> proxyResultBuffer = Lists.newArrayList();
+
+ @Override
+ public void sendOnePacket(ByteBuffer packet) {
+ proxyResultBuffer.add(packet);
+ }
+
+ public List<ByteBuffer> getProxyResultBufferList() {
+ return proxyResultBuffer;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 15e25cfdf3a..6e32a896895 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -47,6 +47,7 @@ import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlSslContext;
+import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
@@ -336,14 +337,20 @@ public class ConnectContext {
}
public ConnectContext() {
- this((StreamConnection) null);
+ this(null);
}
public ConnectContext(StreamConnection connection) {
+ this(connection, false);
+ }
+
+ public ConnectContext(StreamConnection connection, boolean isProxy) {
connectType = ConnectType.MYSQL;
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection, this);
+ } else if (isProxy) {
+ mysqlChannel = new ProxyMysqlChannel();
} else {
mysqlChannel = new DummyMysqlChannel();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 442b53669e0..2df6033e1fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -445,7 +445,11 @@ public abstract class ConnectProcessor {
&& ctx.getState().getStateType() !=
QueryState.MysqlStateType.ERR) {
ShowResultSet resultSet = executor.getShowResultSet();
if (resultSet == null) {
- packet = executor.getOutputPacket();
+ if (executor.sendProxyQueryResult()) {
+ packet = getResultPacket();
+ } else {
+ packet = executor.getOutputPacket();
+ }
} else {
executor.sendResultSet(resultSet);
packet = getResultPacket();
@@ -594,8 +598,12 @@ public abstract class ConnectProcessor {
result.setStatusCode(ctx.getState().getErrorCode().getCode());
result.setErrMessage(ctx.getState().getErrorMessage());
}
- if (executor != null && executor.getProxyResultSet() != null) {
- result.setResultSet(executor.getProxyResultSet().tothrift());
+ if (executor != null) {
+ if (executor.getProxyShowResultSet() != null) {
+
result.setResultSet(executor.getProxyShowResultSet().tothrift());
+ } else if (!executor.getProxyQueryResultBufList().isEmpty()) {
+
result.setQueryResultBufList(executor.getProxyQueryResultBufList());
+ }
}
return result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 7ed7061fbf5..40c126b732d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -35,6 +35,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class MasterOpExecutor {
@@ -144,7 +146,7 @@ public class MasterOpExecutor {
private TMasterOpRequest buildStmtForwardParams() {
TMasterOpRequest params = new TMasterOpRequest();
- //node ident
+ // node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSql(originStmt.originStmt);
@@ -170,7 +172,7 @@ public class MasterOpExecutor {
private TMasterOpRequest buildSyncJournalParmas() {
final TMasterOpRequest params = new TMasterOpRequest();
- //node ident
+ // node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSyncJournalOnly(true);
@@ -235,6 +237,10 @@ public class MasterOpExecutor {
}
}
+ public List<ByteBuffer> getQueryResultBufList() {
+ return result.isSetQueryResultBufList() ?
result.getQueryResultBufList() : Collections.emptyList();
+ }
+
public void setResult(TMasterOpResult result) {
this.result = result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c579c894b68..5c89a5c0590 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -100,6 +100,8 @@ import org.apache.doris.common.Version;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.ProfileManager.ProfileType;
@@ -115,6 +117,7 @@ import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
@@ -224,7 +227,7 @@ public class StmtExecutor {
private RedirectStatus redirectStatus = null;
private Planner planner;
private boolean isProxy;
- private ShowResultSet proxyResultSet = null;
+ private ShowResultSet proxyShowResultSet = null;
private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
private String stmtName;
@@ -363,7 +366,7 @@ public class StmtExecutor {
// this is a query stmt, but this non-master FE can not read, forward it to master
if (isQuery() && !Env.getCurrentEnv().isMaster()
- && !Env.getCurrentEnv().canRead()) {
+ && (!Env.getCurrentEnv().canRead() ||
debugForwardAllQueries())) {
return true;
}
@@ -374,6 +377,11 @@ public class StmtExecutor {
}
}
+ private boolean debugForwardAllQueries() {
+ DebugPoint debugPoint =
DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
+ return debugPoint != null && debugPoint.param("forwardAllQueries",
true);
+ }
+
public ByteBuffer getOutputPacket() {
if (masterOpExecutor == null) {
return null;
@@ -382,8 +390,8 @@ public class StmtExecutor {
}
}
- public ShowResultSet getProxyResultSet() {
- return proxyResultSet;
+ public ShowResultSet getProxyShowResultSet() {
+ return proxyShowResultSet;
}
public ShowResultSet getShowResultSet() {
@@ -2358,7 +2366,7 @@ public class StmtExecutor {
return;
}
if (isProxy) {
- proxyResultSet = resultSet;
+ proxyShowResultSet = resultSet;
return;
}
@@ -2865,8 +2873,8 @@ public class StmtExecutor {
}
- public void setProxyResultSet(ShowResultSet proxyResultSet) {
- this.proxyResultSet = proxyResultSet;
+ public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) {
+ this.proxyShowResultSet = proxyShowResultSet;
}
public ConnectContext getContext() {
@@ -2883,6 +2891,24 @@ public class StmtExecutor {
}
return "";
}
+
+ public List<ByteBuffer> getProxyQueryResultBufList() {
+ return ((ProxyMysqlChannel)
context.getMysqlChannel()).getProxyResultBufferList();
+ }
+
+ public boolean sendProxyQueryResult() throws IOException {
+ if (masterOpExecutor == null) {
+ return false;
+ }
+ List<ByteBuffer> queryResultBufList =
masterOpExecutor.getQueryResultBufList();
+ if (queryResultBufList.isEmpty()) {
+ return false;
+ }
+ for (ByteBuffer byteBuffer : queryResultBufList) {
+ context.getMysqlChannel().sendOnePacket(byteBuffer);
+ }
+ return true;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f80ed0f54ac..16a4bfa8145 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -960,7 +960,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// add this log so that we can track this stmt
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(),
params.getClientNodeHost());
- ConnectContext context = new ConnectContext();
+ ConnectContext context = new ConnectContext(null, true);
// Set current connected FE to the client address, so that we can know where this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 90cb0cd5f6d..a005052cd46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -282,7 +282,7 @@ public class AnalysisManager implements Writable {
if (!proxy) {
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
} else {
-
ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
+
ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet);
}
} catch (Throwable t) {
LOG.warn("Failed to send job id to user", t);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index ea0300d23e5..15f424736ef 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -551,6 +551,7 @@ struct TMasterOpResult {
5: optional string status;
6: optional i32 statusCode;
7: optional string errMessage;
+ 8: optional list<binary> queryResultBufList;
}
struct TUpdateExportTaskStatusRequest {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index b5dde914199..84a84fa7a54 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -221,15 +221,19 @@ class Suite implements GroovyInterceptable {
def user = context.config.jdbcUser
def password = context.config.jdbcPassword
- def masterFe = cluster.getMasterFe()
- for (def i=0; masterFe == null && i<30; i++) {
- masterFe = cluster.getMasterFe()
+ Frontend fe = null
+ for (def i=0; fe == null && i<30; i++) {
+ if (options.connectToFollower) {
+ fe = cluster.getOneFollowerFe()
+ } else {
+ fe = cluster.getMasterFe()
+ }
Thread.sleep(1000)
}
- assertNotNull(masterFe)
+ assertNotNull(fe)
def url = String.format(
"jdbc:mysql://%s:%s/?useLocalSessionState=false&allowLoadLocalInfile=false",
- masterFe.host, masterFe.queryPort)
+ fe.host, fe.queryPort)
def conn = DriverManager.getConnection(url, user, password)
def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName
logger.info("try create database if not exists {}", context.dbName)
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 47b642af485..6d4ae4be0ad 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -36,6 +36,7 @@ class ClusterOptions {
int beNum = 3
List<String> feConfigs = ['heartbeat_interval_second=5']
List<String> beConfigs = []
+ boolean connectToFollower = false
// each be disks, a disks format is: disk_type=disk_num[,disk_capacity]
// here disk_type=HDD or SSD, disk capacity is in gb unit.
@@ -242,6 +243,10 @@ class SuiteCluster {
return getFrontends().stream().filter(fe ->
fe.isMaster).findFirst().orElse(null)
}
+ Frontend getOneFollowerFe() {
+ return getFrontends().stream().filter(fe ->
!fe.isMaster).findFirst().orElse(null)
+ }
+
Frontend getFeByIndex(int index) {
return getFrontends().stream().filter(fe -> fe.index ==
index).findFirst().orElse(null)
}
diff --git a/regression-test/suites/query_p0/test_forward_qeury.groovy b/regression-test/suites/query_p0/test_forward_qeury.groovy
new file mode 100644
index 00000000000..8dbef459d2d
--- /dev/null
+++ b/regression-test/suites/query_p0/test_forward_qeury.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite("test_forward_query") {
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ options.setFeNum(2)
+ options.connectToFollower = true
+
+ docker(options) {
+ def tbl = "test_forward_query"
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sql """
+ CREATE TABLE ${tbl}
+ (
+ k1 int
+ )
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """ INSERT INTO ${tbl} VALUES(1);"""
+
+ cluster.injectDebugPoints(NodeType.FE,
['StmtExecutor.forward_all_queries' : [forwardAllQueries:true]])
+
+ try {
+ sql """ SELECT * FROM ${tbl} """
+ } catch (Exception ignored) {
+ assertTrue(false)
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]