This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c37d4cbb70 [INLONG-10751][Agent] Report Agent process status for 
backend problem analysis (#10752)
c37d4cbb70 is described below

commit c37d4cbb70694e6aa3d1cf43563a6caaf5d5a1f5
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Aug 6 14:59:03 2024 +0800

    [INLONG-10751][Agent] Report Agent process status for backend problem 
analysis (#10752)
    
    * [INLONG-10751][Agent] Report Agent process status for backend problem 
analysis
    
    * [INLONG-10751][Agent] Report Agent process status for backend problem 
analysis
    
    * [INLONG-10751][Agent] Report Agent process status for backend problem 
analysis
    
    * [INLONG-10751][Agent] Report Agent process status for backend problem 
analysis
---
 .../inlong/agent/constant/AgentConstants.java      |   1 +
 .../apache/inlong/agent/store/InstanceStore.java   |  12 +
 inlong-agent/agent-core/pom.xml                    |  15 ++
 .../inlong/agent/core/AgentStatusManager.java      | 281 +++++++++++++++++++++
 .../apache/inlong/agent/core/HeartbeatManager.java |  31 +--
 .../inlong/agent/core/task/OffsetManager.java      |   4 +
 .../apache/inlong/agent/core/task/TaskManager.java |  18 ++
 .../agent/plugin/fetcher/ManagerFetcher.java       |  26 +-
 .../plugin/sinks/filecollect/SenderManager.java    |   9 +-
 .../sinks/filecollect/TestSenderManager.java       |   2 +-
 10 files changed, 358 insertions(+), 41 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index dceb443b03..ec5831973b 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -56,6 +56,7 @@ public class AgentConstants {
     public static final String AGENT_CLUSTER_NAME = "agent.cluster.name";
     public static final String AGENT_CLUSTER_TAG = "agent.cluster.tag";
     public static final String AGENT_CLUSTER_IN_CHARGES = 
"agent.cluster.inCharges";
+    public static final String AGENT_INSTALL_PLATFORM = 
"agent.install.platform";
 
     public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
     public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
index 2551734134..476d014e57 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.store;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.common.enums.InstanceStateEnum;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +55,17 @@ public class InstanceStore {
         return instanceList;
     }
 
+    /**
+     * Counts the stored instances whose state is {@link InstanceStateEnum#DEFAULT}.
+     *
+     * @return number of entries under the store's unique key that are in the DEFAULT state
+     */
+    public int getRunningInstanceCount() {
+        int running = 0;
+        for (KeyValueEntity item : this.store.findAll(store.getUniqueKey())) {
+            if (item.getAsInstanceProfile().getState() != InstanceStateEnum.DEFAULT) {
+                continue;
+            }
+            running++;
+        }
+        return running;
+    }
+
     /**
      * get instance list from instance store.
      *
diff --git a/inlong-agent/agent-core/pom.xml b/inlong-agent/agent-core/pom.xml
index b314167960..2d7c1334cb 100755
--- a/inlong-agent/agent-core/pom.xml
+++ b/inlong-agent/agent-core/pom.xml
@@ -45,6 +45,21 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>dataproxy-sdk</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
new file mode 100644
index 0000000000..fe6f4353aa
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ExcuteLinux;
+import org.apache.inlong.common.constant.ProtocolType;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_INSTALL_PLATFORM;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
+
+/**
+ * Collect various indicators of agent processes for backend problem analysis
+ */
+public class AgentStatusManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AgentStatusManager.class);
+    public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
+    public static final String INLONG_AGENT_STATUS = "inlong_agent_status";
+
+    // volatile: getInstance() reads this field in a null check outside the synchronized block,
+    // so the write must be safely published to other threads.
+    private static volatile AgentStatusManager manager = null;
+    private final AgentConfiguration conf;
+    // timestamp format used for the process startup time
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    private Runtime runtime = Runtime.getRuntime();
+    // number of bytes in one GiB, used to convert the memory figures
+    final long GB = 1024 * 1024 * 1024;
+    private OperatingSystemMXBean osMxBean;
+    private ThreadMXBean threadBean;
+    // returned by tryGetProcessCpu when a thread CPU time could not be read
+    private final long INVALID_CPU = -1;
+    private RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+    private ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+    private AgentManager agentManager;
+    // counters accumulated by senders; getStatusMessage reads them and resets them to 0
+    public static AtomicLong sendDataLen = new AtomicLong();
+    public static AtomicLong sendPackageCount = new AtomicLong();
+    private DefaultMessageSender sender;
+    // display labels, index-aligned with the values returned by getStatusMessage
+    private List<String> statusFieldsPre = Lists.newArrayList();
+    private String processStartupTime = format.format(runtimeMXBean.getStartTime());
+    // output of "who -b" reduced to its last two columns, with line breaks removed
+    private String systemStartupTime = ExcuteLinux.exeCmd("who -b|awk '{print $(NF-1), $NF}'").replaceAll("\r|\n", "");
+
+    /**
+     * Creates the status manager: stores the agent manager and the agent configuration,
+     * looks up the JVM management beans, fills the status labels and attempts to create
+     * the message sender.
+     *
+     * @param agentManager agent manager whose task manager is queried when building status
+     */
+    private AgentStatusManager(AgentManager agentManager) {
+        this.conf = AgentConfiguration.getAgentConf();
+        this.agentManager = agentManager;
+        this.threadBean = ManagementFactory.getThreadMXBean();
+        this.osMxBean = ManagementFactory.getOperatingSystemMXBean();
+        initStatusFieldsPre();
+        createMessageSender();
+    }
+
+    /**
+     * Returns the shared instance, creating it with the given agent manager on first use.
+     * Creation is guarded by a lock on the class; later calls ignore the argument.
+     *
+     * @param agentManager agent manager handed to the constructor when the instance is created
+     * @return the shared instance
+     */
+    public static AgentStatusManager getInstance(AgentManager agentManager) {
+        if (manager != null) {
+            return manager;
+        }
+        synchronized (AgentStatusManager.class) {
+            if (manager == null) {
+                manager = new AgentStatusManager(agentManager);
+            }
+        }
+        return manager;
+    }
+
+    /**
+     * Returns the shared instance previously created by {@link #getInstance(AgentManager)}.
+     *
+     * @return the existing instance
+     * @throws RuntimeException if the instance has not been created yet
+     */
+    public static AgentStatusManager getInstance() {
+        if (manager == null) {
+            // The message names this class; it previously named HeartbeatManager by mistake.
+            throw new RuntimeException("AgentStatusManager has not been initialized by agentManager");
+        }
+        return manager;
+    }
+
+    /**
+     * Joins the status fields with commas and sends them as one UTF-8 encoded message,
+     * passing {@code INLONG_AGENT_SYSTEM} and {@code INLONG_AGENT_STATUS} as identifiers.
+     * If no sender exists, one creation attempt is made first; if that fails nothing is sent.
+     * A result other than {@code SendResult.OK} is logged as an error.
+     *
+     * @param fields status values, e.g. as produced by {@link #getStatusMessage()}
+     */
+    public void sendStatusMsg(List<String> fields) {
+        if (sender == null) {
+            LOGGER.error("sender is null");
+            createMessageSender();
+        }
+        if (sender == null) {
+            return;
+        }
+        byte[] body = StringUtils.join(fields, ",").getBytes(StandardCharsets.UTF_8);
+        SendResult result = sender.sendMessage(body, INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS,
+                AgentUtils.getCurrentTime(), "", 30, TimeUnit.SECONDS);
+        if (result == SendResult.OK) {
+            return;
+        }
+        LOGGER.error("send status failed: ret {}", result);
+    }
+
+    /**
+     * Logs the status values, one per line, each prefixed with its label from
+     * {@code statusFieldsPre}. Labels and values are paired by index.
+     *
+     * @param fields status values in label order; if the list is shorter than the label list
+     *        (or null) only the available values are printed
+     */
+    public void printStatusMsg(List<String> fields) {
+        // Bound by both lists so a short or missing value list cannot throw.
+        int count = fields == null ? 0 : Math.min(statusFieldsPre.size(), fields.size());
+        List<String> toPrint = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            // The labels already end with ": "; adding another separator printed "ip: : value".
+            toPrint.add(statusFieldsPre.get(i) + fields.get(i));
+        }
+        LOGGER.info("status detail:\n{}", StringUtils.join(toPrint, "\n"));
+    }
+
+    /**
+     * Builds the {@link DefaultMessageSender} from the manager address and auth settings of
+     * the agent configuration, using TCP and the default proxy sizing constants.
+     * Any exception is logged and leaves {@code sender} as it was.
+     */
+    private void createMessageSender() {
+        final String addr = conf.get(AGENT_MANAGER_ADDR);
+        final String secretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
+        final String secretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
+        try {
+            ProxyClientConfig clientConfig = new ProxyClientConfig(addr, INLONG_AGENT_SYSTEM, secretId, secretKey);
+            clientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+            clientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
+            clientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
+            clientConfig.setProtocolType(ProtocolType.TCP);
+            // sender threads inherit the daemon flag of the thread creating the sender
+            ThreadFactory threadFactory = new DefaultThreadFactory("agent-sender-manager-heartbeat",
+                    Thread.currentThread().isDaemon());
+            sender = new DefaultMessageSender(clientConfig, threadFactory);
+        } catch (Exception e) {
+            LOGGER.error("heartbeat manager create sdk failed: ", e);
+        }
+    }
+
+    /**
+     * Measures the process CPU usage, repeating the measurement up to 10 more times while
+     * the result is negative.
+     *
+     * @return the last measured value in percent; it may still be negative if no attempt succeeded
+     */
+    private double getProcessCpu() {
+        double cpu = tryGetProcessCpu();
+        for (int retries = 0; cpu < 0 && retries < 10; retries++) {
+            cpu = tryGetProcessCpu();
+        }
+        return cpu;
+    }
+
+    /**
+     * Sums the CPU time of all threads twice, 5000 ms apart, and expresses the difference
+     * as a percentage of the elapsed wall-clock time.
+     *
+     * @return CPU usage in percent, or {@code INVALID_CPU} if any thread reported a negative CPU time
+     */
+    private double tryGetProcessCpu() {
+        long[] idsBefore = threadBean.getAllThreadIds();
+        long wallStart = System.nanoTime();
+        long cpuBefore = 0;
+        for (long threadId : idsBefore) {
+            long cpuTime = threadBean.getThreadCpuTime(threadId);
+            if (cpuTime < 0) {
+                return INVALID_CPU;
+            }
+            cpuBefore += cpuTime;
+        }
+        AgentUtils.silenceSleepInMs(5000);
+        long[] idsAfter = threadBean.getAllThreadIds();
+        long wallEnd = System.nanoTime();
+        long cpuAfter = 0;
+        for (long threadId : idsAfter) {
+            long cpuTime = threadBean.getThreadCpuTime(threadId);
+            if (cpuTime < 0) {
+                return INVALID_CPU;
+            }
+            cpuAfter += cpuTime;
+        }
+        long cpuDelta = cpuAfter - cpuBefore;
+        long wallDelta = wallEnd - wallStart;
+        return (((double) cpuDelta) / wallDelta) * 100;
+    }
+
+    /**
+     * Collects the current status values of the agent process.
+     * The values are positional: printStatusMsg pairs them by index with the labels added in
+     * initStatusFieldsPre, so the order of the add calls must stay aligned with those labels.
+     * Reading the send counters resets them to 0, so each call reports the amount accumulated
+     * since the previous call.
+     *
+     * @return the status values in label order
+     */
+    public List<String> getStatusMessage() {
+        List<String> fields = Lists.newArrayList();
+        fields.add(AgentUtils.fetchLocalIp());
+        fields.add(conf.get(AGENT_CLUSTER_NAME));
+        fields.add(conf.get(AGENT_CLUSTER_TAG));
+        fields.add(TaskManager.class.getPackage().getImplementationVersion());
+        fields.add(processStartupTime);
+        fields.add(String.valueOf(runtime.availableProcessors()));
+        // getProcessCpu sleeps 5000 ms per measurement, so this call blocks for at least that long
+        fields.add(String.valueOf(twoDecimal(getProcessCpu())));
+        fields.add(String.valueOf(twoDecimal((double) runtime.freeMemory() / 
GB)));
+        fields.add(String.valueOf(twoDecimal((double) runtime.maxMemory() / 
GB)));
+        // NOTE(review): this slot is labelled "use mem" but holds totalMemory() - confirm intent
+        fields.add(String.valueOf(twoDecimal((double) runtime.totalMemory() / 
GB)));
+        fields.add(System.getProperty("os.version"));
+        fields.add(conf.get(AGENT_INSTALL_PLATFORM, ""));
+        fields.add(System.getProperty("user.dir"));
+        fields.add(System.getProperty("user.name"));
+        fields.add(String.valueOf(getProcessId()));
+        // always add exactly one value here so later positions do not shift
+        if (AgentManager.getAgentConfigInfo() != null) {
+            fields.add(AgentManager.getAgentConfigInfo().getMd5());
+        } else {
+            fields.add("");
+        }
+        fields.add(agentManager.getTaskManager().getTaskResultMd5());
+        
fields.add(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
+        
fields.add(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
+        fields.add(systemStartupTime);
+        // getAndSet(0) reads and resets the counters in one atomic step
+        fields.add(String.valueOf(sendPackageCount.getAndSet(0)));
+        fields.add(String.valueOf(sendDataLen.getAndSet(0)));
+        
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
+        
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
+        
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
+        fields.add(String.valueOf(threadMXBean.getThreadCount()));
+        return fields;
+    }
+
+    /**
+     * Appends the display label of every status value to {@code statusFieldsPre}.
+     * printStatusMsg pairs these labels with the reported values by index.
+     */
+    private void initStatusFieldsPre() {
+        final String[] labels = {
+                "ip: ",
+                "cluster: ",
+                "tag: ",
+                "agent version: ",
+                "agent start time: ",
+                "cpu core: ",
+                "proc cpu: ",
+                "free mem: ",
+                "max mem: ",
+                "use mem: ",
+                "os: ",
+                "install platform: ",
+                "usr dir: ",
+                "usr name: ",
+                "process id: ",
+                "global config md5: ",
+                "task md5: ",
+                "task num: ",
+                "instance num: ",
+                "boot time: ",
+                "send package count: ",
+                "send data len: ",
+                "source permit left: ",
+                "queue permit left: ",
+                "writer permit left: ",
+                "active thread count: "
+        };
+        for (String label : labels) {
+            statusFieldsPre.add(label);
+        }
+    }
+
+    /**
+     * Rounds a value to two decimal places using {@link RoundingMode#HALF_UP}.
+     *
+     * @param doubleValue value to round
+     * @return the rounded value; NaN and infinite inputs are returned unchanged because they
+     *         cannot be converted to a {@link BigDecimal}
+     */
+    public double twoDecimal(double doubleValue) {
+        if (Double.isNaN(doubleValue) || Double.isInfinite(doubleValue)) {
+            return doubleValue;
+        }
+        // valueOf goes through the canonical decimal string of the double, so 1.005 rounds to
+        // 1.01; new BigDecimal(double) uses the exact binary expansion and would give 1.00.
+        return BigDecimal.valueOf(doubleValue).setScale(2, RoundingMode.HALF_UP).doubleValue();
+    }
+
+    /**
+     * Extracts the process id from the runtime MXBean name, taking the text before the
+     * first '@' and parsing it as a number.
+     *
+     * @return the process id, or {@code null} if the name cannot be read or parsed
+     */
+    private Long getProcessId() {
+        try {
+            String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+            int atIndex = jvmName.indexOf('@');
+            return Long.parseLong(jvmName.substring(0, atIndex));
+        } catch (Exception e) {
+            return null;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index d9879586f8..05886ebd8a 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.core;
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.core.task.MemoryManager;
-import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -33,9 +32,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.regex.Pattern;
+import java.util.List;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_CHARGES;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
@@ -50,32 +48,23 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatManager.class);
     public static final int PRINT_MEMORY_PERMIT_INTERVAL_SECOND = 60;
+    public static final int HEARTBEAT_INTERVAL_SECOND = 60;
+
     private static HeartbeatManager heartbeatManager = null;
-    private final TaskManager taskManager;
     private final AgentConfiguration conf;
     private final HttpManager httpManager;
     private final String baseManagerUrl;
     private final String reportHeartbeatUrl;
-    private final Pattern numberPattern = Pattern.compile("^[-+]?[\\d]*$");
 
     /**
      * Init heartbeat manager.
      */
     private HeartbeatManager(AgentManager agentManager) {
         this.conf = AgentConfiguration.getAgentConf();
-        taskManager = agentManager.getTaskManager();
         httpManager = new HttpManager(conf);
         baseManagerUrl = httpManager.getBaseUrl();
         reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
-    }
-
-    private HeartbeatManager() {
-        conf = AgentConfiguration.getAgentConf();
-        httpManager = new HttpManager(conf);
-        baseManagerUrl = httpManager.getBaseUrl();
-        reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
-
-        taskManager = null;
+        AgentStatusManager.getInstance(agentManager);
     }
 
     public static HeartbeatManager getInstance(AgentManager agentManager) {
@@ -122,10 +111,14 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug(" {} report heartbeat to manager", 
heartbeatMsg);
                     }
-                    SECONDS.sleep(heartbeatInterval());
+                    List<String> fields = 
AgentStatusManager.getInstance().getStatusMessage();
+                    AgentStatusManager.getInstance().sendStatusMsg(fields);
+                    AgentStatusManager.getInstance().printStatusMsg(fields);
                 } catch (Throwable e) {
                     LOGGER.error("interrupted while report heartbeat", e);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
e);
+                } finally {
+                    
AgentUtils.silenceSleepInSeconds(HEARTBEAT_INTERVAL_SECOND);
                 }
             }
         };
