This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f0a9fed4891 [chore](agent) log the binary message size of agent tasks
#43239 (#43367)
f0a9fed4891 is described below
commit f0a9fed4891a8edcace36c3eab9ea26e4b0c372f
Author: walter <[email protected]>
AuthorDate: Wed Nov 6 18:38:24 2024 +0800
[chore](agent) log the binary message size of agent tasks #43239 (#43367)
cherry pick from #43239
---
.../java/org/apache/doris/common/ThriftUtils.java | 85 ++++++++++++++++++++++
.../java/org/apache/doris/task/AgentBatchTask.java | 9 +++
2 files changed, 94 insertions(+)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java
b/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java
new file mode 100644
index 00000000000..92881cec526
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+// Utility functions for thrift
+public class ThriftUtils {
+ // Get the size of the binary message of the thrift object
+ public static long getBinaryMessageSize(TBase<?, ?> thriftObject) {
+ TSizeTransport trans = new TSizeTransport();
+ TBinaryProtocol protocol = new TBinaryProtocol(trans);
+ try {
+ thriftObject.write(protocol);
+ } catch (TException e) {
+ return -1;
+ }
+ return trans.getSize();
+ }
+
+ // A transport class that only records the size of the message
+ private static class TSizeTransport extends TTransport {
+ private long size = 0;
+
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws
TTransportException {
+ size += len;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws
TTransportException {
+ throw new UnsupportedOperationException("Unimplemented method
'read'");
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return new TConfiguration();
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws
TTransportException {
+ }
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes) throws
TTransportException {
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index b1839400d31..1e12a93368a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -20,6 +20,7 @@ package org.apache.doris.task;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.ThriftUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TAgentServiceVersion;
@@ -204,6 +205,14 @@ public class AgentBatchTask implements Runnable {
private static void submitTasks(long backendId,
BackendService.Client client, List<TAgentTaskRequest>
agentTaskRequests) throws TException {
if (!agentTaskRequests.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ long size = agentTaskRequests.stream()
+ .map(ThriftUtils::getBinaryMessageSize)
+ .reduce(0L, Long::sum);
+ TTaskType firstTaskType =
agentTaskRequests.get(0).getTaskType();
+ LOG.debug("submit {} tasks to backend[{}], total size: {},
first task type: {}",
+ agentTaskRequests.size(), backendId, size,
firstTaskType);
+ }
client.submitTasks(agentTaskRequests);
}
if (LOG.isDebugEnabled()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]