This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 1144ab91150 [opt](session variable) max_msg_size_of_result_receiver #31809 (#31867)
1144ab91150 is described below
commit 1144ab9115095b61228692f5f42977631da0b69c
Author: zhiqiang <[email protected]>
AuthorDate: Wed Mar 6 20:03:46 2024 +0800
[opt](session variable) max_msg_size_of_result_receiver #31809 (#31867)
* REFORMAT & FINISH
* FIX
---
.../main/java/org/apache/doris/qe/Coordinator.java | 9 +++-
.../java/org/apache/doris/qe/PointQueryExec.java | 19 +++++--
.../java/org/apache/doris/qe/ResultReceiver.java | 22 ++++++--
.../java/org/apache/doris/qe/SessionVariable.java | 14 +++++
.../java/org/apache/doris/qe/StmtExecutor.java | 3 +-
.../apache/doris/rpc/TCustomProtocolFactory.java | 41 +++++++++++++++
.../max_msg_size_of_result_receiver.groovy | 61 ++++++++++++++++++++++
7 files changed, 160 insertions(+), 9 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 96ed3649007..294bcaab444 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -124,6 +124,7 @@ import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.jetbrains.annotations.NotNull;
@@ -243,6 +244,8 @@ public class Coordinator implements CoordInterface {
private boolean fasterFloatConvert = false;
+ private int maxMsgSizeOfResultReceiver =
TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
+
// Runtime filter merge instance address and ID
public TNetworkAddress runtimeFilterMergeAddr;
public TUniqueId runtimeFilterMergeInstanceId;
@@ -315,7 +318,7 @@ public class Coordinator implements CoordInterface {
nextInstanceId.setLo(queryId.lo + 1);
this.assignedRuntimeFilters = planner.getRuntimeFilters();
this.executionProfile = new ExecutionProfile(queryId,
fragments.size());
-
+ this.maxMsgSizeOfResultReceiver =
context.getSessionVariable().getMaxMsgSizeOfResultReceiver();
}
// Used for broker load task/export task/update coordinator
@@ -566,7 +569,9 @@ public class Coordinator implements CoordInterface {
if (topDataSink instanceof ResultSink || topDataSink instanceof
ResultFileSink) {
TNetworkAddress execBeAddr =
topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(queryId,
topParams.instanceExecParams.get(0).instanceId,
- addressToBackendID.get(execBeAddr),
toBrpcHost(execBeAddr), this.timeoutDeadline);
+ addressToBackendID.get(execBeAddr),
toBrpcHost(execBeAddr), this.timeoutDeadline,
+ this.maxMsgSizeOfResultReceiver);
+
if (LOG.isDebugEnabled()) {
LOG.debug("dispatch query job: {} to {}",
DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index 0d18b2f08c2..f5712d3a92b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -35,6 +35,7 @@ import org.apache.doris.proto.InternalService.KeyTuple;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.rpc.TCustomProtocolFactory;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprList;
@@ -85,6 +86,8 @@ public class PointQueryExec implements CoordInterface {
// using this ID to find for this prepared statement
private UUID cacheID;
+ private final int maxMsgSizeOfResultReceiver;
+
private OlapScanNode getPlanRoot() {
List<PlanFragment> fragments = planner.getFragments();
PlanFragment fragment = fragments.get(0);
@@ -94,7 +97,7 @@ public class PointQueryExec implements CoordInterface {
return planRoot;
}
- public PointQueryExec(Planner planner, Analyzer analyzer) {
+ public PointQueryExec(Planner planner, Analyzer analyzer, int
maxMessageSize) {
// init from planner
this.planner = planner;
List<PlanFragment> fragments = planner.getFragments();
@@ -115,6 +118,7 @@ public class PointQueryExec implements CoordInterface {
// TODO
// planner.getDescTable().toThrift();
}
+ this.maxMsgSizeOfResultReceiver = maxMessageSize;
}
void setScanRangeLocations() throws Exception {
@@ -306,8 +310,17 @@ public class PointQueryExec implements CoordInterface {
} else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
byte[] serialResult = pResult.getRowBatch().toByteArray();
TResultBatch resultBatch = new TResultBatch();
- TDeserializer deserializer = new TDeserializer();
- deserializer.deserialize(resultBatch, serialResult);
+ TDeserializer deserializer = new TDeserializer(
+ new
TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+ try {
+ deserializer.deserialize(resultBatch, serialResult);
+ } catch (TException e) {
+ if (e.getMessage().contains("MaxMessageSize reached")) {
+ throw new TException("MaxMessageSize reached, try increase
max_msg_size_of_result_receiver");
+ } else {
+ throw e;
+ }
+ }
rowBatch.setBatch(resultBatch);
rowBatch.setEos(true);
return rowBatch;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index a9e9740963f..c473d74b919 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -23,6 +23,7 @@ import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.rpc.TCustomProtocolFactory;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TStatusCode;
@@ -53,12 +54,16 @@ public class ResultReceiver {
private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture =
null;
public String cancelReason = "";
- public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId,
TNetworkAddress address, long timeoutTs) {
+ int maxMsgSizeOfResultReceiver;
+
+ public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId,
TNetworkAddress address, long timeoutTs,
+ int maxMsgSizeOfResultReceiver) {
this.queryId =
Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
this.finstId =
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
this.backendId = backendId;
this.address = address;
this.timeoutTs = timeoutTs;
+ this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver;
}
public RowBatch getNext(Status status) throws TException {
@@ -136,8 +141,19 @@ public class ResultReceiver {
} else if (pResult.hasRowBatch() &&
pResult.getRowBatch().size() > 0) {
byte[] serialResult = pResult.getRowBatch().toByteArray();
TResultBatch resultBatch = new TResultBatch();
- TDeserializer deserializer = new TDeserializer();
- deserializer.deserialize(resultBatch, serialResult);
+ TDeserializer deserializer = new TDeserializer(
+ new
TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+ try {
+ deserializer.deserialize(resultBatch, serialResult);
+ } catch (TException e) {
+ if (e.getMessage().contains("MaxMessageSize reached"))
{
+ throw new TException(
+ "MaxMessageSize reached, try increase
max_msg_size_of_result_receiver");
+ } else {
+ throw e;
+ }
+ }
+
rowBatch.setBatch(resultBatch);
rowBatch.setEos(pResult.getEos());
return rowBatch;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 40cf53db0d7..12083f17c13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -40,6 +40,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TConfiguration;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -447,6 +448,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String SHOW_ALL_FE_CONNECTION =
"show_all_fe_connection";
+ public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER =
"max_msg_size_of_result_receiver";
+
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -1375,6 +1378,12 @@ public class SessionVariable implements Serializable,
Writable {
"the plan node type which is ignored in 'explain shape
plan' command"})
public String ignoreShapePlanNodes = "";
+ @VariableMgr.VarAttr(name = MAX_MSG_SIZE_OF_RESULT_RECEIVER,
+ description = {"Max message size during result deserialization,
change this if you meet error"
+ + " like \"MaxMessageSize reached\"",
+ "用于控制结果反序列化时 thrift 字段的最大值,当遇到类似\"MaxMessageSize
reached\"这样的错误时可以考虑修改该参数"})
+ public int maxMsgSizeOfResultReceiver =
TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
+
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to
generate some variables,
// not the default value set in the code.
public void initFuzzyModeVariables() {
@@ -2867,5 +2876,10 @@ public class SessionVariable implements Serializable,
Writable {
public boolean getShowAllFeConnection() {
return this.showAllFeConnection;
}
+
+ /**
+  * Returns the max message size used during result deserialization
+  * (session variable max_msg_size_of_result_receiver).
+  */
+ public int getMaxMsgSizeOfResultReceiver() {
+     return maxMsgSizeOfResultReceiver;
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 14b77559064..067c31f26be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1432,7 +1432,8 @@ public class StmtExecutor {
RowBatch batch;
CoordInterface coordBase = null;
if (queryStmt instanceof SelectStmt && ((SelectStmt)
parsedStmt).isPointQueryShortCircuit()) {
- coordBase = new PointQueryExec(planner, analyzer);
+ coordBase = new PointQueryExec(planner, analyzer,
+
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else {
coord = new Coordinator(context, analyzer, planner,
context.getStatsErrorEstimator());
if (Config.enable_workload_group &&
context.sessionVariable.getEnablePipelineEngine()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java
new file mode 100644
index 00000000000..ca586df1e39
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * A {@link TProtocolFactory} that overrides the max message size of each transport it wraps,
+ * so that thrift messages larger than {@code TConfiguration.DEFAULT_MAX_MESSAGE_SIZE}
+ * (for example a result batch containing a very large field) can be deserialized.
+ *
+ * <p>Note: {@link #getProtocol} mutates the configuration of the given transport in place
+ * before wrapping it in a {@link TBinaryProtocol}.
+ */
+public class TCustomProtocolFactory implements TProtocolFactory {
+ // Max message size applied to every transport passed to getProtocol().
+ private final int maxMessageSize;
+
+ /**
+  * Sets the transport's max message size and returns a binary protocol over it.
+  *
+  * @param tTransport transport whose configuration is updated in place
+  * @return a new {@link TBinaryProtocol} reading from {@code tTransport}
+  */
+ @Override
+ public TProtocol getProtocol(TTransport tTransport) {
+ tTransport.getConfiguration().setMaxMessageSize(maxMessageSize);
+ return new TBinaryProtocol(tTransport);
+ }
+
+ /**
+  * @param maxMessageSize max message size to apply to each transport, e.g. the value of the
+  *     max_msg_size_of_result_receiver session variable; it is not validated here
+  */
+ public TCustomProtocolFactory(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+}
diff --git
a/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy
b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy
new file mode 100644
index 00000000000..e7fead33d90
--- /dev/null
+++ b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("max_msg_size_of_result_receiver") {
+    // 1,000,000 bytes; the test rows are sized as multiples of this.
+    def MESSAGE_SIZE_BASE = 1000 * 1000;
+    // Thrift's TConfiguration.DEFAULT_MAX_MESSAGE_SIZE (100 * 1024 * 1024 = 104857600 bytes),
+    // which is the default value of max_msg_size_of_result_receiver.
+    def DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
+    def table_name = "max_msg_size_of_result_receiver"
+    sql """
+        DROP TABLE IF EXISTS ${table_name}
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (id int, str string)
+        ENGINE=OLAP DISTRIBUTED BY HASH(id)
+        PROPERTIES("replication_num"="1")
+    """
+
+    // 104,000,000-byte string: below the default limit.
+    sql """
+        INSERT INTO ${table_name} VALUES (104, repeat("a", ${MESSAGE_SIZE_BASE * 104}))
+    """
+
+    // 105,000,000-byte string: above the default limit.
+    sql """
+        INSERT INTO ${table_name} VALUES (105, repeat("a", ${MESSAGE_SIZE_BASE * 105}))
+    """
+
+    // 1. A result below the default limit is received without error.
+    def with_exception = false
+    try {
+        sql "SELECT * FROM ${table_name} WHERE id = 104"
+    } catch (Exception e) {
+        with_exception = true
+    }
+    assertEquals(false, with_exception)
+
+    // 2. A result above the default limit must fail with the actionable hint.
+    //    Capture the message instead of asserting only inside the catch block,
+    //    so the test also fails when no exception is thrown at all.
+    def error_message = null
+    try {
+        sql "SELECT * FROM ${table_name} WHERE id = 105"
+    } catch (Exception e) {
+        error_message = e.getMessage()
+    }
+    assertTrue(error_message != null)
+    assertTrue(error_message.contains('MaxMessageSize reached, try increase max_msg_size_of_result_receiver'))
+
+    // 3. Raising the limit for this query lets the same result through.
+    with_exception = false
+    try {
+        sql "SELECT /*+SET_VAR(max_msg_size_of_result_receiver=${DEFAULT_MAX_MESSAGE_SIZE * 2})*/ * FROM ${table_name} WHERE id = 105"
+    } catch (Exception e) {
+        with_exception = true
+    }
+    assertEquals(false, with_exception)
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]