This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6ec047e8e9 [INLONG-9556][Agent] Prevent thread freeze caused by
deleting data sources when the backend cannot send out (#9557)
6ec047e8e9 is described below
commit 6ec047e8e9d0ec63c1f60104e99ad9cd1082cc4a
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Jan 12 14:44:49 2024 +0800
[INLONG-9556][Agent] Prevent thread freeze caused by deleting data sources
when the backend cannot send out (#9557)
---
.../java/org/apache/inlong/agent/plugin/file/Sink.java | 2 +-
.../apache/inlong/agent/plugin/instance/FileInstance.java | 9 ++++++++-
.../inlong/agent/plugin/sinks/filecollect/ProxySink.java | 14 ++------------
3 files changed, 11 insertions(+), 14 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
index 0978ea8025..15c888f1f8 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
@@ -30,7 +30,7 @@ public interface Sink {
*
* @param message message
*/
- void write(Message message);
+ boolean write(Message message);
/**
* set source file name where the message is generated
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 1f2a2cfc40..f01084cdab 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -50,6 +50,7 @@ public class FileInstance extends Instance {
public static final int CORE_THREAD_SLEEP_TIME = 1;
private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
+ private final int WRITE_FAILED_WAIT_TIME_MS = 10;
private InstanceManager instanceManager;
private volatile boolean running = false;
private volatile boolean inited = false;
@@ -126,7 +127,13 @@ public class FileInstance extends Instance {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT,
inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
} else {
- sink.write(msg);
+ boolean suc = false;
+ while (!isFinished() && !suc) {
+ suc = sink.write(msg);
+ if (!suc) {
+ AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
+ }
+ }
}
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 5a01f64fa9..fea332c96d 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -56,7 +56,6 @@ import static
org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
public class ProxySink extends AbstractSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProxySink.class);
- private final int WRITE_FAILED_WAIT_TIME_MS = 10;
private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
public final int SAVE_OFFSET_INTERVAL_MS = 1000;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
@@ -76,18 +75,9 @@ public class ProxySink extends AbstractSink {
private volatile boolean offsetRunning = false;
private OffsetManager offsetManager;
- public ProxySink() {
- }
-
@Override
- public void write(Message message) {
- boolean suc = false;
- while (!shutdown && !suc) {
- suc = putInCache(message);
- if (!suc) {
- AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
- }
- }
+ public boolean write(Message message) {
+ return putInCache(message);
}
private boolean putInCache(Message message) {