This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 702b8e6242 [INLONG-9454][Agent] Increase exit conditions to prevent dead loops (#9455)
702b8e6242 is described below
commit 702b8e6242964a7ee36b7cc9b847ea4a675e9ded
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Dec 11 15:29:18 2023 +0800
[INLONG-9454][Agent] Increase exit conditions to prevent dead loops (#9455)
---
.../apache/inlong/agent/plugin/instance/FileInstance.java | 4 ++--
.../apache/inlong/agent/plugin/sources/LogFileSource.java | 12 ++++++++----
.../agent/plugin/task/filecollect/LogFileCollectTask.java | 2 +-
3 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index fb6e0442ff..1785b4245f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -118,7 +118,7 @@ public class FileInstance extends Instance {
private void handleReadEnd() {
InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
- while (!instanceManager.submitAction(action)) {
+ while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
@@ -130,7 +130,7 @@ public class FileInstance extends Instance {
profile.setState(InstanceStateEnum.DELETE);
profile.setModifyTime(AgentUtils.getCurrentTime());
InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
- while (!instanceManager.submitAction(action)) {
+ while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 0cc97afb8c..e4de812a8a 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -495,16 +495,20 @@ public class LogFileSource extends AbstractSource {
}
private void putIntoQueue(SourceData sourceData) {
+ if (sourceData == null) {
+ return;
+ }
try {
boolean offerSuc = false;
- while (offerSuc != true) {
+ while (isRunnable() && offerSuc != true) {
offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
}
- LOGGER.debug("Read {} from file {}", sourceData.getData(), fileName);
- } catch (InterruptedException e) {
- if (sourceData != null) {
+ if (!offerSuc) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length());
}
+ LOGGER.debug("Read {} from file {}", sourceData.getData(), fileName);
+ } catch (InterruptedException e) {
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length());
LOGGER.error("fetchData offer failed {}", e.getMessage());
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 96e4a5e22e..25f3ef456d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -409,7 +409,7 @@ public class LogFileCollectTask extends Task {
if (!isCurrentDataTime && instanceManager.isFull()) {
return;
}
- while (!instanceManager.submitAction(action)) {
+ while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}