This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ec047e8e9 [INLONG-9556][Agent] Prevent thread freeze caused by deleting data sources when the backend cannot send out (#9557)
6ec047e8e9 is described below

commit 6ec047e8e9d0ec63c1f60104e99ad9cd1082cc4a
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Jan 12 14:44:49 2024 +0800

    [INLONG-9556][Agent] Prevent thread freeze caused by deleting data sources when the backend cannot send out (#9557)
---
 .../java/org/apache/inlong/agent/plugin/file/Sink.java     |  2 +-
 .../apache/inlong/agent/plugin/instance/FileInstance.java  |  9 ++++++++-
 .../inlong/agent/plugin/sinks/filecollect/ProxySink.java   | 14 ++------------
 3 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
index 0978ea8025..15c888f1f8 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
@@ -30,7 +30,7 @@ public interface Sink {
      *
      * @param message message
      */
-    void write(Message message);
+    boolean write(Message message);
 
     /**
      * set source file name where the message is generated
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 1f2a2cfc40..f01084cdab 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -50,6 +50,7 @@ public class FileInstance extends Instance {
     public static final int CORE_THREAD_SLEEP_TIME = 1;
     private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
     private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
+    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
     private InstanceManager instanceManager;
     private volatile boolean running = false;
     private volatile boolean inited = false;
@@ -126,7 +127,13 @@ public class FileInstance extends Instance {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId,
                         AgentUtils.getCurrentTime(), 1, 1);
             } else {
-                sink.write(msg);
+                boolean suc = false;
+                while (!isFinished() && !suc) {
+                    suc = sink.write(msg);
+                    if (!suc) {
+                        AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
+                    }
+                }
             }
         }
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 5a01f64fa9..fea332c96d 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -56,7 +56,6 @@ import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
 public class ProxySink extends AbstractSink {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);
-    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
     private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
     public final int SAVE_OFFSET_INTERVAL_MS = 1000;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
@@ -76,18 +75,9 @@ public class ProxySink extends AbstractSink {
     private volatile boolean offsetRunning = false;
     private OffsetManager offsetManager;
 
-    public ProxySink() {
-    }
-
     @Override
-    public void write(Message message) {
-        boolean suc = false;
-        while (!shutdown && !suc) {
-            suc = putInCache(message);
-            if (!suc) {
-                AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
-            }
-        }
+    public boolean write(Message message) {
+        return putInCache(message);
     }
 
     private boolean putInCache(Message message) {

Reply via email to