This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6816f9f505 [INLONG-11614][Agent] Fix the signal leakage issue of
AbstractSource class (#11615)
6816f9f505 is described below
commit 6816f9f505bb6267fb7a2c7c7704fa2f469a4683
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Dec 20 11:43:58 2024 +0800
[INLONG-11614][Agent] Fix the signal leakage issue of AbstractSource class
(#11615)
---
.../inlong/agent/plugin/sources/file/AbstractSource.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 72d14327a2..df3d14652e 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -110,6 +110,7 @@ public abstract class AbstractSource implements Source {
new SynchronousQueue<>(),
new AgentThreadFactory("source-pool"));
protected OffsetProfile offsetProfile;
+ protected boolean sourceError = false;
@Override
public void init(InstanceProfile profile) {
@@ -200,10 +201,16 @@ public abstract class AbstractSource implements Source {
* @return true if prepared ok
*/
private boolean prepareToRead() {
- if (!doPrepareToRead()) {
+ try {
+ if (!doPrepareToRead()) {
+ return false;
+ }
+ return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ } catch (Throwable e) {
+ LOGGER.error("prepare to read {} error:", instanceId, e);
+ sourceError = true;
return false;
}
- return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
}
/**
@@ -416,6 +423,9 @@ public abstract class AbstractSource implements Source {
@Override
public boolean sourceFinish() {
+ if (sourceError) {
+ return true;
+ }
if (isRealTime) {
return false;
}