This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d4dd5b4 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
d4d4dd5b4 is described below

commit d4d4dd5b4cae8240f03ddf413f2abf13fa979456
Author: ganfengtan <[email protected]>
AuthorDate: Fri Nov 4 21:18:58 2022 +0800

    [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
    
    Co-authored-by: healchow <[email protected]>
---
 .../agent/core/task/TaskPositionManager.java       |   2 +
 .../sources/reader/file/MonitorTextFile.java       | 101 ++++++++++++---------
 2 files changed, 60 insertions(+), 43 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index c3bfa0abe..c5b944210 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -139,6 +139,8 @@ public class TaskPositionManager extends AbstractDaemon {
         ConcurrentHashMap<String, Long> positionTemp = new ConcurrentHashMap<>();
         ConcurrentHashMap<String, Long> position = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
         if (position == null) {
+            JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+            positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0));
             position = positionTemp;
         }
         Long beforePosition = position.getOrDefault(sourcePath, 0L);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index d8160bd8b..1f5958cdf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -21,10 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Objects;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -35,26 +35,28 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXP
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL;
 
 /**
- * monitor files
+ * Monitor for text files
  */
 public final class MonitorTextFile {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MonitorTextFile.class);
-    private static volatile MonitorTextFile monitorTextFile = null;
     // monitor thread pool
-    private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS,
             new SynchronousQueue<>(),
             new AgentThreadFactory("monitor-file"));
 
