This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a95b57bdc4 [INLONG-9710][Agent] Improve the accuracy of instance heartbeat auditing (#9711)
a95b57bdc4 is described below
commit a95b57bdc4d11b9d972a8e391452b18a99e7a6df
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Feb 21 10:30:53 2024 +0800
[INLONG-9710][Agent] Improve the accuracy of instance heartbeat auditing (#9711)
---
.../inlong/agent/plugin/instance/FileInstance.java | 34 +++++++++++++++++-----
1 file changed, 26 insertions(+), 8 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index f01084cdab..5b1fe89c75 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -37,6 +37,8 @@ import org.apache.inlong.common.enums.InstanceStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
/**
* file instance contains source and sink.
* main job is to read from source and write to sink
@@ -44,10 +46,11 @@ import org.slf4j.LoggerFactory;
public class FileInstance extends Instance {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileInstance.class);
+ public static final int HEARTBEAT_CHECK_GAP = 10;
private Source source;
private Sink sink;
private InstanceProfile profile;
- public static final int CORE_THREAD_SLEEP_TIME = 1;
+ public static final int CORE_THREAD_SLEEP_TIME = 10;
private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
private final int WRITE_FAILED_WAIT_TIME_MS = 10;
@@ -55,6 +58,8 @@ public class FileInstance extends Instance {
private volatile boolean running = false;
private volatile boolean inited = false;
private volatile int checkFinishCount = 0;
+ private int heartbeatcheckCount = 0;
+ private long heartBeatStartTime = AgentUtils.getCurrentTime();
@Override
public boolean init(Object srcManager, InstanceProfile srcProfile) {
@@ -121,29 +126,42 @@ public class FileInstance extends Instance {
} else {
checkFinishCount = 0;
}
- AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
- String inlongGroupId = profile.getInlongGroupId();
- String inlongStreamId = profile.getInlongStreamId();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT,
inlongGroupId, inlongStreamId,
- AgentUtils.getCurrentTime(), 1, 1);
+ heartbeatStatic();
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
} else {
boolean suc = false;
while (!isFinished() && !suc) {
suc = sink.write(msg);
if (!suc) {
+ heartbeatStatic();
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
}
}
+ heartbeatcheckCount++;
+ if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
+ heartbeatStatic();
+ }
}
}
}
+ private void heartbeatStatic() {
+ String inlongGroupId = profile.getInlongGroupId();
+ String inlongStreamId = profile.getInlongStreamId();
+ if (AgentUtils.getCurrentTime() - heartBeatStartTime >
TimeUnit.SECONDS.toMillis(1)) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT,
inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
+ heartbeatcheckCount = 0;
+ heartBeatStartTime = AgentUtils.getCurrentTime();
+ }
+ }
+
private void handleReadEnd() {
InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
- AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
}
@@ -155,7 +173,7 @@ public class FileInstance extends Instance {
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
- AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
}