This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6816f9f505 [INLONG-11614][Agent] Fix the signal leakage issue of AbstractSource class (#11615)
6816f9f505 is described below

commit 6816f9f505bb6267fb7a2c7c7704fa2f469a4683
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Dec 20 11:43:58 2024 +0800

    [INLONG-11614][Agent] Fix the signal leakage issue of AbstractSource class (#11615)
---
 .../inlong/agent/plugin/sources/file/AbstractSource.java   | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 72d14327a2..df3d14652e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -110,6 +110,7 @@ public abstract class AbstractSource implements Source {
             new SynchronousQueue<>(),
             new AgentThreadFactory("source-pool"));
     protected OffsetProfile offsetProfile;
+    protected boolean sourceError = false;
 
     @Override
     public void init(InstanceProfile profile) {
@@ -200,10 +201,16 @@ public abstract class AbstractSource implements Source {
      * @return true if prepared ok
      */
     private boolean prepareToRead() {
-        if (!doPrepareToRead()) {
+        try {
+            if (!doPrepareToRead()) {
+                return false;
+            }
+            return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
+        } catch (Throwable e) {
+            LOGGER.error("prepare to read {} error:", instanceId, e);
+            sourceError = true;
             return false;
         }
-        return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
     }
 
     /**
@@ -416,6 +423,9 @@ public abstract class AbstractSource implements Source {
 
     @Override
     public boolean sourceFinish() {
+        if (sourceError) {
+            return true;
+        }
         if (isRealTime) {
             return false;
         }

Reply via email to