This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 23041730f0bdb8b151b35bbfd4adb800aed316b4 Author: Siyang Tang <[email protected]> AuthorDate: Thu Feb 8 12:01:07 2024 +0800 [enhancement](stmt-forward) record query result for proxy query to avoid EOF (#30536) --- .../org/apache/doris/mysql/ProxyMysqlChannel.java | 40 +++++++++++++++++ .../java/org/apache/doris/qe/ConnectContext.java | 9 +++- .../java/org/apache/doris/qe/ConnectProcessor.java | 14 ++++-- .../java/org/apache/doris/qe/MasterOpExecutor.java | 10 ++++- .../java/org/apache/doris/qe/StmtExecutor.java | 40 ++++++++++++++--- .../apache/doris/service/FrontendServiceImpl.java | 2 +- .../apache/doris/statistics/AnalysisManager.java | 2 +- gensrc/thrift/FrontendService.thrift | 1 + .../org/apache/doris/regression/suite/Suite.groovy | 14 +++--- .../doris/regression/suite/SuiteCluster.groovy | 5 +++ .../suites/query_p0/test_forward_qeury.groovy | 52 ++++++++++++++++++++++ 11 files changed, 169 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java new file mode 100644 index 00000000000..ed491df4b87 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mysql; + +import com.google.common.collect.Lists; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * An interceptor for proxy query, keeping all packets to be sent to master mysql channel. + */ +public class ProxyMysqlChannel extends MysqlChannel { + + private final List<ByteBuffer> proxyResultBuffer = Lists.newArrayList(); + + @Override + public void sendOnePacket(ByteBuffer packet) { + proxyResultBuffer.add(packet); + } + + public List<ByteBuffer> getProxyResultBufferList() { + return proxyResultBuffer; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 3813f29062b..5dc43c3a635 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -45,6 +45,7 @@ import org.apache.doris.mysql.MysqlCapability; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlSslContext; +import org.apache.doris.mysql.ProxyMysqlChannel; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.expressions.literal.Literal; @@ -326,14 +327,20 @@ public class ConnectContext { } public ConnectContext() { - this((StreamConnection) null); + this(null); } public ConnectContext(StreamConnection connection) { + this(connection, false); + } + + public ConnectContext(StreamConnection connection, boolean isProxy) { connectType = ConnectType.MYSQL; serverCapability = MysqlCapability.DEFAULT_CAPABILITY; if (connection != null) { mysqlChannel = new MysqlChannel(connection, this); + } else if (isProxy) { + mysqlChannel = new ProxyMysqlChannel(); } else { mysqlChannel = new DummyMysqlChannel(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 442b53669e0..2df6033e1fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -445,7 +445,11 @@ public abstract class ConnectProcessor { && ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) { ShowResultSet resultSet = executor.getShowResultSet(); if (resultSet == null) { - packet = executor.getOutputPacket(); + if (executor.sendProxyQueryResult()) { + packet = getResultPacket(); + } else { + packet = executor.getOutputPacket(); + } } else { executor.sendResultSet(resultSet); packet = getResultPacket(); @@ -594,8 +598,12 @@ public abstract class ConnectProcessor { result.setStatusCode(ctx.getState().getErrorCode().getCode()); result.setErrMessage(ctx.getState().getErrorMessage()); } - if (executor != null && executor.getProxyResultSet() != null) { - result.setResultSet(executor.getProxyResultSet().tothrift()); + if (executor != null) { + if (executor.getProxyShowResultSet() != null) { + result.setResultSet(executor.getProxyShowResultSet().tothrift()); + } else if (!executor.getProxyQueryResultBufList().isEmpty()) { + result.setQueryResultBufList(executor.getProxyQueryResultBufList()); + } } return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 7ed7061fbf5..40c126b732d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -35,6 +35,8 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; import java.util.Map; public class MasterOpExecutor { @@ -144,7 +146,7 @@ public class MasterOpExecutor { private TMasterOpRequest buildStmtForwardParams() { TMasterOpRequest params = new TMasterOpRequest(); - //node ident + // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); params.setSql(originStmt.originStmt); @@ -170,7 +172,7 @@ public class MasterOpExecutor { private TMasterOpRequest buildSyncJournalParmas() { final TMasterOpRequest params = new TMasterOpRequest(); - //node ident + // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); params.setSyncJournalOnly(true); @@ -235,6 +237,10 @@ public class MasterOpExecutor { } } + public List<ByteBuffer> getQueryResultBufList() { + return result.isSetQueryResultBufList() ? result.getQueryResultBufList() : Collections.emptyList(); + } + public void setResult(TMasterOpResult result) { this.result = result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c579c894b68..5c89a5c0590 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -100,6 +100,8 @@ import org.apache.doris.common.Version; import org.apache.doris.common.profile.Profile; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.ProfileManager.ProfileType; @@ -115,6 +117,7 @@ import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlEofPacket; import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.mysql.ProxyMysqlChannel; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; @@ -224,7 +227,7 @@ public class StmtExecutor { private RedirectStatus redirectStatus = null; private Planner planner; private boolean isProxy; - private ShowResultSet proxyResultSet = null; + private ShowResultSet proxyShowResultSet = null; private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; private String stmtName; @@ -363,7 +366,7 @@ public class StmtExecutor { // this is a query stmt, but this non-master FE can not read, forward it to master if (isQuery() && !Env.getCurrentEnv().isMaster() - && !Env.getCurrentEnv().canRead()) { + && (!Env.getCurrentEnv().canRead() || debugForwardAllQueries())) { return true; } @@ -374,6 +377,11 @@ public class StmtExecutor { } } + private boolean debugForwardAllQueries() { + DebugPoint debugPoint = DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries"); + return debugPoint != null && debugPoint.param("forwardAllQueries", true); + } + public ByteBuffer getOutputPacket() { if (masterOpExecutor == null) { return null; @@ -382,8 +390,8 @@ public class StmtExecutor { } } - public ShowResultSet getProxyResultSet() { - return proxyResultSet; + public ShowResultSet getProxyShowResultSet() { + return proxyShowResultSet; } public ShowResultSet getShowResultSet() { @@ -2358,7 +2366,7 @@ public class StmtExecutor { return; } if (isProxy) { - proxyResultSet = resultSet; + proxyShowResultSet = resultSet; return; } @@ -2865,8 +2873,8 @@ public class StmtExecutor { } - public void setProxyResultSet(ShowResultSet proxyResultSet) { - this.proxyResultSet = proxyResultSet; + public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) { + this.proxyShowResultSet = proxyShowResultSet; } public ConnectContext getContext() { @@ -2883,6 +2891,24 @@ public class StmtExecutor { } return ""; } + + public List<ByteBuffer> getProxyQueryResultBufList() { + return ((ProxyMysqlChannel) context.getMysqlChannel()).getProxyResultBufferList(); + } + + public boolean sendProxyQueryResult() throws IOException { + if (masterOpExecutor == null) { + return false; + } + List<ByteBuffer> queryResultBufList = masterOpExecutor.getQueryResultBufList(); + if (queryResultBufList.isEmpty()) { + return false; + } + for (ByteBuffer byteBuffer : queryResultBufList) { + context.getMysqlChannel().sendOnePacket(byteBuffer); + } + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index a90c9357c1c..9f6ee5baf68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -959,7 +959,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // add this log so that we can track this stmt LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost()); - ConnectContext context = new ConnectContext(); + ConnectContext context = new ConnectContext(null, true); // Set current connected FE to the client address, so that we can know where this request come from. context.setCurrentConnectedFEIp(params.getClientNodeHost()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 90cb0cd5f6d..a005052cd46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -282,7 +282,7 @@ public class AnalysisManager implements Writable { if (!proxy) { ConnectContext.get().getExecutor().sendResultSet(commonResultSet); } else { - ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet); + ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet); } } catch (Throwable t) { LOG.warn("Failed to send job id to user", t); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 40e21d37cb2..93b229a11d7 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -543,6 +543,7 @@ struct TMasterOpResult { 5: optional string status; 6: optional i32 statusCode; 7: optional string errMessage; + 8: optional list<binary> queryResultBufList; } struct TUpdateExportTaskStatusRequest { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index b5dde914199..84a84fa7a54 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -221,15 +221,19 @@ class Suite implements GroovyInterceptable { def user = context.config.jdbcUser def password = context.config.jdbcPassword - def masterFe = cluster.getMasterFe() - for (def i=0; masterFe == null && i<30; i++) { - masterFe = cluster.getMasterFe() + Frontend fe = null + for (def i=0; fe == null && i<30; i++) { + if (options.connectToFollower) { + fe = cluster.getOneFollowerFe() + } else { + fe = cluster.getMasterFe() + } Thread.sleep(1000) } - assertNotNull(masterFe) + assertNotNull(fe) def url = String.format( "jdbc:mysql://%s:%s/?useLocalSessionState=false&allowLoadLocalInfile=false", - masterFe.host, masterFe.queryPort) + fe.host, fe.queryPort) def conn = DriverManager.getConnection(url, user, password) def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName logger.info("try create database if not exists {}", context.dbName) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 47b642af485..6d4ae4be0ad 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -36,6 +36,7 @@ class ClusterOptions { int beNum = 3 List<String> feConfigs = ['heartbeat_interval_second=5'] List<String> beConfigs = [] + boolean connectToFollower = false // each be disks, a disks format is: disk_type=disk_num[,disk_capacity] // here disk_type=HDD or SSD, disk capacity is in gb unit. @@ -242,6 +243,10 @@ class SuiteCluster { return getFrontends().stream().filter(fe -> fe.isMaster).findFirst().orElse(null) } + Frontend getOneFollowerFe() { + return getFrontends().stream().filter(fe -> !fe.isMaster).findFirst().orElse(null) + } + Frontend getFeByIndex(int index) { return getFrontends().stream().filter(fe -> fe.index == index).findFirst().orElse(null) } diff --git a/regression-test/suites/query_p0/test_forward_qeury.groovy b/regression-test/suites/query_p0/test_forward_qeury.groovy new file mode 100644 index 00000000000..8dbef459d2d --- /dev/null +++ b/regression-test/suites/query_p0/test_forward_qeury.groovy @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType + +suite("test_forward_query") { + def options = new ClusterOptions() + options.enableDebugPoints() + options.setFeNum(2) + options.connectToFollower = true + + docker(options) { + def tbl = "test_forward_query" + sql """ DROP TABLE IF EXISTS ${tbl} """ + sql """ + CREATE TABLE ${tbl} + ( + k1 int + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ INSERT INTO ${tbl} VALUES(1);""" + + cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.forward_all_queries' : [forwardAllQueries:true]]) + + try { + sql """ SELECT * FROM ${tbl} """ + } catch (Exception ignored) { + assertTrue(false) + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
