This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2fb8e00907e branch-2.1: [chore](task) log the thrift message size if
the broken pipe is occurred #49492 (#49509)
2fb8e00907e is described below
commit 2fb8e00907e193995d4c05f8ab700358ffd67688
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 28 09:57:53 2025 +0800
branch-2.1: [chore](task) log the thrift message size if the broken pipe is
occurred #49492 (#49509)
Cherry-picked from #49492
Co-authored-by: walter <[email protected]>
---
.../java/org/apache/doris/task/AgentBatchTask.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index ebfdb28a16b..0045e05ccba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -20,6 +20,7 @@ package org.apache.doris.task;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.ThriftUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
@@ -60,6 +61,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/*
* This class group tasks by backend
@@ -166,6 +168,7 @@ public class AgentBatchTask implements Runnable {
TNetworkAddress address = null;
boolean ok = false;
String errMsg = "";
+ List<TAgentTaskRequest> agentTaskRequests = new
LinkedList<TAgentTaskRequest>();
try {
Backend backend =
Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
@@ -177,7 +180,6 @@ public class AgentBatchTask implements Runnable {
String host = FeConstants.runningUnitTest ? "127.0.0.1" :
backend.getHost();
address = new TNetworkAddress(host, backend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
- List<TAgentTaskRequest> agentTaskRequests = new
LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
agentTaskRequests.add(toAgentTaskRequest(task));
if (agentTaskRequests.size() >= batchSize) {
@@ -190,6 +192,21 @@ public class AgentBatchTask implements Runnable {
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
errMsg = String.format("task exec error: %s. backend[%d]",
e.getMessage(), backendId);
+ if (!agentTaskRequests.isEmpty() && errMsg.contains("Broken
pipe")) {
+ // Log the task binary message size and the max task type, to help debug the
+ // large thrift message size issue.
+ List<Pair<TTaskType, Long>> taskTypeAndSize =
agentTaskRequests.stream()
+ .map(req -> Pair.of(req.getTaskType(),
ThriftUtils.getBinaryMessageSize(req)))
+ .collect(Collectors.toList());
+ Pair<TTaskType, Long> maxTaskTypeAndSize =
taskTypeAndSize.stream()
+ .max((p1, p2) -> Long.compare(p1.value(),
p2.value()))
+ .orElse(null); // taskTypeAndSize is not empty
+ TTaskType maxType = maxTaskTypeAndSize.first;
+ long maxSize = maxTaskTypeAndSize.second;
+ long totalSize =
taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum);
+ LOG.warn("submit {} tasks to backend[{}], total size: {},
max task type: {}, size: {}. msg: {}",
+ agentTaskRequests.size(), backendId, totalSize,
maxType, maxSize, e.getMessage());
+ }
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]