This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d4d4dd5b4 [INLONG-6332][Agent] Fix reboot will reset file position
error (#6333)
d4d4dd5b4 is described below
commit d4d4dd5b4cae8240f03ddf413f2abf13fa979456
Author: ganfengtan <[email protected]>
AuthorDate: Fri Nov 4 21:18:58 2022 +0800
[INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
Co-authored-by: healchow <[email protected]>
---
.../agent/core/task/TaskPositionManager.java | 2 +
.../sources/reader/file/MonitorTextFile.java | 101 ++++++++++++---------
2 files changed, 60 insertions(+), 43 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index c3bfa0abe..c5b944210 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -139,6 +139,8 @@ public class TaskPositionManager extends AbstractDaemon {
ConcurrentHashMap<String, Long> positionTemp = new
ConcurrentHashMap<>();
ConcurrentHashMap<String, Long> position =
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
if (position == null) {
+ JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+ positionTemp.put(sourcePath, jobProfile.getLong(sourcePath +
POSITION_SUFFIX, 0));
position = positionTemp;
}
Long beforePosition = position.getOrDefault(sourcePath, 0L);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index d8160bd8b..1f5958cdf 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -21,10 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -35,26 +35,28 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXP
import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL;
/**
- * monitor files
+ * Monitor for text files
*/
public final class MonitorTextFile {
private static final Logger LOGGER =
LoggerFactory.getLogger(MonitorTextFile.class);
- private static volatile MonitorTextFile monitorTextFile = null;
// monitor thread pool
- private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new AgentThreadFactory("monitor-file"));
+ private static volatile MonitorTextFile monitorTextFile = null;
+
private MonitorTextFile() {
}
/**
- * Mode of singleton
- * @return MonitorTextFile instance
+ * Get a singleton instance of MonitorTextFile.
+ *
+ * @return monitor text file instance
*/
public static MonitorTextFile getInstance() {
if (monitorTextFile == null) {
@@ -68,37 +70,35 @@ public final class MonitorTextFile {
}
public void monitor(FileReaderOperator fileReaderOperator, TextFileReader
textFileReader) {
- MonitorEventRunnable monitorEvent = new
MonitorEventRunnable(fileReaderOperator, textFileReader);
- runningPool.execute(monitorEvent);
+ EXECUTOR_SERVICE.execute(new MonitorEventRunnable(fileReaderOperator,
textFileReader));
}
/**
- * monitor file event
+ * Runnable that monitors file events
*/
- private class MonitorEventRunnable implements Runnable {
+ private static class MonitorEventRunnable implements Runnable {
private static final int WAIT_TIME = 30;
private final FileReaderOperator fileReaderOperator;
private final TextFileReader textFileReader;
private final Long interval;
private final long startTime = System.currentTimeMillis();
+ private long lastFlushTime = System.currentTimeMillis();
private String path;
- /**
- * the last modify time of the file
- */
+
+ // file attributes from the previous check; their last-modified time is compared in listen()
private BasicFileAttributes attributesBefore;
- public MonitorEventRunnable(FileReaderOperator fileReaderOperator,
TextFileReader textFileReader) {
- this.fileReaderOperator = fileReaderOperator;
+ public MonitorEventRunnable(FileReaderOperator readerOperator,
TextFileReader textFileReader) {
+ this.fileReaderOperator = readerOperator;
this.textFileReader = textFileReader;
- this.interval = Long
-
.parseLong(fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL,
INTERVAL_MILLISECONDS));
+ this.interval = Long.parseLong(
+ readerOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL,
INTERVAL_MILLISECONDS));
try {
- this.attributesBefore = Files
- .readAttributes(fileReaderOperator.file.toPath(),
BasicFileAttributes.class);
- this.path = this.fileReaderOperator.file.getCanonicalPath();
+ this.attributesBefore =
Files.readAttributes(readerOperator.file.toPath(), BasicFileAttributes.class);
+ this.path = readerOperator.file.getCanonicalPath();
} catch (IOException e) {
- LOGGER.error("get {} last modify time error:",
fileReaderOperator.file.getName(), e);
+ LOGGER.error("get {} last modify time error:",
readerOperator.file.getName(), e);
}
}
@@ -106,10 +106,10 @@ public final class MonitorTextFile {
public void run() {
try {
TimeUnit.SECONDS.sleep(WAIT_TIME);
- LOGGER.info("start {} monitor",
this.fileReaderOperator.file.getAbsolutePath());
- while (!this.fileReaderOperator.finished) {
- long expireTime = Long.parseLong(fileReaderOperator.jobConf
- .get(JOB_FILE_MONITOR_EXPIRE,
JOB_FILE_MONITOR_DEFAULT_EXPIRE));
+ LOGGER.info("start {} monitor",
fileReaderOperator.file.getAbsolutePath());
+ while (!fileReaderOperator.finished) {
+ long expireTime = Long.parseLong(
+
fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_EXPIRE,
JOB_FILE_MONITOR_DEFAULT_EXPIRE));
long currentTime = System.currentTimeMillis();
if (expireTime !=
Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE)
&& currentTime - this.startTime > expireTime) {
@@ -119,39 +119,54 @@ public final class MonitorTextFile {
TimeUnit.MILLISECONDS.sleep(interval);
}
} catch (Exception e) {
- LOGGER.error("monitor {} error:",
this.fileReaderOperator.file.getName(), e);
+ LOGGER.error(String.format("monitor %s error",
fileReaderOperator.file.getName()), e);
}
}
private void listen() throws IOException {
BasicFileAttributes attributesAfter;
String currentPath;
+ File file = fileReaderOperator.file;
try {
- attributesAfter = Files
- .readAttributes(this.fileReaderOperator.file.toPath(),
BasicFileAttributes.class);
- currentPath = this.fileReaderOperator.file.getCanonicalPath();
+ attributesAfter = Files.readAttributes(file.toPath(),
BasicFileAttributes.class);
+ currentPath = file.getCanonicalPath();
} catch (Exception e) {
- // Set position 0 when split file
- this.fileReaderOperator.position = 0;
- LOGGER.error("monitor {} error, reset position is 0:",
this.fileReaderOperator.file.getName(), e);
+ // set position 0 when split file
+ fileReaderOperator.position = 0;
+ LOGGER.error(String.format("monitor file %s error, reset
position to 0", file.getName()), e);
return;
}
- // If change symbolic links
+
+ // if change symbolic links
if (attributesAfter.isSymbolicLink() && !path.equals(currentPath))
{
- this.fileReaderOperator.position = 0;
+ fileReaderOperator.position = 0;
path = currentPath;
}
if
(attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime())
< 0) {
- // Not triggered during data sending
- if (Objects.nonNull(this.fileReaderOperator.iterator) &&
this.fileReaderOperator.iterator
- .hasNext()) {
- return;
- }
- this.textFileReader.getData();
- this.textFileReader.mergeData(this.fileReaderOperator);
- this.attributesBefore = attributesAfter;
- this.fileReaderOperator.iterator =
fileReaderOperator.stream.iterator();
+ // file was modified since the last check; getFileData() does nothing while data is still being sent
+ getFileData();
+ attributesBefore = attributesAfter;
+ return;
+ }
+ lastFlushData();
+ }
+
+ private void lastFlushData() throws IOException {
+ long currentTime = System.currentTimeMillis();
+ if (interval * 100 > currentTime - lastFlushTime) {
+ return;
+ }
+ getFileData();
+ }
+
+ private void getFileData() throws IOException {
+ if (fileReaderOperator.iterator != null &&
fileReaderOperator.iterator.hasNext()) {
+ return;
}
+ this.textFileReader.getData();
+ this.textFileReader.mergeData(this.fileReaderOperator);
+ this.fileReaderOperator.iterator =
fileReaderOperator.stream.iterator();
+ this.lastFlushTime = System.currentTimeMillis();
}
}
}