This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3de83614e8 [INLONG-11762][Agent] Modify the logic for determining the
end of the data source (#11763)
3de83614e8 is described below
commit 3de83614e89ebdfac9e5f10727f3e31368a55617
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Feb 14 15:59:18 2025 +0800
[INLONG-11762][Agent] Modify the logic for determining the end of the data
source (#11763)
---
.../inlong/agent/plugin/sources/LogFileSource.java | 6 ++++++
.../agent/plugin/sources/file/AbstractSource.java | 17 +++++++++++++++--
.../inlong/agent/plugin/sources/TestLogFileSource.java | 2 +-
3 files changed, 22 insertions(+), 3 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 6774751a34..a78abe3305 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -30,6 +30,7 @@ import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.file.FileUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -374,4 +375,9 @@ public class LogFileSource extends AbstractSource {
}
}
}
+
+ @Override
+ public long getLastModifyTime() {
+ return FileUtils.getFileLastModifyTime(fileName);
+ }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index df3d14652e..419d511bf0 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -81,7 +81,7 @@ public abstract class AbstractSource implements Source {
protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
protected final Integer WAIT_TIMEOUT_MS = 10;
- private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100;
+ private final Integer SOURCE_NO_UPDATE_INTERVAL_MS = 5 * 60 * 1000;
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
protected BlockingQueue<SourceData> queue;
@@ -429,6 +429,19 @@ public abstract class AbstractSource implements Source {
if (isRealTime) {
return false;
}
- return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
+ if (emptyCount == 0) {
+ return false;
+ }
+ if (profile.isRetry()) {
+ return true;
+ }
+ if (AgentUtils.getCurrentTime() - getLastModifyTime() > SOURCE_NO_UPDATE_INTERVAL_MS) {
+ return true;
+ }
+ return false;
+ }
+
+ public long getLastModifyTime() {
+ return 0;
}
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 2bbe41859a..1fefbbc1d2 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -95,7 +95,7 @@ public class TestLogFileSource {
Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
- Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
+ Whitebox.setInternalState(source, "SOURCE_NO_UPDATE_INTERVAL_MS", 1000);
Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
if (lineOffset > 0) {
String finalOffset = Long.toString(lineOffset);