@@ -188,10 +181,4 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
     private String buildReportHeartbeatUrl(String baseUrl) {
         return baseUrl + conf.get(AGENT_MANAGER_HEARTBEAT_HTTP_PATH, 
DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH);
     }
-
-    public static void main(String[] args) throws Exception {
-        HeartbeatManager heartbeatManager = new HeartbeatManager();
-        
heartbeatManager.reportHeartbeat(heartbeatManager.buildDeadHeartbeatMsg());
-        System.out.println("Success send dead heartbeat message to manager.");
-    }
 }
\ No newline at end of file
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 05aca28f28..631e8a889e 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -179,6 +179,10 @@ public class OffsetManager extends AbstractDaemon {
         }
     }
 
+    /**
+     * Returns the running instance count reported by the instance store.
+     *
+     * @return the value of {@code instanceStore.getRunningInstanceCount()}
+     */
+    public int getRunningInstanceCount() {
+        return instanceStore.getRunningInstanceCount();
+    }
+
     @Override
     public void start() throws Exception {
         submitWorker(coreThread());
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index c1b7da07f3..6db94a546b 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -80,6 +80,8 @@ public class TaskManager extends AbstractDaemon {
     private static final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
     // instance profile queue.
     private final BlockingQueue<TaskAction> actionQueue;
+    private String taskResultMd5;
+    private Integer taskResultVersion = -1;
 
     private class TaskPrintStat {
 
@@ -529,6 +531,22 @@ public class TaskManager extends AbstractDaemon {
         return taskStore.getTask(taskId);
     }
 
+    /**
+     * Returns the md5 last stored through setTaskResultMd5.
+     * NOTE(review): the field is a plain (non-volatile) field that appears to be written by the
+     * fetcher and read by the status reporter - confirm cross-thread visibility is acceptable.
+     *
+     * @return the stored md5, or {@code null} if none has been set
+     */
+    public String getTaskResultMd5() {
+        return taskResultMd5;
+    }
+
+    /**
+     * Stores the md5 of the most recently accepted task result.
+     *
+     * @param taskResultMd5 md5 value to remember
+     */
+    public void setTaskResultMd5(String taskResultMd5) {
+        this.taskResultMd5 = taskResultMd5;
+    }
+
+    /**
+     * Returns the version last stored through setTaskResultVersion.
+     *
+     * @return the stored version; the field starts at -1
+     */
+    public Integer getTaskResultVersion() {
+        return taskResultVersion;
+    }
+
+    /**
+     * Stores the version of the most recently accepted task result.
+     *
+     * @param taskResultVersion version value to remember
+     */
+    public void setTaskResultVersion(Integer taskResultVersion) {
+        this.taskResultVersion = taskResultVersion;
+    }
+
     @Override
     public void start() throws Exception {
         restoreFromStore();
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 3746c36574..e0125751c3 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -84,10 +84,6 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
     private String uuid;
     private String clusterTag;
     private String clusterName;
-    private String taskResultMd5;
-    private Integer taskResultVersion = -1;
-    private String agentConfigMd5;
-    private Integer agentConfigVersion = -1;
 
     public ManagerFetcher(AgentManager agentManager) {
         this.agentManager = agentManager;
@@ -167,7 +163,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     public TaskRequest getTaskRequest() {
         TaskRequest request = new TaskRequest();
-        request.setMd5(taskResultMd5);
+        request.setMd5(agentManager.getTaskManager().getTaskResultMd5());
         request.setAgentIp(localIp);
         request.setUuid(uuid);
         request.setClusterName(clusterName);
@@ -178,7 +174,9 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     public AgentConfigRequest getAgentConfigInfoRequest() {
         AgentConfigRequest request = new AgentConfigRequest();
-        request.setMd5(agentConfigMd5);
+        if (AgentManager.getAgentConfigInfo() != null) {
+            request.setMd5(AgentManager.getAgentConfigInfo().getMd5());
+        }
         request.setClusterTag(clusterTag);
         request.setClusterName(clusterName);
         request.setIp(localIp);
@@ -197,22 +195,22 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
                 try {
                     TaskResult taskResult = getStaticConfig();
                     if (taskResult != null && 
taskResult.getCode().equals(AgentResponseCode.SUCCESS)
-                            && taskResultVersion < taskResult.getVersion()) {
+                            && 
agentManager.getTaskManager().getTaskResultVersion() < taskResult.getVersion()) 
{
                         List<TaskProfile> taskProfiles = new ArrayList<>();
                         taskResult.getDataConfigs().forEach((config) -> {
                             TaskProfile profile = 
TaskProfile.convertToTaskProfile(config);
                             taskProfiles.add(profile);
                         });
                         
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
-                        taskResultMd5 = taskResult.getMd5();
-                        taskResultVersion = taskResult.getVersion();
+                        
agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5());
+                        
agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion());
                     }
                     AgentConfigInfo config = getAgentConfigInfo();
-                    if (config != null && 
config.getCode().equals(AgentResponseCode.SUCCESS)
-                            && agentConfigVersion < config.getVersion()) {
-                        agentManager.subNewAgentConfigInfo(config);
-                        agentConfigMd5 = config.getMd5();
-                        agentConfigVersion = config.getVersion();
+                    if (config != null && 
config.getCode().equals(AgentResponseCode.SUCCESS)) {
+                        if (AgentManager.getAgentConfigInfo() == null
+                                || 
AgentManager.getAgentConfigInfo().getVersion() < config.getVersion()) {
+                            agentManager.subNewAgentConfigInfo(config);
+                        }
                     }
                 } catch (Throwable ex) {
                     LOGGER.warn("exception caught", ex);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 3422f3b05e..fff55577c6 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.AgentStatusManager;
 import org.apache.inlong.agent.message.file.SenderMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -154,7 +155,7 @@ public class SenderManager {
     }
 
     public void Start() throws Exception {
-        createMessageSender(inlongGroupId);
+        createMessageSender();
         EXECUTOR_SERVICE.execute(flushResendQueue());
         started = true;
     }
@@ -194,10 +195,8 @@ public class SenderManager {
 
     /**
      * createMessageSender
-     *
-     * @param tagName we use group id as tag name
      */
-    private void createMessageSender(String tagName) throws Exception {
+    private void createMessageSender() throws Exception {
         ProxyClientConfig proxyClientConfig = new 
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
                 authSecretKey);
         proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
@@ -359,6 +358,8 @@ public class SenderManager {
                         dataTime, message.getMsgCnt(), message.getTotalSize(), 
auditVersion);
                 
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId, 
streamId,
                         AgentUtils.getCurrentTime(), message.getMsgCnt(), 
message.getTotalSize(), auditVersion);
+                
AgentStatusManager.sendPackageCount.addAndGet(message.getMsgCnt());
+                
AgentStatusManager.sendDataLen.addAndGet(message.getTotalSize());
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
                         + "error {}", groupId, streamId, taskId, instanceId, 
dataTime, retry, result);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index bc0aeedbc9..91b3c6c10a 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -87,7 +87,7 @@ public class TestSenderManager {
         try {
             profile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(profile.getInstanceId()));
             SenderManager senderManager = PowerMockito.spy(new 
SenderManager(profile, "inlongGroupId", "sourceName"));
-            PowerMockito.doNothing().when(senderManager, 
"createMessageSender", Mockito.anyString());
+            PowerMockito.doNothing().when(senderManager, 
"createMessageSender");
 
             PowerMockito.doAnswer(invocation -> {
                 SendMessageCallback cb = invocation.getArgument(0);

Reply via email to