This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cd2a7a84a0 [INLONG-10035][Agent] Report audit data using the new SDK interface (#10038)
cd2a7a84a0 is described below

commit cd2a7a84a02dbd84a2ffa6851f5185cda8524461
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Apr 22 20:52:56 2024 +0800

    [INLONG-10035][Agent] Report audit data using the new SDK interface (#10038)
---
 .../agent/message/file/ProxyMessageCache.java      |  2 ++
 .../inlong/agent/metrics/audit/AuditUtils.java     | 14 ++++++++++++--
 .../agent/core/instance/InstanceManager.java       | 16 +++++++++-------
 .../agent/plugin/instance/CommonInstance.java      |  4 +++-
 .../plugin/sinks/filecollect/SenderManager.java    | 22 ++++++++++++----------
 .../inlong/agent/plugin/sources/LogFileSource.java | 12 ++++++++++--
 .../agent/plugin/sources/file/AbstractSource.java  |  8 +++++---
 .../inlong/agent/plugin/task/AbstractTask.java     |  4 +++-
 8 files changed, 56 insertions(+), 26 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
index 08a7b10d33..9216461579 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
@@ -39,6 +39,7 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;
 
 /**
  * Handle List of Proxy Message, which belong to the same stream id.
@@ -77,6 +78,7 @@ public class ProxyMessageCache {
         dataTime = instanceProfile.getSinkDataTime();
         extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
         
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
+        extraMap.put(AUDIT_VERSION, taskId);
     }
 
     public void generateExtraMap(String dataKey) {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index e4cc7691c8..7b5edb5a0b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -30,6 +30,8 @@ import static 
org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
 import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_KEY_PROXYS;
 import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
 import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_PROXYS;
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static 
org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
 
 /**
  * AuditUtils
@@ -42,6 +44,7 @@ public class AuditUtils {
     public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
     public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
     public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
+    public static final int AUDIT_ID_AGENT_READ_FAILED = 10003;
     public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
     public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001;
     public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002;
@@ -54,6 +57,7 @@ public class AuditUtils {
     public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
     public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
     public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
+    public static final int AUDIT_ID_AGENT_READ_FAILED_REAL_TIME = 30012;
     public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013;
     public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
     public static final int AUDIT_ID_AGENT_TRY_SEND = 30020;
@@ -93,11 +97,17 @@ public class AuditUtils {
      * Add audit metric
      */
     public static void add(int auditID, String inlongGroupId, String 
inlongStreamId,
-            long logTime, int count, long size) {
+            long logTime, int count, long size, long version) {
         if (!IS_AUDIT) {
             return;
         }
-        AuditOperator.getInstance().add(auditID, inlongGroupId, 
inlongStreamId, logTime, count, size);
+        AuditOperator.getInstance()
+                .add(auditID, DEFAULT_AUDIT_TAG, inlongGroupId, 
inlongStreamId, logTime, count, size, version);
+    }
+
+    public static void add(int auditID, String inlongGroupId, String 
inlongStreamId,
+            long logTime, int count, long size) {
+        add(auditID, inlongGroupId, inlongStreamId, logTime, count, size, 
DEFAULT_AUDIT_VERSION);
     }
 
     /**
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 2c9bd721d8..5545039f5e 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -80,6 +80,7 @@ public class InstanceManager extends AbstractDaemon {
     private final int instanceLimit;
     private final AgentConfiguration agentConf;
     private final String taskId;
+    private long auditVersion;
     private volatile boolean runAtLeastOneTime = false;
     private volatile boolean running = false;
     private final double reserveCoefficient = 0.8;
@@ -122,6 +123,7 @@ public class InstanceManager extends AbstractDaemon {
      */
     public InstanceManager(String taskId, int instanceLimit, Db basicDb, 
TaskProfileDb taskProfileDb) {
         this.taskId = taskId;
+        this.auditVersion = Long.parseLong(taskId);
         instanceDb = new InstanceDb(basicDb);
         this.taskProfileDb = taskProfileDb;
         this.agentConf = AgentConfiguration.getAgentConf();
@@ -171,7 +173,7 @@ public class InstanceManager extends AbstractDaemon {
                     String inlongGroupId = taskFromDb.getInlongGroupId();
                     String inlongStreamId = taskFromDb.getInlongStreamId();
                     
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, 
inlongStreamId,
-                            AgentUtils.getCurrentTime(), 1, 1);
+                            AgentUtils.getCurrentTime(), 1, 1, auditVersion);
                 } catch (Throwable ex) {
                     LOGGER.error("coreThread error: ", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
@@ -387,7 +389,7 @@ public class InstanceManager extends AbstractDaemon {
         LOGGER.info("delete instance from db: taskId {} instanceId {} result 
{}", taskId,
                 instanceId, instanceDb.getInstance(taskId, instanceId));
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, 
inlongGroupId, inlongStreamId,
-                profile.getSinkDataTime(), 1, 1);
+                profile.getSinkDataTime(), 1, 1, auditVersion);
     }
 
     private void deleteFromMemory(String instanceId) {
@@ -403,7 +405,7 @@ public class InstanceManager extends AbstractDaemon {
         instanceMap.remove(instanceId);
         LOGGER.info("delete instance from memory: taskId {} instanceId {}", 
taskId, instance.getInstanceId());
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, 
inlongGroupId, inlongStreamId,
-                instance.getProfile().getSinkDataTime(), 1, 1);
+                instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
     }
 
     private void addToDb(InstanceProfile profile, boolean addNew) {
@@ -413,7 +415,7 @@ public class InstanceManager extends AbstractDaemon {
             String inlongGroupId = profile.getInlongGroupId();
             String inlongStreamId = profile.getInlongStreamId();
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, 
inlongGroupId, inlongStreamId,
-                    profile.getSinkDataTime(), 1, 1);
+                    profile.getSinkDataTime(), 1, 1, auditVersion);
         }
     }
 
