This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d7cfb74ce [INLONG-8180][Agent] Improve the efficiency and safety of log file reading (#8182)
1d7cfb74ce is described below

commit 1d7cfb74ceb44219882b889ae736067bc4f33231
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Jun 14 09:45:19 2023 +0800

    [INLONG-8180][Agent] Improve the efficiency and safety of log file reading (#8182)
    
    Co-authored-by: wenweihuang <[email protected]>
---
 .../inlong/agent/constant/CommonConstants.java     |   3 -
 .../inlong/agent/message/PackProxyMessage.java     |  19 +-
 .../apache/inlong/agent/core/task/TaskManager.java |   1 +
 .../agent/core/task/TaskPositionManager.java       |  38 ++-
 .../inlong/agent/plugin/sinks/KafkaSink.java       |   3 -
 .../inlong/agent/plugin/sinks/ProxySink.java       |  13 +
 .../inlong/agent/plugin/sinks/PulsarSink.java      |   5 +-
 .../inlong/agent/plugin/sinks/SenderManager.java   |   3 -
 .../sources/reader/file/FileReaderOperator.java    | 278 ++++++++++-----------
 .../sources/reader/file/MonitorTextFile.java       |  23 +-
 .../agent/plugin/sources/TestTextFileReader.java   |  15 +-
 .../inlong/agent/plugin/task/TestTextFileTask.java |   5 +-
 12 files changed, 194 insertions(+), 212 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 84972767f9..81fbc7b202 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -58,9 +58,6 @@ public class CommonConstants {
     // max size of message list
     public static final String PROXY_PACKAGE_MAX_SIZE = 
"proxy.package.maxSize";
 
-    // determine if the send method is sync or async
-    public static final String PROXY_SEND_SYNC = "proxy.sync";
-
     // the same task must have the same Partition Key if choose sync
     public static final String PROXY_SEND_PARTITION_KEY = "proxy.partitionKey";
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index 4f37de51a7..15d558748e 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -144,15 +144,12 @@ public class PackProxyMessage {
             while (!messageQueue.isEmpty()) {
                 // pre check message size
                 ProxyMessage peekMessage = messageQueue.peek();
-                if (peekMessage == null) {
-                    break;
-                }
-
-                // if the message size is greater than max pack size,should 
drop it.
                 int peekMessageLength = peekMessage.getBody().length;
                 if (peekMessageLength > maxPackSize) {
                     LOGGER.warn("message size is {}, greater than max pack 
size {}, drop it!",
                             peekMessage.getBody().length, maxPackSize);
+                    int bodySize = peekMessage.getBody().length;
+                    queueSize.addAndGet(-bodySize);
                     messageQueue.remove();
                     break;
                 }
@@ -160,13 +157,11 @@ public class PackProxyMessage {
                     break;
                 }
                 ProxyMessage message = messageQueue.remove();
-                if (message != null) {
-                    int bodySize = message.getBody().length;
-                    resultBatchSize += bodySize;
-                    // decrease queue size.
-                    queueSize.addAndGet(-bodySize);
-                    result.add(message.getBody());
-                }
+                int bodySize = message.getBody().length;
+                resultBatchSize += bodySize;
+                // decrease queue size.
+                queueSize.addAndGet(-bodySize);
+                result.add(message.getBody());
             }
             // make sure result is not empty.
             if (!result.isEmpty()) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 344cafba2e..1aa03e7d94 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -81,6 +81,7 @@ public class TaskManager extends AbstractDaemon {
         this.taskMetrics = new 
AgentMetricItemSet(this.getClass().getSimpleName());
         this.dimensions = new HashMap<>();
         this.dimensions.put(KEY_COMPONENT_NAME, 
this.getClass().getSimpleName());
+        MetricRegister.unregister(taskMetrics);
         MetricRegister.register(taskMetrics);
 
         tasks = new ConcurrentHashMap<>();
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index 882578a880..85f94710d3 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -134,33 +134,29 @@ public class TaskPositionManager extends AbstractDaemon {
     /**
      * update job sink position
      *
-     * @param size add this size to beforePosition
+     * @param newPosition
      */
-    public void updateSinkPosition(String jobInstanceId, String sourcePath, 
long size, boolean reset) {
+    public void updateSinkPosition(String jobInstanceId, String sourcePath, 
long newPosition) {
+        LOGGER.info("updateSinkPosition jobInstanceId {} sourcePath {} 
newPosition {}", jobInstanceId, sourcePath,
+                newPosition);
         ConcurrentHashMap<String, Long> positionTemp = new 
ConcurrentHashMap<>();
-        ConcurrentHashMap<String, Long> position = 
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
-        if (position == null) {
-            JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
-            if (jobProfile == null) {
-                return;
-            }
-            positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + 
POSITION_SUFFIX, 0));
-            position = positionTemp;
-        }
-
-        if (!reset) {
-            Long beforePosition = position.getOrDefault(sourcePath, 0L);
-            position.put(sourcePath, beforePosition + size);
+        ConcurrentHashMap<String, Long> lastPosition = 
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
+        if (lastPosition == null) {
+            positionTemp.put(sourcePath, newPosition);
         } else {
-            position.put(sourcePath, size);
+            lastPosition.put(sourcePath, newPosition);
         }
     }
 
-    public ConcurrentHashMap<String, Long> getTaskPositionMap(String jobId) {
-        return jobTaskPositionMap.get(jobId);
-    }
+    public long getPosition(String sourcePath, String jobInstanceId) {
+        JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+        if (jobProfile == null) {
+            LOGGER.error("getPosition but jobProfile not exist! sourcePath {} 
jobInstanceId {} return position 0",
+                    sourcePath,
+                    jobInstanceId);
+            return 0;
+        }
 
-    public ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> 
getJobTaskPosition() {
-        return jobTaskPositionMap;
+        return jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0);
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
index 0c1b3342fc..22f96d8eaf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -255,9 +255,6 @@ public class KafkaSink extends AbstractSink {
                 batchMsg.getStreamId(), batchMsg.getDataTime(), 
batchMsg.getMsgCnt(),
                 batchMsg.getTotalSize());
         sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
-        if (sourceName != null) {
-            taskPositionManager.updateSinkPosition(batchMsg.getJobId(), 
sourceName, batchMsg.getMsgCnt(), false);
-        }
     }
 
     private KafkaProducer<String, byte[]> selectProducer() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 3f75fb748f..248796a9ec 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -41,6 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 
 /**
  * sink message data to inlong-dataproxy
@@ -56,12 +58,22 @@ public class ProxySink extends AbstractSink {
     private SenderManager senderManager;
     private byte[] fieldSplitter;
     private volatile boolean shutdown = false;
+    private int maxPackSize;
 
     public ProxySink() {
     }
 
     @Override
     public void write(Message message) {
+        if (message == null) {
+            return;
+        }
+        // if the message size is greater than max pack size,should drop it.
+        if (message.getBody().length > maxPackSize) {
+            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
+                    message.getBody().length, maxPackSize);
+            return;
+        }
         boolean suc = false;
         while (!suc) {
             suc = putInCache(message);
@@ -163,6 +175,7 @@ public class ProxySink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
+        this.maxPackSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
         messageFilter = initMessageFilter(jobConf);
         fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, 
DEFAULT_FIELD_SPLITTER).getBytes(
                 StandardCharsets.UTF_8);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index c3284c88d3..d2801e4b6d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -197,6 +197,7 @@ public class PulsarSink extends AbstractSink {
             AgentUtils.silenceSleepInMs(batchFlushInterval);
         }
         shutdown = true;
+        EXECUTOR_SERVICE.shutdown();
         if (CollectionUtils.isNotEmpty(pulsarSenders)) {
             for (PulsarTopicSender sender : pulsarSenders) {
                 sender.close();
@@ -299,7 +300,6 @@ public class PulsarSink extends AbstractSink {
                     updateSuccessSendMetrics(batchMsg);
                 }
             });
-
         } else {
             try {
                 
producer.newMessage().eventTime(batchMsg.getDataTime()).value(message.buildArray()).send();
@@ -319,9 +319,6 @@ public class PulsarSink extends AbstractSink {
                 batchMsg.getStreamId(), batchMsg.getDataTime(), 
batchMsg.getMsgCnt(),
                 batchMsg.getTotalSize());
         sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
-        if (sourceName != null) {
-            taskPositionManager.updateSinkPosition(batchMsg.getJobId(), 
sourceName, batchMsg.getMsgCnt(), false);
-        }
     }
 
     private Producer selectProducer() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 84612d7460..615f4700bd 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -346,9 +346,6 @@ public class SenderManager {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId, dataTime, msgCnt,
                         batchMessage.getTotalSize());
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
-                if (sourcePath != null) {
-                    
taskPositionManager.updateSinkPosition(batchMessage.getJobId(), sourcePath, 
msgCnt, false);
-                }
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime 
{} fail with times {}, "
                         + "error {}", groupId, streamId, jobId, dataTime, 
retry, result);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index b84898a650..82c3a40592 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -39,12 +39,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.LineNumberReader;
 import java.io.RandomAccessFile;
 import java.nio.charset.StandardCharsets;
@@ -60,15 +58,13 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
 import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
-import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_DEFAULT_STATUS;
@@ -91,13 +87,13 @@ public class FileReaderOperator extends AbstractReader {
     public static final int NEVER_STOP_SIGN = -1;
     public static final int BATCH_READ_SIZE = 10000;
     public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_SIZE;
-    private static final int LINE_SEPARATOR_SIZE = 
System.lineSeparator().getBytes(StandardCharsets.UTF_8).length;
+    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
     private static final SimpleDateFormat RECORD_TIME_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
     private static final Gson GSON = new Gson();
 
     public File file;
-    public int position = 0;
-    public int bytePosition = 0;
+    public long position = 0;
+    public long bytePosition = 0;
     private long readEndpoint = Long.MAX_VALUE;
     public String md5;
     public Map<String, String> metadata;
@@ -110,7 +106,8 @@ public class FileReaderOperator extends AbstractReader {
     private long waitTimeout;
     private long lastTime = 0;
     private List<Validator> validators = new ArrayList<>();
-    public boolean firstStored = false;
+    private static final byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE];
+    private static int maxPackSize;
 
     private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
     private final StringBuffer sb = new StringBuffer();
@@ -139,7 +136,6 @@ public class FileReaderOperator extends AbstractReader {
         } catch (InterruptedException e) {
             LOGGER.warn("poll {} data get interruptted.", file.getPath(), e);
         }
-
         return Optional.ofNullable(data)
                 .map(this::metadataMessage)
                 .filter(this::filterMessage)
@@ -225,6 +221,7 @@ public class FileReaderOperator extends AbstractReader {
             this.jobConf = jobConf;
             super.init(jobConf);
             this.instanceId = jobConf.getInstanceId();
+            this.maxPackSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
             initReadTimeout(jobConf);
             String md5 = AgentUtils.getFileMd5(file);
             if (StringUtils.isNotBlank(this.md5) && !this.md5.equals(md5)) {
@@ -237,21 +234,31 @@ public class FileReaderOperator extends AbstractReader {
                     .equals(JOB_FILE_MONITOR_DEFAULT_STATUS)) {
                 readEndpoint = Files.lines(file.toPath()).count();
             }
-
-            this.bytePosition = getStartBytePosition(this.position);
-
-            isFirstStore();
-
-            if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) && 
DataCollectType.INCREMENT
-                    
.equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))
-                    && this.firstStored) {
+            try {
+                position = 
TaskPositionManager.getInstance().getPosition(getReadSource(), instanceId);
+            } catch (Exception ex) {
+                position = 0;
+                LOGGER.error("get position from position manager error, only 
occur in ut: {}", ex.getMessage());
+            }
+            this.bytePosition = getStartBytePosition(position);
+            LOGGER.info("FileReaderOperator init file {} instanceId {} history 
position {}", getReadSource(),
+                    instanceId,
+                    position);
+            if (isIncrement(jobConf)) {
                 LOGGER.info("FileReaderOperator DataCollectType INCREMENT: 
start bytePosition {},{}",
                         file.length(), file.getAbsolutePath());
-                this.bytePosition = (int) file.length();
-
-                storeRocksDB();
+                this.bytePosition = file.length();
+                try (LineNumberReader lineNumberReader = new 
LineNumberReader(new FileReader(file.getPath()))) {
+                    lineNumberReader.skip(Long.MAX_VALUE);
+                    position = lineNumberReader.getLineNumber();
+                    TaskPositionManager.getInstance().updateSinkPosition(
+                            getJobInstanceId(), getReadSource(), position);
+                    LOGGER.info("for increment update {}, position to {}", 
file.getAbsolutePath(), position);
+
+                } catch (IOException ex) {
+                    LOGGER.error("get position error, file absolute path: {}", 
file.getAbsolutePath());
+                }
             }
-
             try {
                 resiterMeta(jobConf);
             } catch (Exception ex) {
@@ -263,6 +270,42 @@ public class FileReaderOperator extends AbstractReader {
         }
     }
 
+    private long getStartBytePosition(long lineNum) throws IOException {
+        long pos = 0;
+        long readCount = 0;
+        RandomAccessFile input = null;
+        try {
+            input = new RandomAccessFile(file, "r");
+            while (readCount < lineNum) {
+                List<String> lines = new ArrayList<>();
+                pos = readLines(input, pos, lines,
+                        Math.min((int) (lineNum - readCount), 
FileReaderOperator.BATCH_READ_SIZE));
+                readCount += lines.size();
+                if (lines.size() == 0) {
+                    LOGGER.error("getStartBytePosition LineNum {} larger than 
the real file");
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("getStartBytePosition error {}", e.getMessage());
+        } finally {
+            if (input != null) {
+                input.close();
+            }
+        }
+        LOGGER.info("getStartBytePosition LineNum {} position {}", lineNum, 
pos);
+        return pos;
+    }
+
+    private boolean isIncrement(JobProfile jobConf) {
+        if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) && 
DataCollectType.INCREMENT
+                
.equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))
+                && isFirstStore(jobConf)) {
+            return true;
+        }
+        return false;
+    }
+
     // default value is -1 and never stop task
     private void initReadTimeout(JobProfile jobConf) {
         int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
@@ -326,97 +369,38 @@ public class FileReaderOperator extends AbstractReader {
         // every line (include empty line) should be sent, otherwise the read 
position will be offset when
         // restarting and recovering. In the same time, Regex end line 
spiltted line also has this problem, because
         // recovering is based on line position.
-        List<String> lines = bytePosition == 0 ? readFromLine(position) : 
readFromPos(bytePosition);
+        List<String> lines = readFromPos(bytePosition);
         if (!lines.isEmpty()) {
             LOGGER.info("path is {}, line is {}, byte position is {}, reads 
data lines {}",
                     file.getName(), position, bytePosition, lines.size());
         }
         List<String> resultLines = lines;
-        // TODO line regular expression matching
-        if (jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
-            Pattern pattern = 
Pattern.compile(jobConf.get(JOB_FILE_LINE_END_PATTERN));
-            resultLines = lines.stream().flatMap(line -> {
-                sb.append(line + System.lineSeparator());
-                String data = sb.toString();
-                Matcher matcher = pattern.matcher(data);
-                List<String> tmpResultLines = new ArrayList<>();
-                int beginPos = 0;
-                while (matcher.find()) {
-                    String endLineStr = matcher.group();
-                    int endPos = data.indexOf(endLineStr, beginPos);
-                    tmpResultLines.add(data.substring(beginPos, endPos));
-                    beginPos = endPos + endLineStr.length();
-                }
-                String lastWord = data.substring(beginPos);
-                sb.setLength(0);
-                sb.append(lastWord);
-                return tmpResultLines.stream();
-            }).collect(Collectors.toList());
-        }
-
         resultLines.forEach(line -> {
             try {
                 boolean offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
                 while (offerSuc != true) {
                     offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
                 }
+                LOGGER.debug("Read from file {} for {}", getReadSource(), 
line);
             } catch (InterruptedException e) {
                 LOGGER.error("fetchData offer failed {}", e.getMessage());
             }
         });
-        bytePosition += lines.stream()
-                .mapToInt(line -> line.getBytes(StandardCharsets.UTF_8).length 
+ LINE_SEPARATOR_SIZE)
-                .sum();
-        position += lines.size();
         if (position >= readEndpoint) {
             finished = true;
         }
     }
 
-    private List<String> readFromLine(int lineNum) throws IOException {
-        String line = null;
-        List<String> lines = new ArrayList<>();
-        BufferedReader reader = null;
-        try {
-            reader = new BufferedReader(new InputStreamReader(new 
FileInputStream(file),
-                    StandardCharsets.UTF_8));
-            int count = 0;
-            while ((line = reader.readLine()) != null) {
-                if (++count > lineNum) {
-                    LOGGER.debug("read from line line-num {},  data {}", 
lineNum, line);
-                    lines.add(line);
-                }
-                if (lines.size() >= FileReaderOperator.BATCH_READ_SIZE) {
-                    break;
-                }
-            }
-        } catch (Exception e) {
-            LOGGER.error("readFromLine error  {}", e.getMessage());
-        } finally {
-            if (reader != null) {
-                reader.close();
-            }
-        }
-        return lines;
-    }
-
-    private List<String> readFromPos(int pos) throws IOException {
-        String line = null;
+    private List<String> readFromPos(long pos) throws IOException {
         List<String> lines = new ArrayList<>();
         RandomAccessFile input = null;
         try {
             input = new RandomAccessFile(file, "r");
-            input.skipBytes(pos);
-            while ((line = input.readLine()) != null) {
-                String lineChart = new 
String(line.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
-                LOGGER.debug("read from pos pos-num {}, data {}", pos, 
lineChart);
-                lines.add(lineChart);
-                if (lines.size() >= FileReaderOperator.BATCH_READ_SIZE) {
-                    break;
-                }
-            }
+            bytePosition = readLines(input, pos, lines, 
FileReaderOperator.BATCH_READ_SIZE);
+            position += lines.size();
+            
TaskPositionManager.getInstance().updateSinkPosition(getJobInstanceId(), 
getReadSource(), position);
         } catch (Exception e) {
-            LOGGER.error("readFromPos error  {}", e.getMessage());
+            LOGGER.error("readFromPos error {}", e.getMessage());
         } finally {
             if (input != null) {
                 input.close();
@@ -425,67 +409,75 @@ public class FileReaderOperator extends AbstractReader {
         return lines;
     }
 
-    private int getStartBytePosition(int lineNum) throws IOException {
-        int startBytePosition = 0;
-        BufferedReader reader = null;
-        try {
-            LOGGER.info("get start line {}", lineNum);
-            String line = null;
-            List<String> lines = new ArrayList<>();
-            reader = new BufferedReader(new InputStreamReader(new 
FileInputStream(file),
-                    StandardCharsets.UTF_8));
-            int count = 0;
-            while ((line = reader.readLine()) != null) {
-                if (++count > lineNum) {
-                    LOGGER.info("get startBytePosition end at line {}", count);
+    /**
+     * Read new lines.
+     *
+     * @param reader The file to read
+     * @return The new position after the lines have been read
+     * @throws java.io.IOException if an I/O error occurs.
+     */
+    private static long readLines(RandomAccessFile reader, long pos, 
List<String> lines, int maxLineCount)
+            throws IOException {
+        if (maxLineCount == 0) {
+            return pos;
+        }
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        reader.seek(pos);
+        long rePos = pos; // position to re-read
+        int num;
+        LOGGER.debug("readLines from {}", pos);
+        boolean overLen = false;
+        while ((num = reader.read(inBuf)) != -1) {
+            int i = 0;
+            for (; i < num; i++) {
+                byte ch = inBuf[i];
+                switch (ch) {
+                    case '\n':
+                        lines.add(new String(baos.toByteArray()));
+                        rePos = pos + i + 1;
+                        if (overLen) {
+                            LOGGER.warn("readLines over len finally string len 
{}",
+                                    new String(baos.toByteArray()).length());
+                        }
+                        baos.reset();
+                        overLen = false;
+                        break;
+                    case '\r':
+                        break;
+                    default:
+                        if (baos.size() < maxPackSize) {
+                            baos.write(ch);
+                        } else {
+                            overLen = true;
+                        }
+                }
+                if (lines.size() >= maxLineCount) {
                     break;
                 }
-                startBytePosition = startBytePosition
-                        + line.getBytes(StandardCharsets.UTF_8).length + 
LINE_SEPARATOR_SIZE;
             }
-        } catch (Exception e) {
-            LOGGER.error("getStartPositon err {}", e);
-        } finally {
-            if (reader != null) {
-                reader.close();
+            if (lines.size() >= maxLineCount) {
+                break;
+            }
+            if (i == num) {
+                pos = reader.getFilePointer();
             }
         }
-        LOGGER.info("getStartPositon bytePosition {}", startBytePosition);
-        return startBytePosition;
-    }
-
-    private void isFirstStore() {
-        if (!jobConf.hasKey(JobConstants.JOB_STORE_TIME)) {
-            LOGGER.info("isFirstStore {},{}", file.getAbsolutePath(), 
this.firstStored);
-            this.firstStored = true;
-            return;
-        }
-        long jobStoreTime = 
Long.parseLong(jobConf.get(JobConstants.JOB_STORE_TIME));
-        LOGGER.info("jobStoreTime {},{}", file.getAbsolutePath(), 
jobStoreTime);
-        long storeTime = AgentConfiguration.getAgentConf().getLong(
-                AgentConstants.AGENT_JOB_STORE_TIME, 
AgentConstants.DEFAULT_JOB_STORE_TIME);
-
-        if (System.currentTimeMillis() - jobStoreTime > storeTime) {
-            this.firstStored = false;
-        } else {
-            this.firstStored = true;
-        }
-        LOGGER.info("isFirstStore {},{}", file.getAbsolutePath(), 
this.firstStored);
+        baos.close();
+        reader.seek(rePos); // Ensure we can re-read if necessary
+        return rePos;
     }
 
-    private void storeRocksDB() {
-        try (LineNumberReader lineNumberReader = new LineNumberReader(new 
FileReader(file.getPath()))) {
-            lineNumberReader.skip(Long.MAX_VALUE);
-            int seekPosition = lineNumberReader.getLineNumber();
-
-            String jobInstanceId = getJobInstanceId();
-            if (jobInstanceId != null) {
-                TaskPositionManager.getInstance().updateSinkPosition(
-                        jobInstanceId, getReadSource(), seekPosition, true);
-                LOGGER.info("storeRocksDB {},{}", file.getAbsolutePath(), 
seekPosition);
+    private boolean isFirstStore(JobProfile jobConf) {
+        boolean isFirst = true;
+        if (jobConf.hasKey(JobConstants.JOB_STORE_TIME)) {
+            long jobStoreTime = 
Long.parseLong(jobConf.get(JobConstants.JOB_STORE_TIME));
+            long storeTime = AgentConfiguration.getAgentConf().getLong(
+                    AgentConstants.AGENT_JOB_STORE_TIME, 
AgentConstants.DEFAULT_JOB_STORE_TIME);
+            if (System.currentTimeMillis() - jobStoreTime > storeTime) {
+                isFirst = false;
             }
-        } catch (IOException ex) {
-            LOGGER.error("get position error, file absolute path: {}", 
file.getAbsolutePath());
         }
+        LOGGER.info("is first store job {}, {}", file.getAbsolutePath(), 
isFirst);
+        return isFirst;
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 48c2215342..531b5d0821 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -43,9 +43,7 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INT
 public final class MonitorTextFile {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MonitorTextFile.class);
-    /**
-     * monitor thread pool
-      */
+    // monitor thread pool
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS,
@@ -136,12 +134,9 @@ public final class MonitorTextFile {
                 attributesAfter = Files.readAttributes(file.toPath(), 
BasicFileAttributes.class);
                 currentPath = file.getCanonicalPath();
 
-                if (attributesAfter.fileKey() == null) {
-                    return;
-                }
-
                 // Determine whether the inode has changed
                 if (isInodeChanged(attributesAfter.fileKey().toString())) {
+                    LOGGER.info("{} inode changed resetPosition", 
fileReaderOperator.file.toPath());
                     resetPosition();
                 }
                 fileReaderOperator.fileKey = 
attributesAfter.fileKey().toString();
@@ -154,6 +149,7 @@ public final class MonitorTextFile {
 
             // if change symbolic links
             if (attributesAfter.isSymbolicLink() && !path.equals(currentPath)) 
{
+                LOGGER.info("{} symbolicLink changed resetPosition", 
fileReaderOperator.file.toPath());
                 resetPosition();
                 path = currentPath;
             }
@@ -168,7 +164,7 @@ public final class MonitorTextFile {
         }
 
         /**
-         * Reset the position and bytePosition
+         * reset the position and bytePosition
          */
         private void resetPosition() {
             LOGGER.info("reset position {}", fileReaderOperator.file.toPath());
@@ -178,22 +174,25 @@ public final class MonitorTextFile {
             String jobInstanceId = fileReaderOperator.getJobInstanceId();
             if (jobInstanceId != null) {
                 TaskPositionManager.getInstance().updateSinkPosition(
-                        jobInstanceId, fileReaderOperator.getReadSource(), 0, 
true);
+                        jobInstanceId, fileReaderOperator.getReadSource(), 0);
             }
         }
 
         /**
          * Determine whether the inode has changed
          *
-         * @param currentFileKey current file key
-         * @return true if the inode changed, otherwise false
+         * @param currentFileKey
+         * @return
          */
         private boolean isInodeChanged(String currentFileKey) {
             if (fileReaderOperator.fileKey == null || currentFileKey == null) {
                 return false;
             }
 
-            return !fileReaderOperator.fileKey.equals(currentFileKey);
+            if (fileReaderOperator.fileKey.equals(currentFileKey)) {
+                return false;
+            }
+            return true;
         }
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 3979388358..76f5338beb 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -21,7 +21,6 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.FileTriggerType;
 import org.apache.inlong.agent.constant.MetadataConstants;
-import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Reader;
@@ -70,7 +69,8 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
 import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
 import static org.apache.inlong.agent.constant.MetadataConstants.ENV_CVM;
 
-@PowerMockIgnore({"javax.management.*", "javax.script.*", 
"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
+@PowerMockIgnore({"javax.net.ssl.*", "javax.management.*", "javax.script.*", 
"com.sun.org.apache.xerces.*",
+        "javax.xml.*", "org.xml.*",
         "org.w3c.*"})
 @PrepareForTest({MetricRegister.class})
 public class TestTextFileReader {
@@ -87,7 +87,7 @@ public class TestTextFileReader {
     }
 
     @AfterClass
-    public static void teardown() {
+    public static void teardown() throws Exception {
         helper.teardownAgentHome();
     }
 
@@ -196,11 +196,11 @@ public class TestTextFileReader {
                 break;
             }
             String content = getContent(message.toString());
+            LOGGER.info("content is {}", content);
             Assert.assertTrue(
-                    content.equalsIgnoreCase("hello ")
-                            || content.equalsIgnoreCase(" aa" + 
System.lineSeparator() + "world ")
-                            || content.equalsIgnoreCase(System.lineSeparator() 
+ "agent "));
-            LOGGER.info("message is {}", message.toString());
+                    content.equalsIgnoreCase("hello line-end-symbol aa")
+                            || content.equalsIgnoreCase("world 
line-end-symbol")
+                            || content.equalsIgnoreCase("agent 
line-end-symbol"));
         }
     }
 
@@ -240,7 +240,6 @@ public class TestTextFileReader {
 
     @Test
     public void testTextSeekReader() throws Exception {
-        final AgentManager agentManager = new AgentManager();
         Path localPath = Paths.get(testDir.toString(), "test.txt");
         LOGGER.info("start to create {}", localPath);
         List<String> beforeList = new ArrayList<>();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
index c0fec85586..4c6ff071c9 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
@@ -73,7 +73,7 @@ import static 
org.powermock.api.support.membermodification.MemberMatcher.field;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({TaskManager.class, MetricRegister.class})
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*"})
 public class TestTextFileTask {
 
     public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
@@ -92,7 +92,6 @@ public class TestTextFileTask {
         taskCache = new ArrayList<>();
         TMP_FOLDER.create();
 
-        // mock metrics
         taskManager = new TaskManager(null);
         agentMetricItemSet = mock(AgentMetricItemSet.class);
         agentMetricItem = mock(AgentMetricItem.class);
@@ -106,7 +105,7 @@ public class TestTextFileTask {
     }
 
     @AfterClass
-    public static void teardown() {
+    public static void teardown() throws Exception {
         TMP_FOLDER.delete();
     }
 


Reply via email to