+    private static volatile MonitorTextFile monitorTextFile = null;
+
     private MonitorTextFile() {
 
     }
 
     /**
-     * Mode of singleton
-     * @return MonitorTextFile instance
+     * Get a singleton instance of MonitorTextFile.
+     *
+     * @return monitor text file instance
      */
     public static MonitorTextFile getInstance() {
         if (monitorTextFile == null) {
@@ -68,37 +70,35 @@ public final class MonitorTextFile {
     }
 
     public void monitor(FileReaderOperator fileReaderOperator, TextFileReader 
textFileReader) {
-        MonitorEventRunnable monitorEvent = new 
MonitorEventRunnable(fileReaderOperator, textFileReader);
-        runningPool.execute(monitorEvent);
+        EXECUTOR_SERVICE.execute(new MonitorEventRunnable(fileReaderOperator, 
textFileReader));
     }
 
     /**
-     * monitor file event
+     * Runnable for monitor the file event
      */
-    private class MonitorEventRunnable implements Runnable {
+    private static class MonitorEventRunnable implements Runnable {
 
         private static final int WAIT_TIME = 30;
         private final FileReaderOperator fileReaderOperator;
         private final TextFileReader textFileReader;
         private final Long interval;
         private final long startTime = System.currentTimeMillis();
+        private long lastFlushTime = System.currentTimeMillis();
         private String path;
-        /**
-         * the last modify time of the file
-         */
+
+        // the last modify time of the file
         private BasicFileAttributes attributesBefore;
 
-        public MonitorEventRunnable(FileReaderOperator fileReaderOperator, 
TextFileReader textFileReader) {
-            this.fileReaderOperator = fileReaderOperator;
+        public MonitorEventRunnable(FileReaderOperator readerOperator, 
TextFileReader textFileReader) {
+            this.fileReaderOperator = readerOperator;
             this.textFileReader = textFileReader;
-            this.interval = Long
-                    
.parseLong(fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, 
INTERVAL_MILLISECONDS));
+            this.interval = Long.parseLong(
+                    readerOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, 
INTERVAL_MILLISECONDS));
             try {
-                this.attributesBefore = Files
-                        .readAttributes(fileReaderOperator.file.toPath(), 
BasicFileAttributes.class);
-                this.path = this.fileReaderOperator.file.getCanonicalPath();
+                this.attributesBefore = 
Files.readAttributes(readerOperator.file.toPath(), BasicFileAttributes.class);
+                this.path = readerOperator.file.getCanonicalPath();
             } catch (IOException e) {
-                LOGGER.error("get {} last modify time error:", 
fileReaderOperator.file.getName(), e);
+                LOGGER.error("get {} last modify time error:", 
readerOperator.file.getName(), e);
             }
         }
 
@@ -106,10 +106,10 @@ public final class MonitorTextFile {
         public void run() {
             try {
                 TimeUnit.SECONDS.sleep(WAIT_TIME);
-                LOGGER.info("start {} monitor", 
this.fileReaderOperator.file.getAbsolutePath());
-                while (!this.fileReaderOperator.finished) {
-                    long expireTime = Long.parseLong(fileReaderOperator.jobConf
-                            .get(JOB_FILE_MONITOR_EXPIRE, 
JOB_FILE_MONITOR_DEFAULT_EXPIRE));
+                LOGGER.info("start {} monitor", 
fileReaderOperator.file.getAbsolutePath());
+                while (!fileReaderOperator.finished) {
+                    long expireTime = Long.parseLong(
+                            
fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_EXPIRE, 
JOB_FILE_MONITOR_DEFAULT_EXPIRE));
                     long currentTime = System.currentTimeMillis();
                     if (expireTime != 
Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE)
                             && currentTime - this.startTime > expireTime) {
@@ -119,39 +119,54 @@ public final class MonitorTextFile {
                     TimeUnit.MILLISECONDS.sleep(interval);
                 }
             } catch (Exception e) {
-                LOGGER.error("monitor {} error:", 
this.fileReaderOperator.file.getName(), e);
+                LOGGER.error(String.format("monitor %s error", 
fileReaderOperator.file.getName()), e);
             }
         }
 
         private void listen() throws IOException {
             BasicFileAttributes attributesAfter;
             String currentPath;
+            File file = fileReaderOperator.file;
             try {
-                attributesAfter = Files
-                        .readAttributes(this.fileReaderOperator.file.toPath(), 
BasicFileAttributes.class);
-                currentPath = this.fileReaderOperator.file.getCanonicalPath();
+                attributesAfter = Files.readAttributes(file.toPath(), 
BasicFileAttributes.class);
+                currentPath = file.getCanonicalPath();
             } catch (Exception e) {
-                // Set position 0 when split file
-                this.fileReaderOperator.position = 0;
-                LOGGER.error("monitor {} error, reset position is 0:", 
this.fileReaderOperator.file.getName(), e);
+                // set position 0 when split file
+                fileReaderOperator.position = 0;
+                LOGGER.error(String.format("monitor file %s error, reset 
position to 0", file.getName()), e);
                 return;
             }
-            // If change symbolic links
+
+            // if change symbolic links
             if (attributesAfter.isSymbolicLink() && !path.equals(currentPath)) 
{
-                this.fileReaderOperator.position = 0;
+                fileReaderOperator.position = 0;
                 path = currentPath;
             }
             if 
(attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime())
 < 0) {
-                // Not triggered during data sending
-                if (Objects.nonNull(this.fileReaderOperator.iterator) && 
this.fileReaderOperator.iterator
-                        .hasNext()) {
-                    return;
-                }
-                this.textFileReader.getData();
-                this.textFileReader.mergeData(this.fileReaderOperator);
-                this.attributesBefore = attributesAfter;
-                this.fileReaderOperator.iterator = 
fileReaderOperator.stream.iterator();
+                // not triggered during data sending
+                getFileData();
+                attributesBefore = attributesAfter;
+                return;
+            }
+            lastFlushData();
+        }
+
+        private void lastFlushData() throws IOException {
+            long currentTime = System.currentTimeMillis();
+            if (interval * 100 > currentTime - lastFlushTime) {
+                return;
+            }
+            getFileData();
+        }
+
+        private void getFileData() throws IOException {
+            if (fileReaderOperator.iterator != null && 
fileReaderOperator.iterator.hasNext()) {
+                return;
             }
+            this.textFileReader.getData();
+            this.textFileReader.mergeData(this.fileReaderOperator);
+            this.fileReaderOperator.iterator = 
fileReaderOperator.stream.iterator();
+            this.lastFlushTime = System.currentTimeMillis();
         }
     }
 }

Reply via email to