This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a95b57bdc4 [INLONG-9710][Agent] Improve the accuracy of instance heartbeat auditing (#9711)
a95b57bdc4 is described below

commit a95b57bdc4d11b9d972a8e391452b18a99e7a6df
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Feb 21 10:30:53 2024 +0800

    [INLONG-9710][Agent] Improve the accuracy of instance heartbeat auditing (#9711)
---
 .../inlong/agent/plugin/instance/FileInstance.java | 34 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index f01084cdab..5b1fe89c75 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -37,6 +37,8 @@ import org.apache.inlong.common.enums.InstanceStateEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * file instance contains source and sink.
  * main job is to read from source and write to sink
@@ -44,10 +46,11 @@ import org.slf4j.LoggerFactory;
 public class FileInstance extends Instance {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileInstance.class);
+    public static final int HEARTBEAT_CHECK_GAP = 10;
     private Source source;
     private Sink sink;
     private InstanceProfile profile;
-    public static final int CORE_THREAD_SLEEP_TIME = 1;
+    public static final int CORE_THREAD_SLEEP_TIME = 10;
     private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
     private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
     private final int WRITE_FAILED_WAIT_TIME_MS = 10;
@@ -55,6 +58,8 @@ public class FileInstance extends Instance {
     private volatile boolean running = false;
     private volatile boolean inited = false;
     private volatile int checkFinishCount = 0;
+    private int heartbeatcheckCount = 0;
+    private long heartBeatStartTime = AgentUtils.getCurrentTime();
 
     @Override
     public boolean init(Object srcManager, InstanceProfile srcProfile) {
@@ -121,29 +126,42 @@ public class FileInstance extends Instance {
                 } else {
                     checkFinishCount = 0;
                 }
-                AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
-                String inlongGroupId = profile.getInlongGroupId();
-                String inlongStreamId = profile.getInlongStreamId();
-                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                        AgentUtils.getCurrentTime(), 1, 1);
+                heartbeatStatic();
+                AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
             } else {
                 boolean suc = false;
                 while (!isFinished() && !suc) {
                     suc = sink.write(msg);
                     if (!suc) {
+                        heartbeatStatic();
                         AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
                     }
                 }
+                heartbeatcheckCount++;
+                if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
+                    heartbeatStatic();
+                }
             }
         }
     }
 
+    private void heartbeatStatic() {
+        String inlongGroupId = profile.getInlongGroupId();
+        String inlongStreamId = profile.getInlongStreamId();
+        if (AgentUtils.getCurrentTime() - heartBeatStartTime > 
TimeUnit.SECONDS.toMillis(1)) {
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
+                    AgentUtils.getCurrentTime(), 1, 1);
+            heartbeatcheckCount = 0;
+            heartBeatStartTime = AgentUtils.getCurrentTime();
+        }
+    }
+
     private void handleReadEnd() {
         InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
         while (!isFinished() && !instanceManager.submitAction(action)) {
             LOGGER.error("instance manager action queue is full: taskId {}",
                     instanceManager.getTaskId());
-            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
         }
     }
 
@@ -155,7 +173,7 @@ public class FileInstance extends Instance {
         while (!isFinished() && !instanceManager.submitAction(action)) {
             LOGGER.error("instance manager action queue is full: taskId {}",
                     instanceManager.getTaskId());
-            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
         }
     }
 

Reply via email to