This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new f4148e675f [INLONG-9457][Agent] Add task and instance heartbeat audit (#9458)
f4148e675f is described below
commit f4148e675f56892d2cda411d71b2937a52c59386
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Dec 11 20:09:39 2023 +0800
[INLONG-9457][Agent] Add task and instance heartbeat audit (#9458)
* [INLONG-9457][Agent] Add task and instance heartbeat audit
* [INLONG-9457][Agent] Add task and instance heartbeat audit
(cherry picked from commit 0dac3fa34493d91fb11626dd1c295a52f55c933f)
---
.../org/apache/inlong/agent/conf/TaskProfile.java | 12 ++++++++++++
.../inlong/agent/metrics/audit/AuditUtils.java | 20 ++++++++++++--------
.../inlong/agent/core/instance/InstanceManager.java | 8 +++++++-
.../inlong/agent/core/task/file/TaskManager.java | 3 +++
.../inlong/agent/plugin/instance/FileInstance.java | 5 +++++
.../plugin/task/filecollect/LogFileCollectTask.java | 5 +++++
inlong-manager/manager-web/sql/changes-1.10.0.sql | 15 ++++++++++++++-
7 files changed, 58 insertions(+), 10 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index de863a7aa0..0da4a8d349 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -32,6 +32,10 @@ import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.TimeZone;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
@@ -89,6 +93,14 @@ public class TaskProfile extends AbstractConfiguration {
set(TaskConstants.TASK_CLASS, className);
}
+ public String getInlongGroupId() {
+ return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+ }
+
+ public String getInlongStreamId() {
+ return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ }
+
/**
* parse json string to configuration instance.
*
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index faaf3400c6..290d3b71bb 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -42,15 +42,19 @@ public class AuditUtils {
public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
- public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 47;
- public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48;
public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
- public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026;
- public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49;
- public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50;
- public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51;
- public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52;
- public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028;
+ public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001;
+ public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002;
+ public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 30003;
+ public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 30004;
+ public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 30005;
+ public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 30006;
+ public static final int AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT = 30007;
+ public static final int AUDIT_ID_AGENT_TASK_HEARTBEAT = 30008;
+ public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
+ public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
+ public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
+ public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
private static boolean IS_AUDIT = true;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 1ed8e86cf8..3a86f32fc2 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -62,7 +62,8 @@ public class InstanceManager extends AbstractDaemon {
public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
// instance in db
private final InstanceDb instanceDb;
- TaskProfileDb taskProfileDb;
+ private TaskProfileDb taskProfileDb;
+ private TaskProfile taskFromDb;
// task in memory
private final ConcurrentHashMap<String, Instance> instanceMap;
// instance profile queue.
@@ -161,6 +162,10 @@ public class InstanceManager extends AbstractDaemon {
cleanDbInstance();
dealWithActionQueue(actionQueue);
keepPaceWithDb();
+ String inlongGroupId = taskFromDb.getInlongGroupId();
+ String inlongStreamId = taskFromDb.getInlongStreamId();
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId,
inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
} catch (Throwable ex) {
LOGGER.error("coreThread {}", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
@@ -323,6 +328,7 @@ public class InstanceManager extends AbstractDaemon {
}
private void restoreFromDb() {
+ taskFromDb = taskProfileDb.getTask(taskId);
List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
profileList.forEach((profile) -> {
InstanceStateEnum state = profile.getState();
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index e188e4f207..684600dbb2 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -27,6 +27,7 @@ import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.RocksDbImp;
import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -197,6 +198,8 @@ public class TaskManager extends AbstractDaemon {
printTaskDetail();
dealWithConfigQueue(configQueue);
dealWithActionQueue(actionQueue);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT, "", "",
+ AgentUtils.getCurrentTime(), 1, 1);
} catch (Throwable ex) {
LOGGER.error("exception caught", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 1785b4245f..23566acd9f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Sink;
@@ -109,6 +110,10 @@ public class FileInstance extends Instance {
checkFinishCount = 0;
}
AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ String inlongGroupId = profile.getInlongGroupId();
+ String inlongStreamId = profile.getInlongStreamId();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT,
inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
} else {
sink.write(msg);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 25f3ef456d..c506d698d0 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -27,6 +27,7 @@ import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.core.task.file.TaskManager;
import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
import
org.apache.inlong.agent.plugin.task.filecollect.FileScanner.BasicFileInfo;
import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
@@ -262,6 +263,10 @@ public class LogFileCollectTask extends Task {
} else {
runForNormal();
}
+ String inlongGroupId = taskProfile.getInlongGroupId();
+ String inlongStreamId = taskProfile.getInlongStreamId();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
}
running = false;
}
diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql
b/inlong-manager/manager-web/sql/changes-1.10.0.sql
index a09a83faf7..2ebdf6838a 100644
--- a/inlong-manager/manager-web/sql/changes-1.10.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql
@@ -34,7 +34,20 @@ VALUES ('audit_sort_mysql_binlog_input', 'MYSQL_BINLOG', 0,
'29'),
('audit_sort_pulsar_input', 'PULSAR', 0, '31'),
('audit_sort_pulsar_output', 'PULSAR', 1, '32'),
('audit_sort_tube_input', 'TUBEMQ', 0, '33'),
- ('audit_sort_tube_output', 'TUBEMQ', 1, '34');
+ ('audit_sort_tube_output', 'TUBEMQ', 1, '34'),
+ ('audit_agent_sent_failed', 'AGENT', 2, '10004'),
+ ('audit_agent_read_realtime', 'AGENT', 3, '30001'),
+ ('audit_agent_send_realtime', 'AGENT', 4, '30002'),
+ ('audit_agent_add_instance_mem', 'AGENT', 5, '30003'),
+ ('audit_agent_del_instance_mem', 'AGENT', 6, '30004'),
+ ('audit_agent_add_instance_db', 'AGENT', 7, '30005'),
+ ('audit_agent_del_instance_db', 'AGENT', 8, '30006'),
+ ('audit_agent_task_mgr_heartbeat', 'AGENT', 9, '30007'),
+ ('audit_agent_task_heartbeat', 'AGENT', 10, '30008'),
+ ('audit_agent_instance_mgr_heartbeat', 'AGENT', 11, '30009'),
+ ('audit_agent_instance_heartbeat', 'AGENT', 12, '30010'),
+ ('audit_agent_sent_failed_realtime', 'AGENT', 13, '30011'),
+ ('audit_agent_del_instance_mem_unusual', 'AGENT', 14, '30014');
ALTER TABLE `operation_log`
ADD COLUMN `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
group id';