This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 1144ab91150 [opt](session variable) max_msg_size_of_result_receiver 
#31809 (#31867)
1144ab91150 is described below

commit 1144ab9115095b61228692f5f42977631da0b69c
Author: zhiqiang <[email protected]>
AuthorDate: Wed Mar 6 20:03:46 2024 +0800

    [opt](session variable) max_msg_size_of_result_receiver #31809 (#31867)
    
    * REFORMAT & FINISH
    
    * FIX
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  9 +++-
 .../java/org/apache/doris/qe/PointQueryExec.java   | 19 +++++--
 .../java/org/apache/doris/qe/ResultReceiver.java   | 22 ++++++--
 .../java/org/apache/doris/qe/SessionVariable.java  | 14 +++++
 .../java/org/apache/doris/qe/StmtExecutor.java     |  3 +-
 .../apache/doris/rpc/TCustomProtocolFactory.java   | 41 +++++++++++++++
 .../max_msg_size_of_result_receiver.groovy         | 61 ++++++++++++++++++++++
 7 files changed, 160 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 96ed3649007..294bcaab444 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -124,6 +124,7 @@ import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 
@@ -243,6 +244,8 @@ public class Coordinator implements CoordInterface {
 
     private boolean fasterFloatConvert = false;
 
+    private int maxMsgSizeOfResultReceiver = 
TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
+
     // Runtime filter merge instance address and ID
     public TNetworkAddress runtimeFilterMergeAddr;
     public TUniqueId runtimeFilterMergeInstanceId;
@@ -315,7 +318,7 @@ public class Coordinator implements CoordInterface {
         nextInstanceId.setLo(queryId.lo + 1);
         this.assignedRuntimeFilters = planner.getRuntimeFilters();
         this.executionProfile = new ExecutionProfile(queryId, 
fragments.size());
-
+        this.maxMsgSizeOfResultReceiver = 
context.getSessionVariable().getMaxMsgSizeOfResultReceiver();
     }
 
     // Used for broker load task/export task/update coordinator
@@ -566,7 +569,9 @@ public class Coordinator implements CoordInterface {
         if (topDataSink instanceof ResultSink || topDataSink instanceof 
ResultFileSink) {
             TNetworkAddress execBeAddr = 
topParams.instanceExecParams.get(0).host;
             receiver = new ResultReceiver(queryId, 
topParams.instanceExecParams.get(0).instanceId,
-                    addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline);
+                    addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline,
+                    this.maxMsgSizeOfResultReceiver);
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("dispatch query job: {} to {}", 
DebugUtil.printId(queryId),
                         topParams.instanceExecParams.get(0).host);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index 0d18b2f08c2..f5712d3a92b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -35,6 +35,7 @@ import org.apache.doris.proto.InternalService.KeyTuple;
 import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.rpc.TCustomProtocolFactory;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TExprList;
@@ -85,6 +86,8 @@ public class PointQueryExec implements CoordInterface {
     // using this ID to find for this prepared statement
     private UUID cacheID;
 
+    private final int maxMsgSizeOfResultReceiver;
+
     private OlapScanNode getPlanRoot() {
         List<PlanFragment> fragments = planner.getFragments();
         PlanFragment fragment = fragments.get(0);
@@ -94,7 +97,7 @@ public class PointQueryExec implements CoordInterface {
         return planRoot;
     }
 
-    public PointQueryExec(Planner planner, Analyzer analyzer) {
+    public PointQueryExec(Planner planner, Analyzer analyzer, int 
maxMessageSize) {
         // init from planner
         this.planner = planner;
         List<PlanFragment> fragments = planner.getFragments();
@@ -115,6 +118,7 @@ public class PointQueryExec implements CoordInterface {
             // TODO
             // planner.getDescTable().toThrift();
         }
+        this.maxMsgSizeOfResultReceiver = maxMessageSize;
     }
 
     void setScanRangeLocations() throws Exception {
@@ -306,8 +310,17 @@ public class PointQueryExec implements CoordInterface {
         } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
             byte[] serialResult = pResult.getRowBatch().toByteArray();
             TResultBatch resultBatch = new TResultBatch();
-            TDeserializer deserializer = new TDeserializer();
-            deserializer.deserialize(resultBatch, serialResult);
+            // Deserialize with the session-configured Thrift max message size instead of the default.
+            TDeserializer deserializer = new TDeserializer(
+                    new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+            try {
+                deserializer.deserialize(resultBatch, serialResult);
+            } catch (TException e) {
+                // getMessage() may be null for a TException created without a message;
+                // guard it so the real failure is not masked by a NullPointerException.
+                String message = e.getMessage();
+                if (message != null && message.contains("MaxMessageSize reached")) {
+                    // Point the user at the session variable and keep the original exception as cause.
+                    throw new TException(
+                            "MaxMessageSize reached, try increase max_msg_size_of_result_receiver", e);
+                }
+                throw e;
+            }
             rowBatch.setBatch(resultBatch);
             rowBatch.setEos(true);
             return rowBatch;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index a9e9740963f..c473d74b919 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -23,6 +23,7 @@ import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.rpc.TCustomProtocolFactory;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResultBatch;
 import org.apache.doris.thrift.TStatusCode;
@@ -53,12 +54,16 @@ public class ResultReceiver {
     private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = 
null;
     public String cancelReason = "";
 
-    public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, 
TNetworkAddress address, long timeoutTs) {
+    int maxMsgSizeOfResultReceiver;
+
+    public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, 
TNetworkAddress address, long timeoutTs,
+            int maxMsgSizeOfResultReceiver) {
         this.queryId = 
Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
         this.finstId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         this.backendId = backendId;
         this.address = address;
         this.timeoutTs = timeoutTs;
+        this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver;
     }
 
     public RowBatch getNext(Status status) throws TException {
@@ -136,8 +141,19 @@ public class ResultReceiver {
                 } else if (pResult.hasRowBatch() && 
pResult.getRowBatch().size() > 0) {
                     byte[] serialResult = pResult.getRowBatch().toByteArray();
                     TResultBatch resultBatch = new TResultBatch();
-                    TDeserializer deserializer = new TDeserializer();
-                    deserializer.deserialize(resultBatch, serialResult);
+                    // Deserialize with the session-configured Thrift max message size instead of the default.
+                    TDeserializer deserializer = new TDeserializer(
+                            new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+                    try {
+                        deserializer.deserialize(resultBatch, serialResult);
+                    } catch (TException e) {
+                        // getMessage() may be null for a TException created without a message;
+                        // guard it so the real failure is not masked by a NullPointerException.
+                        String message = e.getMessage();
+                        if (message != null && message.contains("MaxMessageSize reached")) {
+                            // Point the user at the session variable and keep the original cause.
+                            throw new TException(
+                                    "MaxMessageSize reached, try increase max_msg_size_of_result_receiver", e);
+                        }
+                        throw e;
+                    }
+
                     rowBatch.setBatch(resultBatch);
                     rowBatch.setEos(pResult.getEos());
                     return rowBatch;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 40cf53db0d7..12083f17c13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -40,6 +40,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TConfiguration;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
@@ -447,6 +448,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String SHOW_ALL_FE_CONNECTION = 
"show_all_fe_connection";
 
+    public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = 
"max_msg_size_of_result_receiver";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -1375,6 +1378,12 @@ public class SessionVariable implements Serializable, 
Writable {
                     "the plan node type which is ignored in 'explain shape 
plan' command"})
     public String ignoreShapePlanNodes = "";
 
+    @VariableMgr.VarAttr(name = MAX_MSG_SIZE_OF_RESULT_RECEIVER,
+            description = {"Max message size during result deserialization, 
change this if you meet error"
+                    + " like \"MaxMessageSize reached\"",
+                    "用于控制结果反序列化时 thrift 字段的最大值,当遇到类似\"MaxMessageSize 
reached\"这样的错误时可以考虑修改该参数"})
+    public int maxMsgSizeOfResultReceiver = 
TConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to 
generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
@@ -2867,5 +2876,10 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean getShowAllFeConnection() {
         return this.showAllFeConnection;
     }
+
+    /**
+     * Returns the current value of the max_msg_size_of_result_receiver session variable,
+     * i.e. the max message size applied when deserializing query results.
+     */
+    public int getMaxMsgSizeOfResultReceiver() {
+        return maxMsgSizeOfResultReceiver;
+    }
+
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 14b77559064..067c31f26be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1432,7 +1432,8 @@ public class StmtExecutor {
         RowBatch batch;
         CoordInterface coordBase = null;
         if (queryStmt instanceof SelectStmt && ((SelectStmt) 
parsedStmt).isPointQueryShortCircuit()) {
-            coordBase = new PointQueryExec(planner, analyzer);
+            coordBase = new PointQueryExec(planner, analyzer,
+                    
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
         } else {
             coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
             if (Config.enable_workload_group && 
context.sessionVariable.getEnablePipelineEngine()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java
new file mode 100644
index 00000000000..ca586df1e39
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/TCustomProtocolFactory.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Protocol factory that applies a caller-chosen maxMessageSize to the transport's
+ * configuration before wrapping it in a TBinaryProtocol, so that messages whose
+ * field size is very large can be transferred.
+ */
+public class TCustomProtocolFactory implements TProtocolFactory {
+    // Limit written into the transport configuration each time a protocol is created.
+    private final int maxMessageSize;
+
+    public TCustomProtocolFactory(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+    }
+
+    @Override
+    public TProtocol getProtocol(TTransport transport) {
+        // The limit must be set on the transport before the protocol starts reading from it.
+        transport.getConfiguration().setMaxMessageSize(maxMessageSize);
+        return new TBinaryProtocol(transport);
+    }
+}
diff --git 
a/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy 
b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy
new file mode 100644
index 00000000000..e7fead33d90
--- /dev/null
+++ b/regression-test/suites/variable_p0/max_msg_size_of_result_receiver.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("max_msg_size_of_result_receiver") {
+    def MESSAGE_SIZE_BASE=1000*1000; // 1MB
+    def DEFAULT_MAX_MESSAGE_SIZE = 104000000; // 104MB
+    def table_name = "max_msg_size_of_result_receiver"
+    sql """
+        DROP TABLE  IF EXISTS ${table_name}
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (id int, str string)
+        ENGINE=OLAP DISTRIBUTED BY HASH(id)
+        PROPERTIES("replication_num"="1")
+    """
+
+    // One row whose string is 104 * MESSAGE_SIZE_BASE bytes, one whose string is 105 * MESSAGE_SIZE_BASE bytes.
+    sql """
+        INSERT INTO ${table_name} VALUES (104, repeat("a", ${MESSAGE_SIZE_BASE * 104}))
+    """
+
+    sql """
+        INSERT INTO ${table_name} VALUES (105, repeat("a", ${MESSAGE_SIZE_BASE * 105}))
+    """
+
+    // The smaller row must be readable with the default setting.
+    def with_exception = false
+    try {
+        sql "SELECT * FROM ${table_name} WHERE id = 104"
+    } catch (Exception e) {
+        with_exception = true
+    }
+    assertEquals(with_exception, false)
+
+    // The larger row must be rejected with the hint message. Track that the failure
+    // really happened so the suite cannot pass when the query silently succeeds.
+    def oversized_rejected = false
+    try {
+        sql "SELECT * FROM ${table_name} WHERE id = 105"
+    } catch (Exception e) {
+        oversized_rejected = true
+        assertTrue(e.getMessage().contains('MaxMessageSize reached, try increase max_msg_size_of_result_receiver'))
+    }
+    assertTrue(oversized_rejected)
+
+    // Raising the limit through SET_VAR must make the larger row readable.
+    // Reset the flag so this check does not depend on the earlier query's outcome.
+    with_exception = false
+    try {
+        sql "SELECT /*+SET_VAR(max_msg_size_of_result_receiver=${DEFAULT_MAX_MESSAGE_SIZE * 2})*/ * FROM ${table_name} WHERE id = 105"
+    } catch (Exception e) {
+        with_exception = true
+    }
+    assertEquals(with_exception, false)
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to