This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b2646a7d3 [INLONG-10756][Agent] Report file metrics for backend 
problem analysis (#10757)
0b2646a7d3 is described below

commit 0b2646a7d31c2931d46f0bd3c50bc0f7993109f0
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Aug 7 12:12:03 2024 +0800

    [INLONG-10756][Agent] Report file metrics for backend problem analysis 
(#10757)
    
    * [INLONG-10756][Agent] Report file metrics for backend problem analysis
    
    * [INLONG-10756][Agent] Report file metrics for backend problem analysis
    
    * [INLONG-10756][Agent] Report file metrics for backend problem analysis
---
 .../inlong/agent/core/AgentStatusManager.java      | 219 ++++++++++-----------
 .../inlong/agent/core/FileStaticManager.java       | 141 +++++++++++++
 .../apache/inlong/agent/core/HeartbeatManager.java |  41 +++-
 .../inlong/agent/core/task/OffsetManager.java      |   3 -
 .../inlong/agent/plugin/sources/LogFileSource.java |  25 +++
 5 files changed, 308 insertions(+), 121 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index fe6f4353aa..39dde5d934 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -18,34 +18,30 @@
 package org.apache.inlong.agent.core;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ExcuteLinux;
-import org.apache.inlong.common.constant.ProtocolType;
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 
 import com.google.common.collect.Lists;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import org.apache.commons.lang3.StringUtils;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.logging.log4j.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -55,25 +51,85 @@ import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_INSTALL_PLAT
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 
 /**
  * Collect various indicators of agent processes for backend problem analysis
  */
 public class AgentStatusManager {
 
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class AgentStatus {
+
+        private String agentIp;
+        private String tag;
+        private String cluster;
+        private String agentVersion;
+        private String agentStartTime;
+        private String cpuCore;
+        private String procCpu;
+        private String freeMem;
+        private String maxMem;
+        private String useMem;
+        private String os;
+        private String installPlatform;
+        private String usrDir;
+        private String usrName;
+        private String processId;
+        private String globalConfigMd5;
+        private String taskMd5;
+        private String taskNum;
+        private String instanceNum;
+        private String bootTime;
+        private String sendPackageCount;
+        private String sendDataLen;
+        private String sourcePermitLeft;
+        private String queuePermitLeft;
+        private String writerPermitLeft;
+        private String activeThreadCount;
+
+        public String getFieldsString() {
+            List<String> fields = Lists.newArrayList();
+            fields.add(agentIp);
+            fields.add(tag);
+            fields.add(cluster);
+            fields.add(agentVersion);
+            fields.add(agentStartTime);
+            fields.add(cpuCore);
+            fields.add(procCpu);
+            fields.add(freeMem);
+            fields.add(maxMem);
+            fields.add(useMem);
+            fields.add(os);
+            fields.add(installPlatform);
+            fields.add(usrDir);
+            fields.add(usrName);
+            fields.add(processId);
+            fields.add(globalConfigMd5);
+            fields.add(taskMd5);
+            fields.add(taskNum);
+            fields.add(instanceNum);
+            fields.add(bootTime);
+            fields.add(sendPackageCount);
+            fields.add(sendDataLen);
+            fields.add(sourcePermitLeft);
+            fields.add(queuePermitLeft);
+            fields.add(writerPermitLeft);
+            fields.add(activeThreadCount);
+            return Strings.join(fields, ',');
+        }
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentStatusManager.class);
     public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
     public static final String INLONG_AGENT_STATUS = "inlong_agent_status";
 
     private static AgentStatusManager manager = null;
     private final AgentConfiguration conf;
-    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss"); // 设置格式
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
     private Runtime runtime = Runtime.getRuntime();
     final long GB = 1024 * 1024 * 1024;
-    private OperatingSystemMXBean osMxBean;
     private ThreadMXBean threadBean;
     private final long INVALID_CPU = -1;
     private RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
@@ -81,18 +137,13 @@ public class AgentStatusManager {
     private AgentManager agentManager;
     public static AtomicLong sendDataLen = new AtomicLong();
     public static AtomicLong sendPackageCount = new AtomicLong();
-    private DefaultMessageSender sender;
-    private List<String> statusFieldsPre = Lists.newArrayList();
     private String processStartupTime = 
format.format(runtimeMXBean.getStartTime());
-    private String systemStartupTime = ExcuteLinux.exeCmd("who -b|awk '{print 
$(NF-1), $NF}'").replaceAll("\r|\n", "");
+    private String systemStartupTime = ExcuteLinux.exeCmd("uptime 
-s").replaceAll("\r|\n", "");
 
     private AgentStatusManager(AgentManager agentManager) {
         this.agentManager = agentManager;
         this.conf = AgentConfiguration.getAgentConf();
-        osMxBean = ManagementFactory.getOperatingSystemMXBean();
         threadBean = ManagementFactory.getThreadMXBean();
-        initStatusFieldsPre();
-        createMessageSender();
     }
 
     public static AgentStatusManager getInstance(AgentManager agentManager) {
@@ -113,16 +164,13 @@ public class AgentStatusManager {
         return manager;
     }
 
-    public void sendStatusMsg(List<String> fields) {
+    public void sendStatusMsg(DefaultMessageSender sender) {
+        AgentStatus data = AgentStatusManager.getInstance().getStatus();
+        LOGGER.info("status detail: {}", data);
         if (sender == null) {
-            LOGGER.error("sender is null");
-            createMessageSender();
-            if (sender == null) {
-                return;
-            }
+            return;
         }
-        SendResult ret = sender.sendMessage(
-                StringUtils.join(fields, ",").getBytes(StandardCharsets.UTF_8),
+        SendResult ret = 
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
                 INLONG_AGENT_SYSTEM,
                 INLONG_AGENT_STATUS,
                 AgentUtils.getCurrentTime(),
@@ -132,33 +180,6 @@ public class AgentStatusManager {
         }
     }
 
-    public void printStatusMsg(List<String> fields) {
-        List<String> toPrint = new ArrayList<>();
-        for (int i = 0; i < statusFieldsPre.size(); i++) {
-            toPrint.add(statusFieldsPre.get(i) + ": " + fields.get(i));
-        }
-        LOGGER.info("status detail:\n{}", StringUtils.join(toPrint, "\n"));
-    }
-
-    private void createMessageSender() {
-        String managerAddr = conf.get(AGENT_MANAGER_ADDR);
-        String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
-        String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
-        ProxyClientConfig proxyClientConfig = null;
-        try {
-            proxyClientConfig = new ProxyClientConfig(managerAddr, 
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
-            
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
-            
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
-            
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
-            proxyClientConfig.setProtocolType(ProtocolType.TCP);
-            ThreadFactory SHARED_FACTORY = new 
DefaultThreadFactory("agent-sender-manager-heartbeat",
-                    Thread.currentThread().isDaemon());
-            sender = new DefaultMessageSender(proxyClientConfig, 
SHARED_FACTORY);
-        } catch (Exception e) {
-            LOGGER.error("heartbeat manager create sdk failed: ", e);
-        }
-    }
-
     private double getProcessCpu() {
         double cpu = tryGetProcessCpu();
         int tryTimes = 0;
@@ -199,68 +220,38 @@ public class AgentStatusManager {
         return (((double) usedTime) / totalPassedTime) * 100;
     }
 
-    public List<String> getStatusMessage() {
-        List<String> fields = Lists.newArrayList();
-        fields.add(AgentUtils.fetchLocalIp());
-        fields.add(conf.get(AGENT_CLUSTER_NAME));
-        fields.add(conf.get(AGENT_CLUSTER_TAG));
-        fields.add(TaskManager.class.getPackage().getImplementationVersion());
-        fields.add(processStartupTime);
-        fields.add(String.valueOf(runtime.availableProcessors()));
-        fields.add(String.valueOf(twoDecimal(getProcessCpu())));
-        fields.add(String.valueOf(twoDecimal((double) runtime.freeMemory() / 
GB)));
-        fields.add(String.valueOf(twoDecimal((double) runtime.maxMemory() / 
GB)));
-        fields.add(String.valueOf(twoDecimal((double) runtime.totalMemory() / 
GB)));
-        fields.add(System.getProperty("os.version"));
-        fields.add(conf.get(AGENT_INSTALL_PLATFORM, ""));
-        fields.add(System.getProperty("user.dir"));
-        fields.add(System.getProperty("user.name"));
-        fields.add(String.valueOf(getProcessId()));
+    private AgentStatus getStatus() {
+        AgentStatus data = new AgentStatus();
+        data.setAgentIp(AgentUtils.fetchLocalIp());
+        data.setTag(conf.get(AGENT_CLUSTER_TAG));
+        data.setCluster(conf.get(AGENT_CLUSTER_NAME));
+        
data.setAgentVersion(TaskManager.class.getPackage().getImplementationVersion());
+        data.setAgentStartTime(processStartupTime);
+        data.setCpuCore(String.valueOf(runtime.availableProcessors()));
+        data.setProcCpu(String.valueOf(twoDecimal(getProcessCpu())));
+        data.setFreeMem(String.valueOf(twoDecimal((double) 
runtime.freeMemory() / GB)));
+        data.setMaxMem(String.valueOf(twoDecimal((double) runtime.maxMemory() 
/ GB)));
+        data.setUseMem(String.valueOf(twoDecimal((double) 
runtime.totalMemory() / GB)));
+        data.setOs(System.getProperty("os.version"));
+        data.setInstallPlatform(conf.get(AGENT_INSTALL_PLATFORM, ""));
+        data.setUsrDir(System.getProperty("user.dir"));
+        data.setUsrName(System.getProperty("user.name"));
+        data.setProcessId(String.valueOf(getProcessId()));
         if (AgentManager.getAgentConfigInfo() != null) {
-            fields.add(AgentManager.getAgentConfigInfo().getMd5());
-        } else {
-            fields.add("");
+            
data.setGlobalConfigMd5(AgentManager.getAgentConfigInfo().getMd5());
         }
-        fields.add(agentManager.getTaskManager().getTaskResultMd5());
-        
fields.add(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
-        
fields.add(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
-        fields.add(systemStartupTime);
-        fields.add(String.valueOf(sendPackageCount.getAndSet(0)));
-        fields.add(String.valueOf(sendDataLen.getAndSet(0)));
-        
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
-        
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
-        
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
-        fields.add(String.valueOf(threadMXBean.getThreadCount()));
-        return fields;
-    }
-
-    private void initStatusFieldsPre() {
-        statusFieldsPre.add("ip: ");
-        statusFieldsPre.add("cluster: ");
-        statusFieldsPre.add("tag: ");
-        statusFieldsPre.add("agent version: ");
-        statusFieldsPre.add("agent start time: ");
-        statusFieldsPre.add("cpu core: ");
-        statusFieldsPre.add("proc cpu: ");
-        statusFieldsPre.add("free mem: ");
-        statusFieldsPre.add("max mem: ");
-        statusFieldsPre.add("use mem: ");
-        statusFieldsPre.add("os: ");
-        statusFieldsPre.add("install platform: ");
-        statusFieldsPre.add("usr dir: ");
-        statusFieldsPre.add("usr name: ");
-        statusFieldsPre.add("process id: ");
-        statusFieldsPre.add("global config md5: ");
-        statusFieldsPre.add("task md5: ");
-        statusFieldsPre.add("task num: ");
-        statusFieldsPre.add("instance num: ");
-        statusFieldsPre.add("boot time: ");
-        statusFieldsPre.add("send package count: ");
-        statusFieldsPre.add("send data len: ");
-        statusFieldsPre.add("source permit left: ");
-        statusFieldsPre.add("queue permit left: ");
-        statusFieldsPre.add("writer permit left: ");
-        statusFieldsPre.add("active thread count: ");
+        data.setTaskMd5(agentManager.getTaskManager().getTaskResultMd5());
+        
data.setTaskNum(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
+        
data.setInstanceNum(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
+        data.setBootTime(systemStartupTime);
+        
data.setSendPackageCount(String.valueOf(sendPackageCount.getAndSet(0)));
+        data.setSendDataLen(String.valueOf(sendDataLen.getAndSet(0)));
+        data.setSourcePermitLeft(
+                
String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
+        
data.setQueuePermitLeft(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
+        
data.setWriterPermitLeft(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
+        
data.setActiveThreadCount(String.valueOf(threadMXBean.getThreadCount()));
+        return data;
     }
 
     public double twoDecimal(double doubleValue) {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
new file mode 100644
index 0000000000..7acd32941b
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+
+import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.logging.log4j.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
+
+/**
+ * Collect various indicators of agent processes for backend problem analysis
+ */
+public class FileStaticManager {
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class FileStatic {
+
+        private String agentIp;
+        private String tag;
+        private String cluster;
+        private String taskId;
+        private String retry;
+        private String contentType;
+        private String groupId;
+        private String streamId;
+        private String dataTime;
+        private String fileName;
+        private String fileLen;
+        private String readBytes;
+        private String readLines;
+        private String sendLines;
+
+        public String getFieldsString() {
+            List<String> fields = Lists.newArrayList();
+            fields.add(agentIp);
+            fields.add(tag);
+            fields.add(cluster);
+            fields.add(taskId);
+            fields.add(retry);
+            fields.add(contentType);
+            fields.add(groupId);
+            fields.add(streamId);
+            fields.add(dataTime);
+            fields.add(fileName);
+            fields.add(fileLen);
+            fields.add(readBytes);
+            fields.add(readLines);
+            fields.add(sendLines);
+            return Strings.join(fields, ',');
+        }
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileStaticManager.class);
+    public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
+    public static final String INLONG_FILE_STATIC = "inlong_agent_file_static";
+    protected final Integer CACHE_QUEUE_SIZE = 10000;
+    private static FileStaticManager manager = null;
+    private final AgentConfiguration conf;
+    protected BlockingQueue<FileStatic> queue;
+
+    private FileStaticManager(AgentManager agentManager) {
+        this.conf = AgentConfiguration.getAgentConf();
+        queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+    }
+
+    public static FileStaticManager getInstance(AgentManager agentManager) {
+        if (manager == null) {
+            synchronized (FileStaticManager.class) {
+                if (manager == null) {
+                    manager = new FileStaticManager(agentManager);
+                }
+            }
+        }
+        return manager;
+    }
+
+    public static FileStaticManager getInstance() {
+        return manager;
+    }
+
+    public void putStaticMsg(FileStatic data) {
+        data.setAgentIp(AgentUtils.fetchLocalIp());
+        data.setTag(conf.get(AGENT_CLUSTER_TAG));
+        data.setCluster(conf.get(AGENT_CLUSTER_NAME));
+        while (!queue.offer(data)) {
+            LOGGER.error("file static queue is full remove {}", queue.poll());
+        }
+    }
+
+    public void sendStaticMsg(DefaultMessageSender sender) {
+        while (!queue.isEmpty()) {
+            FileStatic data = queue.poll();
+            LOGGER.info("file static detail: {}", data);
+            if (sender == null) {
+                continue;
+            }
+            SendResult ret = 
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
+                    INLONG_AGENT_SYSTEM,
+                    INLONG_FILE_STATIC,
+                    AgentUtils.getCurrentTime(),
+                    "", 30, TimeUnit.SECONDS);
+            if (ret != SendResult.OK) {
+                LOGGER.error("send static failed: ret {}", ret);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 05886ebd8a..1f833275eb 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -19,25 +19,33 @@ package org.apache.inlong.agent.core;
 
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import java.util.concurrent.ThreadFactory;
 
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_CHARGES;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
 
@@ -49,12 +57,14 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatManager.class);
     public static final int PRINT_MEMORY_PERMIT_INTERVAL_SECOND = 60;
     public static final int HEARTBEAT_INTERVAL_SECOND = 60;
+    public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
 
     private static HeartbeatManager heartbeatManager = null;
     private final AgentConfiguration conf;
     private final HttpManager httpManager;
     private final String baseManagerUrl;
     private final String reportHeartbeatUrl;
+    private DefaultMessageSender sender;
 
     /**
      * Init heartbeat manager.
@@ -64,7 +74,9 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         httpManager = new HttpManager(conf);
         baseManagerUrl = httpManager.getBaseUrl();
         reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
+        createMessageSender();
         AgentStatusManager.getInstance(agentManager);
+        FileStaticManager.getInstance(agentManager);
     }
 
     public static HeartbeatManager getInstance(AgentManager agentManager) {
@@ -111,9 +123,11 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug(" {} report heartbeat to manager", 
heartbeatMsg);
                     }
-                    List<String> fields = 
AgentStatusManager.getInstance().getStatusMessage();
-                    AgentStatusManager.getInstance().sendStatusMsg(fields);
-                    AgentStatusManager.getInstance().printStatusMsg(fields);
+                    if (sender == null) {
+                        createMessageSender();
+                    }
+                    AgentStatusManager.getInstance().sendStatusMsg(sender);
+                    FileStaticManager.getInstance().sendStaticMsg(sender);
                 } catch (Throwable e) {
                     LOGGER.error("interrupted while report heartbeat", e);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
e);
@@ -181,4 +195,23 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
     private String buildReportHeartbeatUrl(String baseUrl) {
         return baseUrl + conf.get(AGENT_MANAGER_HEARTBEAT_HTTP_PATH, 
DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH);
     }
+
+    private void createMessageSender() {
+        String managerAddr = conf.get(AGENT_MANAGER_ADDR);
+        String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
+        String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
+        ProxyClientConfig proxyClientConfig = null;
+        try {
+            proxyClientConfig = new ProxyClientConfig(managerAddr, 
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
+            
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+            
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
+            
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
+            proxyClientConfig.setProtocolType(ProtocolType.TCP);
+            ThreadFactory SHARED_FACTORY = new 
DefaultThreadFactory("agent-sender-manager-heartbeat",
+                    Thread.currentThread().isDaemon());
+            sender = new DefaultMessageSender(proxyClientConfig, 
SHARED_FACTORY);
+        } catch (Exception e) {
+            LOGGER.error("heartbeat manager create sdk failed: ", e);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 631e8a889e..3167b850e1 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -106,9 +106,6 @@ public class OffsetManager extends AbstractDaemon {
      * get taskPositionManager singleton
      */
     public static OffsetManager getInstance() {
-        if (offsetManager == null) {
-            throw new RuntimeException("task position manager has not been 
initialized by agentManager");
-        }
         return offsetManager;
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 376200e475..6624ee819a 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -18,9 +18,13 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.FileStaticManager;
+import org.apache.inlong.agent.core.FileStaticManager.FileStatic;
+import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.file.Reader;
@@ -38,9 +42,12 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.LineNumberReader;
 import java.io.RandomAccessFile;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT_STYLE;
+
 /**
  * Read text files
  */
@@ -49,6 +56,7 @@ public class LogFileSource extends AbstractSource {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogFileSource.class);
     private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
     private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss"); // 设置格式
 
     private String fileName;
     private File file;
@@ -316,6 +324,23 @@ public class LogFileSource extends AbstractSource {
     protected void releaseSource() {
         if (randomAccessFile != null) {
             try {
+                if (FileStaticManager.getInstance() == null) {
+                    return;
+                }
+                FileStatic data = new FileStatic();
+                data.setTaskId(taskId);
+                data.setRetry(String.valueOf(profile.isRetry()));
+                data.setContentType(profile.get(SOURCE_DATA_CONTENT_STYLE));
+                data.setGroupId(profile.getInlongGroupId());
+                data.setStreamId(profile.getInlongStreamId());
+                data.setDataTime(format.format(profile.getSinkDataTime()));
+                data.setFileName(profile.getInstanceId());
+                data.setFileLen(String.valueOf(randomAccessFile.length()));
+                data.setReadBytes(String.valueOf(bytePosition));
+                data.setReadLines(String.valueOf(linePosition));
+                OffsetProfile offsetProfile = 
OffsetManager.getInstance().getOffset(taskId, instanceId);
+                data.setSendLines(offsetProfile.getOffset());
+                FileStaticManager.getInstance().putStaticMsg(data);
                 randomAccessFile.close();
             } catch (IOException e) {
                 LOGGER.error("close randomAccessFile error", e);

Reply via email to