This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0b2646a7d3 [INLONG-10756][Agent] Report file metrics for backend
problem analysis (#10757)
0b2646a7d3 is described below
commit 0b2646a7d31c2931d46f0bd3c50bc0f7993109f0
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Aug 7 12:12:03 2024 +0800
[INLONG-10756][Agent] Report file metrics for backend problem analysis
(#10757)
* [INLONG-10756][Agent] Report file metrics for backend problem analysis
* [INLONG-10756][Agent] Report file metrics for backend problem analysis
* [INLONG-10756][Agent] Report file metrics for backend problem analysis
---
.../inlong/agent/core/AgentStatusManager.java | 219 ++++++++++-----------
.../inlong/agent/core/FileStaticManager.java | 141 +++++++++++++
.../apache/inlong/agent/core/HeartbeatManager.java | 41 +++-
.../inlong/agent/core/task/OffsetManager.java | 3 -
.../inlong/agent/plugin/sources/LogFileSource.java | 25 +++
5 files changed, 308 insertions(+), 121 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index fe6f4353aa..39dde5d934 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -18,34 +18,30 @@
package org.apache.inlong.agent.core;
import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ExcuteLinux;
-import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import com.google.common.collect.Lists;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import org.apache.commons.lang3.StringUtils;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -55,25 +51,85 @@ import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_INSTALL_PLAT
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
/**
* Collect various indicators of agent processes for backend problem analysis
*/
public class AgentStatusManager {
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class AgentStatus {
+
+ private String agentIp;
+ private String tag;
+ private String cluster;
+ private String agentVersion;
+ private String agentStartTime;
+ private String cpuCore;
+ private String procCpu;
+ private String freeMem;
+ private String maxMem;
+ private String useMem;
+ private String os;
+ private String installPlatform;
+ private String usrDir;
+ private String usrName;
+ private String processId;
+ private String globalConfigMd5;
+ private String taskMd5;
+ private String taskNum;
+ private String instanceNum;
+ private String bootTime;
+ private String sendPackageCount;
+ private String sendDataLen;
+ private String sourcePermitLeft;
+ private String queuePermitLeft;
+ private String writerPermitLeft;
+ private String activeThreadCount;
+
+ public String getFieldsString() {
+ List<String> fields = Lists.newArrayList();
+ fields.add(agentIp);
+ fields.add(tag);
+ fields.add(cluster);
+ fields.add(agentVersion);
+ fields.add(agentStartTime);
+ fields.add(cpuCore);
+ fields.add(procCpu);
+ fields.add(freeMem);
+ fields.add(maxMem);
+ fields.add(useMem);
+ fields.add(os);
+ fields.add(installPlatform);
+ fields.add(usrDir);
+ fields.add(usrName);
+ fields.add(processId);
+ fields.add(globalConfigMd5);
+ fields.add(taskMd5);
+ fields.add(taskNum);
+ fields.add(instanceNum);
+ fields.add(bootTime);
+ fields.add(sendPackageCount);
+ fields.add(sendDataLen);
+ fields.add(sourcePermitLeft);
+ fields.add(queuePermitLeft);
+ fields.add(writerPermitLeft);
+ fields.add(activeThreadCount);
+ return Strings.join(fields, ',');
+ }
+ }
+
private static final Logger LOGGER =
LoggerFactory.getLogger(AgentStatusManager.class);
public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
public static final String INLONG_AGENT_STATUS = "inlong_agent_status";
private static AgentStatusManager manager = null;
private final AgentConfiguration conf;
- private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss"); // 设置格式
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
private Runtime runtime = Runtime.getRuntime();
final long GB = 1024 * 1024 * 1024;
- private OperatingSystemMXBean osMxBean;
private ThreadMXBean threadBean;
private final long INVALID_CPU = -1;
private RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
@@ -81,18 +137,13 @@ public class AgentStatusManager {
private AgentManager agentManager;
public static AtomicLong sendDataLen = new AtomicLong();
public static AtomicLong sendPackageCount = new AtomicLong();
- private DefaultMessageSender sender;
- private List<String> statusFieldsPre = Lists.newArrayList();
private String processStartupTime =
format.format(runtimeMXBean.getStartTime());
- private String systemStartupTime = ExcuteLinux.exeCmd("who -b|awk '{print
$(NF-1), $NF}'").replaceAll("\r|\n", "");
+ private String systemStartupTime = ExcuteLinux.exeCmd("uptime
-s").replaceAll("\r|\n", "");
private AgentStatusManager(AgentManager agentManager) {
this.agentManager = agentManager;
this.conf = AgentConfiguration.getAgentConf();
- osMxBean = ManagementFactory.getOperatingSystemMXBean();
threadBean = ManagementFactory.getThreadMXBean();
- initStatusFieldsPre();
- createMessageSender();
}
public static AgentStatusManager getInstance(AgentManager agentManager) {
@@ -113,16 +164,13 @@ public class AgentStatusManager {
return manager;
}
- public void sendStatusMsg(List<String> fields) {
+ public void sendStatusMsg(DefaultMessageSender sender) {
+ AgentStatus data = AgentStatusManager.getInstance().getStatus();
+ LOGGER.info("status detail: {}", data);
if (sender == null) {
- LOGGER.error("sender is null");
- createMessageSender();
- if (sender == null) {
- return;
- }
+ return;
}
- SendResult ret = sender.sendMessage(
- StringUtils.join(fields, ",").getBytes(StandardCharsets.UTF_8),
+ SendResult ret =
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
INLONG_AGENT_SYSTEM,
INLONG_AGENT_STATUS,
AgentUtils.getCurrentTime(),
@@ -132,33 +180,6 @@ public class AgentStatusManager {
}
}
- public void printStatusMsg(List<String> fields) {
- List<String> toPrint = new ArrayList<>();
- for (int i = 0; i < statusFieldsPre.size(); i++) {
- toPrint.add(statusFieldsPre.get(i) + ": " + fields.get(i));
- }
- LOGGER.info("status detail:\n{}", StringUtils.join(toPrint, "\n"));
- }
-
- private void createMessageSender() {
- String managerAddr = conf.get(AGENT_MANAGER_ADDR);
- String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
- String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
- ProxyClientConfig proxyClientConfig = null;
- try {
- proxyClientConfig = new ProxyClientConfig(managerAddr,
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
-
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
-
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
-
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
- proxyClientConfig.setProtocolType(ProtocolType.TCP);
- ThreadFactory SHARED_FACTORY = new
DefaultThreadFactory("agent-sender-manager-heartbeat",
- Thread.currentThread().isDaemon());
- sender = new DefaultMessageSender(proxyClientConfig,
SHARED_FACTORY);
- } catch (Exception e) {
- LOGGER.error("heartbeat manager create sdk failed: ", e);
- }
- }
-
private double getProcessCpu() {
double cpu = tryGetProcessCpu();
int tryTimes = 0;
@@ -199,68 +220,38 @@ public class AgentStatusManager {
return (((double) usedTime) / totalPassedTime) * 100;
}
- public List<String> getStatusMessage() {
- List<String> fields = Lists.newArrayList();
- fields.add(AgentUtils.fetchLocalIp());
- fields.add(conf.get(AGENT_CLUSTER_NAME));
- fields.add(conf.get(AGENT_CLUSTER_TAG));
- fields.add(TaskManager.class.getPackage().getImplementationVersion());
- fields.add(processStartupTime);
- fields.add(String.valueOf(runtime.availableProcessors()));
- fields.add(String.valueOf(twoDecimal(getProcessCpu())));
- fields.add(String.valueOf(twoDecimal((double) runtime.freeMemory() /
GB)));
- fields.add(String.valueOf(twoDecimal((double) runtime.maxMemory() /
GB)));
- fields.add(String.valueOf(twoDecimal((double) runtime.totalMemory() /
GB)));
- fields.add(System.getProperty("os.version"));
- fields.add(conf.get(AGENT_INSTALL_PLATFORM, ""));
- fields.add(System.getProperty("user.dir"));
- fields.add(System.getProperty("user.name"));
- fields.add(String.valueOf(getProcessId()));
+ private AgentStatus getStatus() {
+ AgentStatus data = new AgentStatus();
+ data.setAgentIp(AgentUtils.fetchLocalIp());
+ data.setTag(conf.get(AGENT_CLUSTER_TAG));
+ data.setCluster(conf.get(AGENT_CLUSTER_NAME));
+
data.setAgentVersion(TaskManager.class.getPackage().getImplementationVersion());
+ data.setAgentStartTime(processStartupTime);
+ data.setCpuCore(String.valueOf(runtime.availableProcessors()));
+ data.setProcCpu(String.valueOf(twoDecimal(getProcessCpu())));
+ data.setFreeMem(String.valueOf(twoDecimal((double)
runtime.freeMemory() / GB)));
+ data.setMaxMem(String.valueOf(twoDecimal((double) runtime.maxMemory()
/ GB)));
+ data.setUseMem(String.valueOf(twoDecimal((double)
runtime.totalMemory() / GB)));
+ data.setOs(System.getProperty("os.version"));
+ data.setInstallPlatform(conf.get(AGENT_INSTALL_PLATFORM, ""));
+ data.setUsrDir(System.getProperty("user.dir"));
+ data.setUsrName(System.getProperty("user.name"));
+ data.setProcessId(String.valueOf(getProcessId()));
if (AgentManager.getAgentConfigInfo() != null) {
- fields.add(AgentManager.getAgentConfigInfo().getMd5());
- } else {
- fields.add("");
+
data.setGlobalConfigMd5(AgentManager.getAgentConfigInfo().getMd5());
}
- fields.add(agentManager.getTaskManager().getTaskResultMd5());
-
fields.add(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
-
fields.add(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
- fields.add(systemStartupTime);
- fields.add(String.valueOf(sendPackageCount.getAndSet(0)));
- fields.add(String.valueOf(sendDataLen.getAndSet(0)));
-
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
-
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
-
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
- fields.add(String.valueOf(threadMXBean.getThreadCount()));
- return fields;
- }
-
- private void initStatusFieldsPre() {
- statusFieldsPre.add("ip: ");
- statusFieldsPre.add("cluster: ");
- statusFieldsPre.add("tag: ");
- statusFieldsPre.add("agent version: ");
- statusFieldsPre.add("agent start time: ");
- statusFieldsPre.add("cpu core: ");
- statusFieldsPre.add("proc cpu: ");
- statusFieldsPre.add("free mem: ");
- statusFieldsPre.add("max mem: ");
- statusFieldsPre.add("use mem: ");
- statusFieldsPre.add("os: ");
- statusFieldsPre.add("install platform: ");
- statusFieldsPre.add("usr dir: ");
- statusFieldsPre.add("usr name: ");
- statusFieldsPre.add("process id: ");
- statusFieldsPre.add("global config md5: ");
- statusFieldsPre.add("task md5: ");
- statusFieldsPre.add("task num: ");
- statusFieldsPre.add("instance num: ");
- statusFieldsPre.add("boot time: ");
- statusFieldsPre.add("send package count: ");
- statusFieldsPre.add("send data len: ");
- statusFieldsPre.add("source permit left: ");
- statusFieldsPre.add("queue permit left: ");
- statusFieldsPre.add("writer permit left: ");
- statusFieldsPre.add("active thread count: ");
+ data.setTaskMd5(agentManager.getTaskManager().getTaskResultMd5());
+
data.setTaskNum(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
+
data.setInstanceNum(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
+ data.setBootTime(systemStartupTime);
+
data.setSendPackageCount(String.valueOf(sendPackageCount.getAndSet(0)));
+ data.setSendDataLen(String.valueOf(sendDataLen.getAndSet(0)));
+ data.setSourcePermitLeft(
+
String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
+
data.setQueuePermitLeft(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
+
data.setWriterPermitLeft(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
+
data.setActiveThreadCount(String.valueOf(threadMXBean.getThreadCount()));
+ return data;
}
public double twoDecimal(double doubleValue) {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
new file mode 100644
index 0000000000..7acd32941b
--- /dev/null
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+
+import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.logging.log4j.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
+
+/**
+ * Collect various indicators of agent processes for backend problem analysis
+ */
+public class FileStaticManager {
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class FileStatic {
+
+ private String agentIp;
+ private String tag;
+ private String cluster;
+ private String taskId;
+ private String retry;
+ private String contentType;
+ private String groupId;
+ private String streamId;
+ private String dataTime;
+ private String fileName;
+ private String fileLen;
+ private String readBytes;
+ private String readLines;
+ private String sendLines;
+
+ public String getFieldsString() {
+ List<String> fields = Lists.newArrayList();
+ fields.add(agentIp);
+ fields.add(tag);
+ fields.add(cluster);
+ fields.add(taskId);
+ fields.add(retry);
+ fields.add(contentType);
+ fields.add(groupId);
+ fields.add(streamId);
+ fields.add(dataTime);
+ fields.add(fileName);
+ fields.add(fileLen);
+ fields.add(readBytes);
+ fields.add(readLines);
+ fields.add(sendLines);
+ return Strings.join(fields, ',');
+ }
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileStaticManager.class);
+ public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
+ public static final String INLONG_FILE_STATIC = "inlong_agent_file_static";
+ protected final Integer CACHE_QUEUE_SIZE = 10000;
+ private static FileStaticManager manager = null;
+ private final AgentConfiguration conf;
+ protected BlockingQueue<FileStatic> queue;
+
+ private FileStaticManager(AgentManager agentManager) {
+ this.conf = AgentConfiguration.getAgentConf();
+ queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+ }
+
+ public static FileStaticManager getInstance(AgentManager agentManager) {
+ if (manager == null) {
+ synchronized (FileStaticManager.class) {
+ if (manager == null) {
+ manager = new FileStaticManager(agentManager);
+ }
+ }
+ }
+ return manager;
+ }
+
+ public static FileStaticManager getInstance() {
+ return manager;
+ }
+
+ public void putStaticMsg(FileStatic data) {
+ data.setAgentIp(AgentUtils.fetchLocalIp());
+ data.setTag(conf.get(AGENT_CLUSTER_TAG));
+ data.setCluster(conf.get(AGENT_CLUSTER_NAME));
+ while (!queue.offer(data)) {
+ LOGGER.error("file static queue is full remove {}", queue.poll());
+ }
+ }
+
+ public void sendStaticMsg(DefaultMessageSender sender) {
+ while (!queue.isEmpty()) {
+ FileStatic data = queue.poll();
+ LOGGER.info("file static detail: {}", data);
+ if (sender == null) {
+ continue;
+ }
+ SendResult ret =
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
+ INLONG_AGENT_SYSTEM,
+ INLONG_FILE_STATIC,
+ AgentUtils.getCurrentTime(),
+ "", 30, TimeUnit.SECONDS);
+ if (ret != SendResult.OK) {
+ LOGGER.error("send static failed: ret {}", ret);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 05886ebd8a..1f833275eb 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -19,25 +19,33 @@ package org.apache.inlong.agent.core;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
+import java.util.concurrent.ThreadFactory;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_CHARGES;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
@@ -49,12 +57,14 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatManager.class);
public static final int PRINT_MEMORY_PERMIT_INTERVAL_SECOND = 60;
public static final int HEARTBEAT_INTERVAL_SECOND = 60;
+ public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
private static HeartbeatManager heartbeatManager = null;
private final AgentConfiguration conf;
private final HttpManager httpManager;
private final String baseManagerUrl;
private final String reportHeartbeatUrl;
+ private DefaultMessageSender sender;
/**
* Init heartbeat manager.
@@ -64,7 +74,9 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
httpManager = new HttpManager(conf);
baseManagerUrl = httpManager.getBaseUrl();
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
+ createMessageSender();
AgentStatusManager.getInstance(agentManager);
+ FileStaticManager.getInstance(agentManager);
}
public static HeartbeatManager getInstance(AgentManager agentManager) {
@@ -111,9 +123,11 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(" {} report heartbeat to manager",
heartbeatMsg);
}
- List<String> fields =
AgentStatusManager.getInstance().getStatusMessage();
- AgentStatusManager.getInstance().sendStatusMsg(fields);
- AgentStatusManager.getInstance().printStatusMsg(fields);
+ if (sender == null) {
+ createMessageSender();
+ }
+ AgentStatusManager.getInstance().sendStatusMsg(sender);
+ FileStaticManager.getInstance().sendStaticMsg(sender);
} catch (Throwable e) {
LOGGER.error("interrupted while report heartbeat", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
e);
@@ -181,4 +195,23 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
private String buildReportHeartbeatUrl(String baseUrl) {
return baseUrl + conf.get(AGENT_MANAGER_HEARTBEAT_HTTP_PATH,
DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH);
}
+
+ private void createMessageSender() {
+ String managerAddr = conf.get(AGENT_MANAGER_ADDR);
+ String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
+ String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
+ ProxyClientConfig proxyClientConfig = null;
+ try {
+ proxyClientConfig = new ProxyClientConfig(managerAddr,
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
+
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
+
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
+ proxyClientConfig.setProtocolType(ProtocolType.TCP);
+ ThreadFactory SHARED_FACTORY = new
DefaultThreadFactory("agent-sender-manager-heartbeat",
+ Thread.currentThread().isDaemon());
+ sender = new DefaultMessageSender(proxyClientConfig,
SHARED_FACTORY);
+ } catch (Exception e) {
+ LOGGER.error("heartbeat manager create sdk failed: ", e);
+ }
+ }
}
\ No newline at end of file
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 631e8a889e..3167b850e1 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -106,9 +106,6 @@ public class OffsetManager extends AbstractDaemon {
* get taskPositionManager singleton
*/
public static OffsetManager getInstance() {
- if (offsetManager == null) {
- throw new RuntimeException("task position manager has not been
initialized by agentManager");
- }
return offsetManager;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 376200e475..6624ee819a 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -18,9 +18,13 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.FileStaticManager;
+import org.apache.inlong.agent.core.FileStaticManager.FileStatic;
+import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Reader;
@@ -38,9 +42,12 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.RandomAccessFile;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
+import static
org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT_STYLE;
+
/**
* Read text files
*/
@@ -49,6 +56,7 @@ public class LogFileSource extends AbstractSource {
private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileSource.class);
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss"); // 设置格式
private String fileName;
private File file;
@@ -316,6 +324,23 @@ public class LogFileSource extends AbstractSource {
protected void releaseSource() {
if (randomAccessFile != null) {
try {
+ if (FileStaticManager.getInstance() == null) {
+ return;
+ }
+ FileStatic data = new FileStatic();
+ data.setTaskId(taskId);
+ data.setRetry(String.valueOf(profile.isRetry()));
+ data.setContentType(profile.get(SOURCE_DATA_CONTENT_STYLE));
+ data.setGroupId(profile.getInlongGroupId());
+ data.setStreamId(profile.getInlongStreamId());
+ data.setDataTime(format.format(profile.getSinkDataTime()));
+ data.setFileName(profile.getInstanceId());
+ data.setFileLen(String.valueOf(randomAccessFile.length()));
+ data.setReadBytes(String.valueOf(bytePosition));
+ data.setReadLines(String.valueOf(linePosition));
+ OffsetProfile offsetProfile =
OffsetManager.getInstance().getOffset(taskId, instanceId);
+ data.setSendLines(offsetProfile.getOffset());
+ FileStaticManager.getInstance().putStaticMsg(data);
randomAccessFile.close();
} catch (IOException e) {
LOGGER.error("close randomAccessFile error", e);