This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f9c28bad17 [INLONG-9244][Agent] Fix bug: miss file from next data time (#9245)
f9c28bad17 is described below
commit f9c28bad17278d193038ae77a88dba62b1c8a40d
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 9 09:56:25 2023 +0800
[INLONG-9244][Agent] Fix bug: miss file from next data time (#9245)
---
.../agent/plugin/task/filecollect/LogFileCollectTask.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 1629b89595..26c61efa7a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -325,7 +325,7 @@ public class LogFileCollectTask extends Task {
for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
Map<String, InstanceProfile> sameDataTimeEvents = entry.getValue();
if (sameDataTimeEvents.isEmpty()) {
- return;
+ continue;
}
/*
* Calculate whether the event needs to be processed at the
current time based on its data time, business
@@ -442,10 +442,10 @@ public class LogFileCollectTask extends Task {
private void handleFilePath(Path filePath, WatchEntity entity) {
String newFileName = filePath.toFile().getAbsolutePath();
- LOGGER.info("[New File] {} {}", newFileName,
entity.getOriginPattern());
+ LOGGER.info("new file {} {}", newFileName, entity.getOriginPattern());
Matcher matcher = entity.getPattern().matcher(newFileName);
if (matcher.matches() || matcher.lookingAt()) {
- LOGGER.info("[Matched File] {} {}", newFileName,
entity.getOriginPattern());
+ LOGGER.info("matched file {} {}", newFileName,
entity.getOriginPattern());
String dataTime = getDataTimeFromFileName(newFileName,
entity.getOriginPattern(),
entity.getDateExpression());
if (!checkFileNameForTime(newFileName, entity)) {
@@ -458,10 +458,14 @@ public class LogFileCollectTask extends Task {
private void addToEvenMap(String fileName, String dataTime) {
if (isInEventMap(fileName, dataTime)) {
+ LOGGER.info("addToEvenMap isInEventMap returns true skip taskId {}
dataTime {} fileName {}",
+ taskProfile.getTaskId(), dataTime, fileName);
return;
}
Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) {
+ LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId
{} dataTime {} fileName {}",
+ taskProfile.getTaskId(), dataTime, fileName);
return;
}
Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.computeIfAbsent(dataTime,
@@ -474,6 +478,7 @@ public class LogFileCollectTask extends Task {
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
fileName, dataTime, fileUpdateTime);
sameDataTimeEvents.put(fileName, instanceProfile);
+ LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
}
private boolean checkFileNameForTime(String newFileName, WatchEntity
entity) {