@@ -430,7 +432,7 @@ public class InstanceManager extends AbstractDaemon {
             LOGGER.error("old instance {} should not exist, try stop it first",
                     instanceProfile.getInstanceId());
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, 
inlongGroupId, inlongStreamId,
-                    instanceProfile.getSinkDataTime(), 1, 1);
+                    instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
         }
         LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
         try {
@@ -445,12 +447,12 @@ public class InstanceManager extends AbstractDaemon {
                         instance.getInstanceId(), instanceMap.size(), 
EXECUTOR_SERVICE.getTaskCount(),
                         EXECUTOR_SERVICE.getActiveCount());
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, 
inlongGroupId, inlongStreamId,
-                        instanceProfile.getSinkDataTime(), 1, 1);
+                        instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
             } else {
                 LOGGER.error(
                         "add instance to memory init failed instanceId {}", 
instance.getInstanceId());
                 
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED, 
inlongGroupId, inlongStreamId,
-                        instanceProfile.getSinkDataTime(), 1, 1);
+                        instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
             }
         } catch (Throwable t) {
             LOGGER.error("add instance error {}", t.getMessage());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index d381c88bd2..e8d848b36b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -59,12 +59,14 @@ public abstract class CommonInstance extends Instance {
     private volatile int checkFinishCount = 0;
     private int heartbeatcheckCount = 0;
     private long heartBeatStartTime = AgentUtils.getCurrentTime();
+    protected long auditVersion;
 
     @Override
     public boolean init(Object srcManager, InstanceProfile srcProfile) {
         try {
             instanceManager = (InstanceManager) srcManager;
             profile = srcProfile;
+            auditVersion = Long.parseLong(getTaskId());
             setInodeInfo(profile);
             LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
                     profile.getInstanceId(), profile.toJsonStr());
@@ -153,7 +155,7 @@ public abstract class CommonInstance extends Instance {
     private void heartbeatStatic() {
         if (AgentUtils.getCurrentTime() - heartBeatStartTime > 
TimeUnit.SECONDS.toMillis(1)) {
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
profile.getInlongGroupId(),
-                    profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 
1, 1);
+                    profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 
1, 1, auditVersion);
             heartbeatcheckCount = 0;
             heartBeatStartTime = AgentUtils.getCurrentTime();
         }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 4e5c07ca26..0eba81e4d2 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -106,9 +106,11 @@ public class SenderManager {
     private volatile boolean resendRunning = false;
     private volatile boolean started = false;
     private static final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
+    private long auditVersion;
 
     public SenderManager(InstanceProfile profile, String inlongGroupId, String 
sourcePath) {
         this.profile = profile;
+        auditVersion = Long.parseLong(profile.getTaskId());
         managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
         proxySend = profile.getBoolean(TASK_PROXY_SEND, 
DEFAULT_TASK_PROXY_SEND);
         totalAsyncBufSize = profile
@@ -233,10 +235,10 @@ public class SenderManager {
                 AgentSenderCallback cb = new AgentSenderCallback(message, 
retry);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, 
message.getGroupId(),
                         message.getStreamId(), message.getDataTime(), 
message.getMsgCnt(),
-                        message.getTotalSize());
+                        message.getTotalSize(), auditVersion);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, 
message.getGroupId(),
                         message.getStreamId(), AgentUtils.getCurrentTime(), 
message.getMsgCnt(),
-                        message.getTotalSize());
+                        message.getTotalSize(), auditVersion);
                 asyncSendByMessageSender(cb, message.getDataList(), 
message.getGroupId(),
                         message.getStreamId(), message.getDataTime(), 
SEQUENTIAL_ID.getNextUuid(),
                         maxSenderTimeout, TimeUnit.SECONDS, 
message.getExtraMap(), proxySend);
@@ -246,10 +248,10 @@ public class SenderManager {
             } catch (Exception exception) {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, 
message.getGroupId(),
                         message.getStreamId(), message.getDataTime(), 
message.getMsgCnt(),
-                        message.getTotalSize());
+                        message.getTotalSize(), auditVersion);
                 
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, 
message.getGroupId(),
                         message.getStreamId(), AgentUtils.getCurrentTime(), 
message.getMsgCnt(),
-                        message.getTotalSize());
+                        message.getTotalSize(), auditVersion);
                 suc = false;
                 if (retry > maxSenderRetry) {
                     if (retry % 10 == 0) {
@@ -291,10 +293,10 @@ public class SenderManager {
                         SenderMessage message = callback.message;
                         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, 
message.getGroupId(),
                                 message.getStreamId(), message.getDataTime(), 
message.getMsgCnt(),
-                                message.getTotalSize());
+                                message.getTotalSize(), auditVersion);
                         
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(),
                                 message.getStreamId(), 
AgentUtils.getCurrentTime(), message.getMsgCnt(),
-                                message.getTotalSize());
+                                message.getTotalSize(), auditVersion);
                         sendBatchWithRetryCount(callback.message, 
callback.retry + 1);
                     }
                 } catch (Exception ex) {
@@ -353,18 +355,18 @@ public class SenderManager {
                 message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId,
-                        dataTime, message.getMsgCnt(), message.getTotalSize());
+                        dataTime, message.getMsgCnt(), message.getTotalSize(), 
auditVersion);
                 
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId, 
streamId,
-                        AgentUtils.getCurrentTime(), message.getMsgCnt(), 
message.getTotalSize());
+                        AgentUtils.getCurrentTime(), message.getMsgCnt(), 
message.getTotalSize(), auditVersion);
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
                         + "error {}", groupId, streamId, taskId, instanceId, 
