This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f0a9fed4891 [chore](agent) log the binary message size of agent tasks #43239 (#43367)
f0a9fed4891 is described below

commit f0a9fed4891a8edcace36c3eab9ea26e4b0c372f
Author: walter <[email protected]>
AuthorDate: Wed Nov 6 18:38:24 2024 +0800

    [chore](agent) log the binary message size of agent tasks #43239 (#43367)
    
    cherry pick from #43239
---
 .../java/org/apache/doris/common/ThriftUtils.java  | 85 ++++++++++++++++++++++
 .../java/org/apache/doris/task/AgentBatchTask.java |  9 +++
 2 files changed, 94 insertions(+)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java
new file mode 100644
index 00000000000..92881cec526
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+// Utility functions for thrift
+public class ThriftUtils {
+    // Size in bytes of thriftObject written with TBinaryProtocol; returns -1 if the write throws TException.
+    public static long getBinaryMessageSize(TBase<?, ?> thriftObject) {
+        TSizeTransport trans = new TSizeTransport();
+        TBinaryProtocol protocol = new TBinaryProtocol(trans);
+        try {
+            thriftObject.write(protocol);
+        } catch (TException e) {
+            return -1; // the object could not be written, so its size is unknown
+        }
+        return trans.getSize();
+    }
+
+    // Write-only transport that discards the data and only accumulates the number of bytes written.
+    private static class TSizeTransport extends TTransport {
+        private long size = 0;
+
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public void write(byte[] buf, int off, int len) throws TTransportException {
+            size += len; // count the bytes, drop the payload
+        }
+
+        @Override
+        public boolean isOpen() {
+            return true;
+        }
+
+        @Override
+        public void open() throws TTransportException { // nothing to open
+        }
+
+        @Override
+        public void close() { // nothing to release
+        }
+
+        @Override
+        public int read(byte[] buf, int off, int len) throws TTransportException {
+            throw new UnsupportedOperationException("Unimplemented method 'read'");
+        }
+
+        @Override
+        public TConfiguration getConfiguration() {
+            return new TConfiguration();
+        }
+
+        @Override
+        public void updateKnownMessageSize(long size) throws TTransportException { // intentionally a no-op
+        }
+
+        @Override
+        public void checkReadBytesAvailable(long numBytes) throws TTransportException { // intentionally a no-op
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index b1839400d31..1e12a93368a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -20,6 +20,7 @@ package org.apache.doris.task;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.ThriftUtils;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TAgentServiceVersion;
@@ -204,6 +205,14 @@ public class AgentBatchTask implements Runnable {
     private static void submitTasks(long backendId,
             BackendService.Client client, List<TAgentTaskRequest> 
agentTaskRequests) throws TException {
         if (!agentTaskRequests.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+                long size = agentTaskRequests.stream()
+                        .map(ThriftUtils::getBinaryMessageSize)
+                        .reduce(0L, Long::sum);
+                TTaskType firstTaskType = 
agentTaskRequests.get(0).getTaskType();
+                LOG.debug("submit {} tasks to backend[{}], total size: {}, 
first task type: {}",
+                        agentTaskRequests.size(), backendId, size, 
firstTaskType);
+            }
             client.submitTasks(agentTaskRequests);
         }
         if (LOG.isDebugEnabled()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to