This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73f477aae0 [INLONG-11145][Agent] Optimize the logic of supplementing
data (#11146)
73f477aae0 is described below
commit 73f477aae0e97d725918e985c4539612626d076a
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Sep 19 17:27:19 2024 +0800
[INLONG-11145][Agent] Optimize the logic of supplementing data (#11146)
---
.../agent/core/instance/InstanceManager.java | 23 +++++++++-------------
.../inlong/agent/plugin/task/AbstractTask.java | 3 ++-
.../inlong/agent/plugin/task/file/LogFileTask.java | 5 ++++-
3 files changed, 15 insertions(+), 16 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 333c12a7d2..48aedfd09d 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -79,6 +79,7 @@ public class InstanceManager extends AbstractDaemon {
private volatile boolean runAtLeastOneTime = false;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
+ private long finishedInstanceCount = 0;
private class InstancePrintStat {
@@ -318,6 +319,8 @@ public class InstanceManager extends AbstractDaemon {
private void addInstance(InstanceProfile profile) {
if (instanceMap.size() >= instanceLimit) {
LOGGER.error("instanceMap size {} over limit {}",
instanceMap.size(), instanceLimit);
+ actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
return;
}
LOGGER.info("add instance taskId {} instanceId {}", taskId,
profile.getInstanceId());
@@ -337,6 +340,7 @@ public class InstanceManager extends AbstractDaemon {
deleteFromMemory(profile.getInstanceId());
LOGGER.info("finished instance state {} taskId {} instanceId {}",
profile.getState(),
profile.getTaskId(), profile.getInstanceId());
+ finishedInstanceCount++;
}
private void deleteInstance(String instanceId) {
@@ -458,23 +462,14 @@ public class InstanceManager extends AbstractDaemon {
return (instanceMap.size() + actionQueue.size()) >= instanceLimit *
reserveCoefficient;
}
- public boolean allInstanceFinished() {
- if (!runAtLeastOneTime) {
- return false;
- }
- if (!instanceMap.isEmpty()) {
- return false;
- }
- if (!actionQueue.isEmpty()) {
- return false;
- }
+ public long getFinishedInstanceCount() {
+ int count = 0;
List<InstanceProfile> instances = instanceStore.getInstances(taskId);
for (int i = 0; i < instances.size(); i++) {
- InstanceProfile profile = instances.get(i);
- if (profile.getState() != InstanceStateEnum.FINISHED) {
- return false;
+ if (instances.get(i).getState() == InstanceStateEnum.FINISHED) {
+ count++;
}
}
- return true;
+ return count;
}
}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 75d87bb235..d9ec53ab0b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -52,6 +52,7 @@ public abstract class AbstractTask extends Task {
protected boolean initOK = false;
protected long lastPrintTime = 0;
protected long auditVersion;
+ protected long instanceCount = 0;
@Override
public void init(Object srcManager, TaskProfile taskProfile, Store
basicStore) throws IOException {
@@ -152,7 +153,7 @@ public abstract class AbstractTask extends Task {
}
protected boolean allInstanceFinished() {
- return instanceManager.allInstanceFinished();
+ return instanceCount == instanceManager.getFinishedInstanceCount();
}
protected boolean shouldAddAgain(String fileName, long lastModifyTime) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index fbee956b0f..4f49cfdd7d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -241,7 +241,7 @@ public class LogFileTask extends AbstractTask {
runAtLeastOneTime = true;
}
dealWithEventMap();
- if (instanceQueue.isEmpty() && allInstanceFinished()) {
+ if (allInstanceFinished()) {
LOGGER.info("retry task finished, send action to task manager,
taskId {}", getTaskId());
TaskAction action = new
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
taskManager.submitAction(action);
@@ -264,6 +264,9 @@ public class LogFileTask extends AbstractTask {
LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
fileInfos.forEach((fileInfo) -> {
addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+ if (retry) {
+ instanceCount++;
+ }
});
});
}