dataTime, retry, result);
                 getMetricItem(groupId, 
streamId).pluginSendFailCount.addAndGet(msgCnt);
                 putInResendQueue(new AgentSenderCallback(message, retry));
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId, 
streamId,
-                        dataTime, message.getMsgCnt(), message.getTotalSize());
+                        dataTime, message.getMsgCnt(), message.getTotalSize(), 
auditVersion);
                 
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, groupId, 
streamId,
-                        AgentUtils.getCurrentTime(), message.getMsgCnt(), 
message.getTotalSize());
+                        AgentUtils.getCurrentTime(), message.getMsgCnt(), 
message.getTotalSize(), auditVersion);
             }
         }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 09d200a742..dd6d9ce146 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -246,8 +246,16 @@ public class LogFileSource extends AbstractSource {
                         if (overLen) {
                             LOGGER.warn("readLines over len finally string len 
{}",
                                     new String(baos.toByteArray()).length());
-                            
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
-                                    inlongStreamId, 
AgentUtils.getCurrentTime(), 1, maxPackSize);
+                            long auditTime = 0;
+                            if (isRealTime) {
+                                auditTime = AgentUtils.getCurrentTime();
+                            } else {
+                                auditTime = profile.getSinkDataTime();
+                            }
+                            
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId, 
inlongStreamId,
+                                    auditTime, 1, maxPackSize, auditVersion);
+                            
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId,
+                                    inlongStreamId, 
AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion);
                         }
                         baos.reset();
                         overLen = false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 192756abf8..2449d065e4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -101,10 +101,11 @@ public abstract class AbstractSource implements Source {
     protected volatile boolean runnable = true;
     protected volatile boolean running = false;
     protected String taskId;
+    protected long auditVersion;
     protected String instanceId;
     protected InstanceProfile profile;
     private ExtendedHandler extendedHandler;
-    private boolean isRealTime = false;
+    protected boolean isRealTime = false;
     protected volatile long emptyCount = 0;
     protected int maxPackSize;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
@@ -118,6 +119,7 @@ public abstract class AbstractSource implements Source {
     public void init(InstanceProfile profile) {
         this.profile = profile;
         taskId = profile.getTaskId();
+        auditVersion = Long.parseLong(taskId);
         instanceId = profile.getInstanceId();
         inlongGroupId = profile.getInlongGroupId();
         inlongStreamId = profile.getInlongStreamId();
@@ -333,9 +335,9 @@ public abstract class AbstractSource implements Source {
             auditTime = profile.getSinkDataTime();
         }
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
-                auditTime, 1, sourceData.getData().length);
+                auditTime, 1, sourceData.getData().length, auditVersion);
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, 
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
-                AgentUtils.getCurrentTime(), 1, sourceData.getData().length);
+                AgentUtils.getCurrentTime(), 1, sourceData.getData().length, 
auditVersion);
         Message finalMsg = new DefaultMessage(sourceData.getData(), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 56a786c6e5..b288ff1946 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -48,12 +48,14 @@ public abstract class AbstractTask extends Task {
     protected volatile boolean running = false;
     protected boolean initOK = false;
     protected long lastPrintTime = 0;
+    protected long auditVersion;
 
     @Override
     public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
         taskManager = (TaskManager) srcManager;
         this.taskProfile = taskProfile;
         this.basicDb = basicDb;
+        auditVersion = Long.parseLong(taskProfile.getTaskId());
         instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
                 basicDb, taskManager.getTaskDb());
         try {
@@ -132,7 +134,7 @@ public abstract class AbstractTask extends Task {
 
     protected void taskHeartbeat() {
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
taskProfile.getInlongGroupId(),
-                taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 
1, 1);
+                taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 
1, 1, auditVersion);
 
     }
 

Reply via email to