This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c37d4cbb70 [INLONG-10751][Agent] Report Agent process status for
backend problem analysis (#10752)
c37d4cbb70 is described below
commit c37d4cbb70694e6aa3d1cf43563a6caaf5d5a1f5
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Aug 6 14:59:03 2024 +0800
[INLONG-10751][Agent] Report Agent process status for backend problem
analysis (#10752)
* [INLONG-10751][Agent] Report Agent process status for backend problem
analysis
* [INLONG-10751][Agent] Report Agent process status for backend problem
analysis
* [INLONG-10751][Agent] Report Agent process status for backend problem
analysis
* [INLONG-10751][Agent] Report Agent process status for backend problem
analysis
---
.../inlong/agent/constant/AgentConstants.java | 1 +
.../apache/inlong/agent/store/InstanceStore.java | 12 +
inlong-agent/agent-core/pom.xml | 15 ++
.../inlong/agent/core/AgentStatusManager.java | 281 +++++++++++++++++++++
.../apache/inlong/agent/core/HeartbeatManager.java | 31 +--
.../inlong/agent/core/task/OffsetManager.java | 4 +
.../apache/inlong/agent/core/task/TaskManager.java | 18 ++
.../agent/plugin/fetcher/ManagerFetcher.java | 26 +-
.../plugin/sinks/filecollect/SenderManager.java | 9 +-
.../sinks/filecollect/TestSenderManager.java | 2 +-
10 files changed, 358 insertions(+), 41 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index dceb443b03..ec5831973b 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -56,6 +56,7 @@ public class AgentConstants {
public static final String AGENT_CLUSTER_NAME = "agent.cluster.name";
public static final String AGENT_CLUSTER_TAG = "agent.cluster.tag";
public static final String AGENT_CLUSTER_IN_CHARGES =
"agent.cluster.inCharges";
+ public static final String AGENT_INSTALL_PLATFORM =
"agent.install.platform";
public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
index 2551734134..476d014e57 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.store;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.common.enums.InstanceStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,17 @@ public class InstanceStore {
return instanceList;
}
+ public int getRunningInstanceCount() {
+ List<KeyValueEntity> result = this.store.findAll(store.getUniqueKey());
+ int count = 0;
+ for (KeyValueEntity entity : result) {
+ if (entity.getAsInstanceProfile().getState() ==
InstanceStateEnum.DEFAULT) {
+ count++;
+ }
+ }
+ return count;
+ }
+
/**
* get instance list from instance store.
*
diff --git a/inlong-agent/agent-core/pom.xml b/inlong-agent/agent-core/pom.xml
index b314167960..2d7c1334cb 100755
--- a/inlong-agent/agent-core/pom.xml
+++ b/inlong-agent/agent-core/pom.xml
@@ -45,6 +45,21 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>dataproxy-sdk</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
new file mode 100644
index 0000000000..fe6f4353aa
--- /dev/null
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ExcuteLinux;
+import org.apache.inlong.common.constant.ProtocolType;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_INSTALL_PLATFORM;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
+
+/**
+ * Collect various indicators of agent processes for backend problem analysis
+ */
+public class AgentStatusManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AgentStatusManager.class);
+ public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
+ public static final String INLONG_AGENT_STATUS = "inlong_agent_status";
+
+ private static AgentStatusManager manager = null;
+ private final AgentConfiguration conf;
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss"); // set the format
+ private Runtime runtime = Runtime.getRuntime();
+ final long GB = 1024 * 1024 * 1024;
+ private OperatingSystemMXBean osMxBean;
+ private ThreadMXBean threadBean;
+ private final long INVALID_CPU = -1;
+ private RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+ private ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ private AgentManager agentManager;
+ public static AtomicLong sendDataLen = new AtomicLong();
+ public static AtomicLong sendPackageCount = new AtomicLong();
+ private DefaultMessageSender sender;
+ private List<String> statusFieldsPre = Lists.newArrayList();
+ private String processStartupTime =
format.format(runtimeMXBean.getStartTime());
+ private String systemStartupTime = ExcuteLinux.exeCmd("who -b|awk '{print
$(NF-1), $NF}'").replaceAll("\r|\n", "");
+
+ private AgentStatusManager(AgentManager agentManager) {
+ this.agentManager = agentManager;
+ this.conf = AgentConfiguration.getAgentConf();
+ osMxBean = ManagementFactory.getOperatingSystemMXBean();
+ threadBean = ManagementFactory.getThreadMXBean();
+ initStatusFieldsPre();
+ createMessageSender();
+ }
+
+ public static AgentStatusManager getInstance(AgentManager agentManager) {
+ if (manager == null) {
+ synchronized (AgentStatusManager.class) {
+ if (manager == null) {
+ manager = new AgentStatusManager(agentManager);
+ }
+ }
+ }
+ return manager;
+ }
+
+ public static AgentStatusManager getInstance() {
+ if (manager == null) {
+ throw new RuntimeException("HeartbeatManager has not been
initialized by agentManager");
+ }
+ return manager;
+ }
+
+ public void sendStatusMsg(List<String> fields) {
+ if (sender == null) {
+ LOGGER.error("sender is null");
+ createMessageSender();
+ if (sender == null) {
+ return;
+ }
+ }
+ SendResult ret = sender.sendMessage(
+ StringUtils.join(fields, ",").getBytes(StandardCharsets.UTF_8),
+ INLONG_AGENT_SYSTEM,
+ INLONG_AGENT_STATUS,
+ AgentUtils.getCurrentTime(),
+ "", 30, TimeUnit.SECONDS);
+ if (ret != SendResult.OK) {
+ LOGGER.error("send status failed: ret {}", ret);
+ }
+ }
+
+ public void printStatusMsg(List<String> fields) {
+ List<String> toPrint = new ArrayList<>();
+ for (int i = 0; i < statusFieldsPre.size(); i++) {
+ toPrint.add(statusFieldsPre.get(i) + ": " + fields.get(i));
+ }
+ LOGGER.info("status detail:\n{}", StringUtils.join(toPrint, "\n"));
+ }
+
+ private void createMessageSender() {
+ String managerAddr = conf.get(AGENT_MANAGER_ADDR);
+ String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
+ String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
+ ProxyClientConfig proxyClientConfig = null;
+ try {
+ proxyClientConfig = new ProxyClientConfig(managerAddr,
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
+
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
+
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
+ proxyClientConfig.setProtocolType(ProtocolType.TCP);
+ ThreadFactory SHARED_FACTORY = new
DefaultThreadFactory("agent-sender-manager-heartbeat",
+ Thread.currentThread().isDaemon());
+ sender = new DefaultMessageSender(proxyClientConfig,
SHARED_FACTORY);
+ } catch (Exception e) {
+ LOGGER.error("heartbeat manager create sdk failed: ", e);
+ }
+ }
+
+ private double getProcessCpu() {
+ double cpu = tryGetProcessCpu();
+ int tryTimes = 0;
+ while (cpu < 0 && tryTimes < 10) {
+ cpu = tryGetProcessCpu();
+ tryTimes++;
+ }
+ return cpu;
+ }
+
+ private double tryGetProcessCpu() {
+ long[] startThreads = threadBean.getAllThreadIds();
+ long startTime = System.nanoTime();
+ long startUseTime = 0;
+ long temp;
+ for (long id : startThreads) {
+ temp = threadBean.getThreadCpuTime(id);
+ if (temp >= 0) {
+ startUseTime += temp;
+ } else {
+ return INVALID_CPU;
+ }
+ }
+ AgentUtils.silenceSleepInMs(5000);
+ long[] endThreads = threadBean.getAllThreadIds();
+ long endTime = System.nanoTime();
+ long endUseTime = 0;
+ for (long id : endThreads) {
+ temp = threadBean.getThreadCpuTime(id);
+ if (temp >= 0) {
+ endUseTime += temp;
+ } else {
+ return INVALID_CPU;
+ }
+ }
+ long usedTime = endUseTime - startUseTime;
+ long totalPassedTime = endTime - startTime;
+ return (((double) usedTime) / totalPassedTime) * 100;
+ }
+
+ public List<String> getStatusMessage() {
+ List<String> fields = Lists.newArrayList();
+ fields.add(AgentUtils.fetchLocalIp());
+ fields.add(conf.get(AGENT_CLUSTER_NAME));
+ fields.add(conf.get(AGENT_CLUSTER_TAG));
+ fields.add(TaskManager.class.getPackage().getImplementationVersion());
+ fields.add(processStartupTime);
+ fields.add(String.valueOf(runtime.availableProcessors()));
+ fields.add(String.valueOf(twoDecimal(getProcessCpu())));
+ fields.add(String.valueOf(twoDecimal((double) runtime.freeMemory() /
GB)));
+ fields.add(String.valueOf(twoDecimal((double) runtime.maxMemory() /
GB)));
+ fields.add(String.valueOf(twoDecimal((double) runtime.totalMemory() /
GB)));
+ fields.add(System.getProperty("os.version"));
+ fields.add(conf.get(AGENT_INSTALL_PLATFORM, ""));
+ fields.add(System.getProperty("user.dir"));
+ fields.add(System.getProperty("user.name"));
+ fields.add(String.valueOf(getProcessId()));
+ if (AgentManager.getAgentConfigInfo() != null) {
+ fields.add(AgentManager.getAgentConfigInfo().getMd5());
+ } else {
+ fields.add("");
+ }
+ fields.add(agentManager.getTaskManager().getTaskResultMd5());
+
fields.add(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
+
fields.add(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
+ fields.add(systemStartupTime);
+ fields.add(String.valueOf(sendPackageCount.getAndSet(0)));
+ fields.add(String.valueOf(sendDataLen.getAndSet(0)));
+
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
+
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
+
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
+ fields.add(String.valueOf(threadMXBean.getThreadCount()));
+ return fields;
+ }
+
+ private void initStatusFieldsPre() {
+ statusFieldsPre.add("ip: ");
+ statusFieldsPre.add("cluster: ");
+ statusFieldsPre.add("tag: ");
+ statusFieldsPre.add("agent version: ");
+ statusFieldsPre.add("agent start time: ");
+ statusFieldsPre.add("cpu core: ");
+ statusFieldsPre.add("proc cpu: ");
+ statusFieldsPre.add("free mem: ");
+ statusFieldsPre.add("max mem: ");
+ statusFieldsPre.add("use mem: ");
+ statusFieldsPre.add("os: ");
+ statusFieldsPre.add("install platform: ");
+ statusFieldsPre.add("usr dir: ");
+ statusFieldsPre.add("usr name: ");
+ statusFieldsPre.add("process id: ");
+ statusFieldsPre.add("global config md5: ");
+ statusFieldsPre.add("task md5: ");
+ statusFieldsPre.add("task num: ");
+ statusFieldsPre.add("instance num: ");
+ statusFieldsPre.add("boot time: ");
+ statusFieldsPre.add("send package count: ");
+ statusFieldsPre.add("send data len: ");
+ statusFieldsPre.add("source permit left: ");
+ statusFieldsPre.add("queue permit left: ");
+ statusFieldsPre.add("writer permit left: ");
+ statusFieldsPre.add("active thread count: ");
+ }
+
+ public double twoDecimal(double doubleValue) {
+ BigDecimal bigDecimal = new BigDecimal(doubleValue).setScale(2,
RoundingMode.HALF_UP);
+ return bigDecimal.doubleValue();
+ }
+
+ private Long getProcessId() {
+ try {
+ RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+ String name = runtime.getName();
+ String pid = name.substring(0, name.indexOf('@'));
+ return Long.parseLong(pid);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index d9879586f8..05886ebd8a 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.core;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.core.task.MemoryManager;
-import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -33,9 +32,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.regex.Pattern;
+import java.util.List;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_CHARGES;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
@@ -50,32 +48,23 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatManager.class);
public static final int PRINT_MEMORY_PERMIT_INTERVAL_SECOND = 60;
+ public static final int HEARTBEAT_INTERVAL_SECOND = 60;
+
private static HeartbeatManager heartbeatManager = null;
- private final TaskManager taskManager;
private final AgentConfiguration conf;
private final HttpManager httpManager;
private final String baseManagerUrl;
private final String reportHeartbeatUrl;
- private final Pattern numberPattern = Pattern.compile("^[-+]?[\\d]*$");
/**
* Init heartbeat manager.
*/
private HeartbeatManager(AgentManager agentManager) {
this.conf = AgentConfiguration.getAgentConf();
- taskManager = agentManager.getTaskManager();
httpManager = new HttpManager(conf);
baseManagerUrl = httpManager.getBaseUrl();
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
- }
-
- private HeartbeatManager() {
- conf = AgentConfiguration.getAgentConf();
- httpManager = new HttpManager(conf);
- baseManagerUrl = httpManager.getBaseUrl();
- reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
-
- taskManager = null;
+ AgentStatusManager.getInstance(agentManager);
}
public static HeartbeatManager getInstance(AgentManager agentManager) {
@@ -122,10 +111,14 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(" {} report heartbeat to manager",
heartbeatMsg);
}
- SECONDS.sleep(heartbeatInterval());
+ List<String> fields =
AgentStatusManager.getInstance().getStatusMessage();
+ AgentStatusManager.getInstance().sendStatusMsg(fields);
+ AgentStatusManager.getInstance().printStatusMsg(fields);
} catch (Throwable e) {
LOGGER.error("interrupted while report heartbeat", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
e);
+ } finally {
+
AgentUtils.silenceSleepInSeconds(HEARTBEAT_INTERVAL_SECOND);
}
}
};
@@ -188,10 +181,4 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
private String buildReportHeartbeatUrl(String baseUrl) {
return baseUrl + conf.get(AGENT_MANAGER_HEARTBEAT_HTTP_PATH,
DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH);
}
-
- public static void main(String[] args) throws Exception {
- HeartbeatManager heartbeatManager = new HeartbeatManager();
-
heartbeatManager.reportHeartbeat(heartbeatManager.buildDeadHeartbeatMsg());
- System.out.println("Success send dead heartbeat message to manager.");
- }
}
\ No newline at end of file
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 05aca28f28..631e8a889e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -179,6 +179,10 @@ public class OffsetManager extends AbstractDaemon {
}
}
+ public int getRunningInstanceCount() {
+ return instanceStore.getRunningInstanceCount();
+ }
+
@Override
public void start() throws Exception {
submitWorker(coreThread());
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index c1b7da07f3..6db94a546b 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -80,6 +80,8 @@ public class TaskManager extends AbstractDaemon {
private static final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
// instance profile queue.
private final BlockingQueue<TaskAction> actionQueue;
+ private String taskResultMd5;
+ private Integer taskResultVersion = -1;
private class TaskPrintStat {
@@ -529,6 +531,22 @@ public class TaskManager extends AbstractDaemon {
return taskStore.getTask(taskId);
}
+ public String getTaskResultMd5() {
+ return taskResultMd5;
+ }
+
+ public void setTaskResultMd5(String taskResultMd5) {
+ this.taskResultMd5 = taskResultMd5;
+ }
+
+ public Integer getTaskResultVersion() {
+ return taskResultVersion;
+ }
+
+ public void setTaskResultVersion(Integer taskResultVersion) {
+ this.taskResultVersion = taskResultVersion;
+ }
+
@Override
public void start() throws Exception {
restoreFromStore();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 3746c36574..e0125751c3 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -84,10 +84,6 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
private String uuid;
private String clusterTag;
private String clusterName;
- private String taskResultMd5;
- private Integer taskResultVersion = -1;
- private String agentConfigMd5;
- private Integer agentConfigVersion = -1;
public ManagerFetcher(AgentManager agentManager) {
this.agentManager = agentManager;
@@ -167,7 +163,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
public TaskRequest getTaskRequest() {
TaskRequest request = new TaskRequest();
- request.setMd5(taskResultMd5);
+ request.setMd5(agentManager.getTaskManager().getTaskResultMd5());
request.setAgentIp(localIp);
request.setUuid(uuid);
request.setClusterName(clusterName);
@@ -178,7 +174,9 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
public AgentConfigRequest getAgentConfigInfoRequest() {
AgentConfigRequest request = new AgentConfigRequest();
- request.setMd5(agentConfigMd5);
+ if (AgentManager.getAgentConfigInfo() != null) {
+ request.setMd5(AgentManager.getAgentConfigInfo().getMd5());
+ }
request.setClusterTag(clusterTag);
request.setClusterName(clusterName);
request.setIp(localIp);
@@ -197,22 +195,22 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
try {
TaskResult taskResult = getStaticConfig();
if (taskResult != null &&
taskResult.getCode().equals(AgentResponseCode.SUCCESS)
- && taskResultVersion < taskResult.getVersion()) {
+ &&
agentManager.getTaskManager().getTaskResultVersion() < taskResult.getVersion())
{
List<TaskProfile> taskProfiles = new ArrayList<>();
taskResult.getDataConfigs().forEach((config) -> {
TaskProfile profile =
TaskProfile.convertToTaskProfile(config);
taskProfiles.add(profile);
});
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
- taskResultMd5 = taskResult.getMd5();
- taskResultVersion = taskResult.getVersion();
+
agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5());
+
agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion());
}
AgentConfigInfo config = getAgentConfigInfo();
- if (config != null &&
config.getCode().equals(AgentResponseCode.SUCCESS)
- && agentConfigVersion < config.getVersion()) {
- agentManager.subNewAgentConfigInfo(config);
- agentConfigMd5 = config.getMd5();
- agentConfigVersion = config.getVersion();
+ if (config != null &&
config.getCode().equals(AgentResponseCode.SUCCESS)) {
+ if (AgentManager.getAgentConfigInfo() == null
+ ||
AgentManager.getAgentConfigInfo().getVersion() < config.getVersion()) {
+ agentManager.subNewAgentConfigInfo(config);
+ }
}
} catch (Throwable ex) {
LOGGER.warn("exception caught", ex);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 3422f3b05e..fff55577c6 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.AgentStatusManager;
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -154,7 +155,7 @@ public class SenderManager {
}
public void Start() throws Exception {
- createMessageSender(inlongGroupId);
+ createMessageSender();
EXECUTOR_SERVICE.execute(flushResendQueue());
started = true;
}
@@ -194,10 +195,8 @@ public class SenderManager {
/**
* createMessageSender
- *
- * @param tagName we use group id as tag name
*/
- private void createMessageSender(String tagName) throws Exception {
+ private void createMessageSender() throws Exception {
ProxyClientConfig proxyClientConfig = new
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
@@ -359,6 +358,8 @@ public class SenderManager {
dataTime, message.getMsgCnt(), message.getTotalSize(),
auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId,
streamId,
AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize(), auditVersion);
+
AgentStatusManager.sendPackageCount.addAndGet(message.getMsgCnt());
+
AgentStatusManager.sendDataLen.addAndGet(message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index bc0aeedbc9..91b3c6c10a 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -87,7 +87,7 @@ public class TestSenderManager {
try {
profile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(profile.getInstanceId()));
SenderManager senderManager = PowerMockito.spy(new
SenderManager(profile, "inlongGroupId", "sourceName"));
- PowerMockito.doNothing().when(senderManager,
"createMessageSender", Mockito.anyString());
+ PowerMockito.doNothing().when(senderManager,
"createMessageSender");
PowerMockito.doAnswer(invocation -> {
SendMessageCallback cb = invocation.getArgument(0);