This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e29b8512 [INLONG-8251][Agent] Add global memory limit for file 
collect (#8253)
32e29b8512 is described below

commit 32e29b85125e1750e6accf9331e2f4f7939b852c
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Jun 16 10:29:54 2023 +0800

    [INLONG-8251][Agent] Add global memory limit for file collect (#8253)
---
 .../inlong/agent/constant/CommonConstants.java     |   5 +-
 .../inlong/agent/constant/FetcherConstants.java    |  12 ++
 .../inlong/agent/message/PackProxyMessage.java     |  11 +-
 .../org/apache/inlong/agent/core/AgentManager.java |  14 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  12 ++
 .../apache/inlong/agent/core/job/JobManager.java   |   1 +
 .../inlong/agent/core/task/MemoryManager.java      | 115 ++++++++++++
 ...skPositionManager.java => PositionManager.java} |  91 ++++-----
 .../inlong/agent/core/task/TestMemoryManager.java  |  84 +++++++++
 .../inlong/agent/plugin/sinks/KafkaSink.java       |   6 +-
 .../inlong/agent/plugin/sinks/ProxySink.java       |  30 +--
 .../inlong/agent/plugin/sinks/PulsarSink.java      |   6 +-
 .../inlong/agent/plugin/sinks/SenderManager.java   |  81 +++-----
 .../agent/plugin/sources/TextFileSource.java       |   5 -
 .../sources/reader/file/FileReaderOperator.java    | 203 +++++++++++++--------
 .../sources/reader/file/MonitorTextFile.java       |  25 +--
 .../org/apache/inlong/agent/plugin/MiniAgent.java  |  14 +-
 .../apache/inlong/agent/plugin/TestFileAgent.java  |   2 +-
 .../agent/plugin/trigger/TestTriggerManager.java   |  12 +-
 19 files changed, 492 insertions(+), 237 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 81fbc7b202..1cd9cbf029 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -64,9 +64,6 @@ public class CommonConstants {
     // max size of single batch in bytes, default is 800KB.
     public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 800000;
 
-    public static final String PROXY_MESSAGE_SEMAPHORE = "proxy.semaphore";
-    public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 20000;
-
     public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 
"proxy.group.queue.maxNumber";
     public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 
10000;
 
@@ -74,7 +71,7 @@ public class CommonConstants {
     public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
 
     public static final String PROXY_BATCH_FLUSH_INTERVAL = 
"proxy.batch.flush.interval";
-    public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 1000;
+    public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 100;
 
     public static final String PROXY_SENDER_MAX_TIMEOUT = 
"proxy.sender.maxTimeout";
     // max timeout in seconds.
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index f45242bc1e..6b48ff1a25 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -67,4 +67,16 @@ public class FetcherConstants {
 
     public static final String AGENT_MANAGER_AUTH_SECRET_ID = 
"agent.manager.auth.secretId";
     public static final String AGENT_MANAGER_AUTH_SECRET_KEY = 
"agent.manager.auth.secretKey";
+
+    public static final String AGENT_GLOBAL_READER_SOURCE_PERMIT = 
"agent.global.reader.source.permit";
+    public static final int DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT = 16 * 
1000 * 1000;
+
+    public static final String AGENT_GLOBAL_READER_QUEUE_PERMIT = 
"agent.global.reader.queue.permit";
+    public static final int DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT = 16 * 
1000 * 1000;
+
+    public static final String AGENT_GLOBAL_CHANNEL_PERMIT = 
"agent.global.channel.permit";
+    public static final int DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT = 16 * 1000 * 
1000;
+
+    public static final String AGENT_GLOBAL_WRITER_PERMIT = 
"agent.global.writer.permit";
+    public static final int DEFAULT_AGENT_GLOBAL_WRITER_PERMIT = 96 * 1000 * 
1000;
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index 15d558748e..8e257b3972 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -145,19 +145,18 @@ public class PackProxyMessage {
                 // pre check message size
                 ProxyMessage peekMessage = messageQueue.peek();
                 int peekMessageLength = peekMessage.getBody().length;
+                if (resultBatchSize + peekMessageLength > maxPackSize) {
+                    break;
+                }
+                ProxyMessage message = messageQueue.remove();
+                int bodySize = message.getBody().length;
                 if (peekMessageLength > maxPackSize) {
                     LOGGER.warn("message size is {}, greater than max pack 
size {}, drop it!",
                             peekMessage.getBody().length, maxPackSize);
-                    int bodySize = peekMessage.getBody().length;
                     queueSize.addAndGet(-bodySize);
                     messageQueue.remove();
                     break;
                 }
-                if (resultBatchSize + peekMessageLength > maxPackSize) {
-                    break;
-                }
-                ProxyMessage message = messageQueue.remove();
-                int bodySize = message.getBody().length;
                 resultBatchSize += bodySize;
                 // decrease queue size.
                 queueSize.addAndGet(-bodySize);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index d7b921f355..11f9ce793e 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -25,8 +25,8 @@ import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.conf.ConfigJetty;
 import org.apache.inlong.agent.core.job.JobManager;
+import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
 import org.apache.inlong.agent.core.trigger.TriggerManager;
 import org.apache.inlong.agent.db.CommandDb;
 import org.apache.inlong.agent.db.Db;
@@ -57,7 +57,7 @@ public class AgentManager extends AbstractDaemon {
     private final JobManager jobManager;
     private final TaskManager taskManager;
     private final TriggerManager triggerManager;
-    private final TaskPositionManager taskPositionManager;
+    private final PositionManager positionManager;
     private final HeartbeatManager heartbeatManager;
     private final ProfileFetcher fetcher;
     private final AgentConfiguration conf;
@@ -82,7 +82,7 @@ public class AgentManager extends AbstractDaemon {
         taskManager = new TaskManager(this);
         fetcher = initFetcher(this);
         heartbeatManager = HeartbeatManager.getInstance(this);
-        taskPositionManager = TaskPositionManager.getInstance(this);
+        positionManager = PositionManager.getInstance(this);
         // need to be an option.
         if (conf.getBoolean(
                 AgentConstants.AGENT_ENABLE_HTTP, 
AgentConstants.DEFAULT_AGENT_ENABLE_HTTP)) {
@@ -174,8 +174,8 @@ public class AgentManager extends AbstractDaemon {
         return triggerManager;
     }
 
-    public TaskPositionManager getTaskPositionManager() {
-        return taskPositionManager;
+    public PositionManager getTaskPositionManager() {
+        return positionManager;
     }
 
     public TaskManager getTaskManager() {
@@ -206,7 +206,7 @@ public class AgentManager extends AbstractDaemon {
         LOGGER.info("starting heartbeat manager");
         heartbeatManager.start();
         LOGGER.info("starting task position manager");
-        taskPositionManager.start();
+        positionManager.start();
         LOGGER.info("starting read job from local");
         // read job profiles from local
         List<JobProfile> profileList = localProfile.readFromLocal();
@@ -249,7 +249,7 @@ public class AgentManager extends AbstractDaemon {
         jobManager.stop();
         taskManager.stop();
         heartbeatManager.stop();
-        taskPositionManager.stop();
+        positionManager.stop();
         agentConfMonitor.shutdown();
         this.db.close();
     }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 9214de04e8..b945969d6f 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.core.job.Job;
 import org.apache.inlong.agent.core.job.JobManager;
 import org.apache.inlong.agent.core.job.JobWrapper;
+import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.HttpManager;
@@ -69,6 +70,7 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
 public class HeartbeatManager extends AbstractDaemon implements 
AbstractHeartbeatManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatManager.class);
+    public static final int PRINT_MEMORY_PERMIT_INTERVAL_SECOND = 60;
     private static HeartbeatManager heartbeatManager = null;
     private final JobManager jobmanager;
     private final AgentConfiguration conf;
@@ -122,6 +124,16 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
     public void start() throws Exception {
         submitWorker(snapshotReportThread());
         submitWorker(heartbeatReportThread());
+        submitWorker(printMemoryPermitThread());
+    }
+
+    private Runnable printMemoryPermitThread() {
+        return () -> {
+            while (isRunnable()) {
+                MemoryManager.getInstance().printAll();
+                
AgentUtils.silenceSleepInSeconds(PRINT_MEMORY_PERMIT_INTERVAL_SECOND);
+            }
+        };
     }
 
     private Runnable snapshotReportThread() {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 576d5682b6..c1e933d8ae 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -108,6 +108,7 @@ public class JobManager extends AbstractDaemon {
         this.dimensions = new HashMap<>();
         this.dimensions.put(KEY_COMPONENT_NAME, 
this.getClass().getSimpleName());
         this.jobMetrics = new 
AgentMetricItemSet(this.getClass().getSimpleName());
+        MetricRegister.unregister(jobMetrics);
         MetricRegister.register(jobMetrics);
     }
 
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
new file mode 100644
index 0000000000..d67a15fb5a
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
+
+/**
+ * used to limit global memory to avoid oom
+ */
+public class MemoryManager {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryManager.class);
+    private static volatile MemoryManager memoryManager = null;
+    private final AgentConfiguration conf;
+    private ConcurrentHashMap<String, Semaphore> semaphoreMap = new 
ConcurrentHashMap<>();
+
+    private MemoryManager() {
+        this.conf = AgentConfiguration.getAgentConf();
+        Semaphore semaphore = null;
+        semaphore = new Semaphore(
+                conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
+        semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
+
+        semaphore = new Semaphore(
+                conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
+        semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
+
+        semaphore = new Semaphore(
+                conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, 
DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT));
+        semaphoreMap.put(AGENT_GLOBAL_CHANNEL_PERMIT, semaphore);
+
+        semaphore = new Semaphore(
+                conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, 
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
+        semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
+    }
+
+    /**
+     * manager singleton
+     */
+    public static MemoryManager getInstance() {
+        if (memoryManager == null) {
+            synchronized (MemoryManager.class) {
+                if (memoryManager == null) {
+                    memoryManager = new MemoryManager();
+                }
+            }
+        }
+        return memoryManager;
+    }
+
+    public boolean tryAcquire(String semaphoreName, int permit) {
+        Semaphore semaphore = semaphoreMap.get(semaphoreName);
+        if (semaphore == null) {
+            LOGGER.error("tryAcquire {} not exist");
+            return false;
+        }
+        return semaphore.tryAcquire(permit);
+    }
+
+    public void release(String semaphoreName, int permit) {
+        Semaphore semaphore = semaphoreMap.get(semaphoreName);
+        if (semaphore == null) {
+            LOGGER.error("release {} not exist");
+            return;
+        }
+        semaphore.release(permit);
+    }
+
+    public void printDetail(String semaphoreName) {
+        Semaphore semaphore = semaphoreMap.get(semaphoreName);
+        if (semaphore == null) {
+            LOGGER.error("printDetail {} not exist");
+            return;
+        }
+        LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), 
semaphore.getQueueLength(),
+                semaphoreName);
+    }
+
+    public void printAll() {
+        printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
+        printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+        printDetail(AGENT_GLOBAL_CHANNEL_PERMIT);
+        printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+    }
+}
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
similarity index 65%
rename from 
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
rename to 
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
index 85f94710d3..48f6525d01 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
@@ -39,16 +39,16 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FE
  * where key is task read file name and value is task sink position
  * note that this class is generated
  */
-public class TaskPositionManager extends AbstractDaemon {
+public class PositionManager extends AbstractDaemon {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskPositionManager.class);
-    private static volatile TaskPositionManager taskPositionManager = null;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PositionManager.class);
+    private static volatile PositionManager positionManager = null;
     private final AgentManager agentManager;
     private final JobProfileDb jobConfDb;
     private final AgentConfiguration conf;
     private ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> 
jobTaskPositionMap;
 
-    private TaskPositionManager(AgentManager agentManager) {
+    private PositionManager(AgentManager agentManager) {
         this.conf = AgentConfiguration.getAgentConf();
         this.agentManager = agentManager;
         this.jobConfDb = agentManager.getJobManager().getJobConfDb();
@@ -58,25 +58,25 @@ public class TaskPositionManager extends AbstractDaemon {
     /**
      * task position manager singleton, can only generated by agent manager
      */
-    public static TaskPositionManager getInstance(AgentManager agentManager) {
-        if (taskPositionManager == null) {
-            synchronized (TaskPositionManager.class) {
-                if (taskPositionManager == null) {
-                    taskPositionManager = new 
TaskPositionManager(agentManager);
+    public static PositionManager getInstance(AgentManager agentManager) {
+        if (positionManager == null) {
+            synchronized (PositionManager.class) {
+                if (positionManager == null) {
+                    positionManager = new PositionManager(agentManager);
                 }
             }
         }
-        return taskPositionManager;
+        return positionManager;
     }
 
     /**
      * get taskPositionManager singleton
      */
-    public static TaskPositionManager getInstance() {
-        if (taskPositionManager == null) {
+    public static PositionManager getInstance() {
+        if (positionManager == null) {
             throw new RuntimeException("task position manager has not been 
initialized by agentManager");
         }
-        return taskPositionManager;
+        return positionManager;
     }
 
     @Override
@@ -87,30 +87,34 @@ public class TaskPositionManager extends AbstractDaemon {
     private Runnable taskPositionFlushThread() {
         return () -> {
             while (isRunnable()) {
-                try {
-                    // check pending jobs and try to submit again.
-                    for (String jobId : jobTaskPositionMap.keySet()) {
-                        JobProfile jobProfile = jobConfDb.getJobById(jobId);
-                        if (jobProfile == null) {
-                            LOGGER.warn("jobProfile {} cannot be found in db, "
-                                    + "might be deleted by standalone mode, 
now delete job position in memory", jobId);
-                            deleteJobPosition(jobId);
-                            continue;
-                        }
-                        flushJobProfile(jobId, jobProfile);
-                    }
-                } catch (Throwable ex) {
-                    LOGGER.error("error caught", ex);
-                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
-                } finally {
-                    int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
-                            DEFAULT_AGENT_FETCHER_INTERVAL);
-                    AgentUtils.silenceSleepInSeconds(flushTime);
-                }
+                doFlush();
             }
         };
     }
 
+    private void doFlush() {
+        try {
+            // check pending jobs and try to submit again.
+            for (String jobId : jobTaskPositionMap.keySet()) {
+                JobProfile jobProfile = jobConfDb.getJobById(jobId);
+                if (jobProfile == null) {
+                    LOGGER.warn("jobProfile {} cannot be found in db, "
+                            + "might be deleted by standalone mode, now delete 
job position in memory", jobId);
+                    deleteJobPosition(jobId);
+                    continue;
+                }
+                flushJobProfile(jobId, jobProfile);
+            }
+        } catch (Throwable ex) {
+            LOGGER.error("error caught", ex);
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+        } finally {
+            int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
+                    DEFAULT_AGENT_FETCHER_INTERVAL);
+            AgentUtils.silenceSleepInSeconds(flushTime);
+        }
+    }
+
     private void flushJobProfile(String jobId, JobProfile jobProfile) {
         jobTaskPositionMap.get(jobId).forEach(
                 (fileName, position) -> jobProfile.setLong(fileName + 
POSITION_SUFFIX, position));
@@ -134,17 +138,22 @@ public class TaskPositionManager extends AbstractDaemon {
     /**
      * update job sink position
      *
-     * @param newPosition
+     * @param size add this size to beforePosition
      */
-    public void updateSinkPosition(String jobInstanceId, String sourcePath, 
long newPosition) {
-        LOGGER.info("updateSinkPosition jobInstanceId {} sourcePath {} 
newPosition {}", jobInstanceId, sourcePath,
-                newPosition);
+    public void updateSinkPosition(String jobInstanceId, String sourcePath, 
long size, boolean reset) {
         ConcurrentHashMap<String, Long> positionTemp = new 
ConcurrentHashMap<>();
-        ConcurrentHashMap<String, Long> lastPosition = 
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
-        if (lastPosition == null) {
-            positionTemp.put(sourcePath, newPosition);
+        ConcurrentHashMap<String, Long> position = 
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
+        if (position == null) {
+            JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+            positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + 
POSITION_SUFFIX, 0));
+            position = positionTemp;
+        }
+
+        if (!reset) {
+            Long beforePosition = position.getOrDefault(sourcePath, 0L);
+            position.put(sourcePath, beforePosition + size);
         } else {
-            lastPosition.put(sourcePath, newPosition);
+            position.put(sourcePath, size);
         }
     }
 
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
new file mode 100644
index 0000000000..9118230115
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
+
+public class TestMemoryManager {
+
+    private static AgentConfiguration conf;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        conf = AgentConfiguration.getAgentConf();
+    }
+
+    @Test
+    public void testAll() {
+        int sourcePermit = conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT);
+        int readerQueuePermit = conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
+        int channelPermit = conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, 
DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT);
+        int writerPermit = conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, 
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT);
+
+        boolean suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
sourcePermit);
+        Assert.assertTrue(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
readerQueuePermit);
+        Assert.assertTrue(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 
channelPermit);
+        Assert.assertTrue(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
writerPermit);
+        Assert.assertTrue(suc);
+
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 1);
+        Assert.assertFalse(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 1);
+        Assert.assertFalse(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 1);
+        Assert.assertFalse(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 1);
+        Assert.assertFalse(suc);
+
+        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
sourcePermit);
+        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
readerQueuePermit);
+        MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, 
channelPermit);
+        MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
writerPermit);
+
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
sourcePermit);
+        Assert.assertTrue(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
readerQueuePermit);
+        Assert.assertTrue(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 
channelPermit);
+        Assert.assertTrue(suc);
+        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
writerPermit);
+        Assert.assertTrue(suc);
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
index 22f96d8eaf..1e9029e334 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.sinks;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.message.PackProxyMessage;
@@ -76,7 +76,7 @@ public class KafkaSink extends AbstractSink {
     private static final ExecutorService EXECUTOR_SERVICE = new 
ThreadPoolExecutor(0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new 
AgentThreadFactory("KafkaSink"));
     private final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
-    private TaskPositionManager taskPositionManager;
+    private PositionManager taskPositionManager;
     private volatile boolean shutdown = false;
 
     private List<MQClusterInfo> mqClusterInfos;
@@ -92,7 +92,7 @@ public class KafkaSink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        taskPositionManager = TaskPositionManager.getInstance();
+        taskPositionManager = PositionManager.getInstance();
         int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, 
DEFAULT_SEND_QUEUE_SIZE);
         kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
         producerNum = agentConf.getInt(KAFKA_SINK_PRODUCER_NUM, 
DEFAULT_PRODUCER_NUM);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 248796a9ec..7bf723e965 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.sinks;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.message.PackProxyMessage;
@@ -43,6 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
 
 /**
  * sink message data to inlong-dataproxy
@@ -68,12 +71,6 @@ public class ProxySink extends AbstractSink {
         if (message == null) {
             return;
         }
-        // if the message size is greater than max pack size,should drop it.
-        if (message.getBody().length > maxPackSize) {
-            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
-                    message.getBody().length, maxPackSize);
-            return;
-        }
         boolean suc = false;
         while (!suc) {
             suc = putInCache(message);
@@ -98,6 +95,13 @@ public class ProxySink extends AbstractSink {
             }
             AtomicBoolean suc = new AtomicBoolean(false);
             ProxyMessage proxyMessage = new ProxyMessage(message);
+            boolean writerPermitSuc = MemoryManager.getInstance()
+                    .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
+            if (!writerPermitSuc) {
+                LOGGER.warn("writer tryAcquire failed");
+                
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+                return false;
+            }
             // add proxy message to cache.
             cache.compute(proxyMessage.getBatchKey(),
                     (s, packProxyMessage) -> {
@@ -111,11 +115,11 @@ public class ProxySink extends AbstractSink {
                         return packProxyMessage;
                     });
             if (suc.get()) {
-                // semaphore should be acquired only when the message was put 
in cache successfully
-                senderManager.acquireSemaphore(1);
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, 
message.getBody().length);
                 // increment the count of successful sinks
                 sinkMetric.sinkSuccessCount.incrementAndGet();
             } else {
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
                 // increment the count of failed sinks
                 sinkMetric.sinkFailCount.incrementAndGet();
             }
@@ -147,7 +151,7 @@ public class ProxySink extends AbstractSink {
      */
     private Runnable flushCache() {
         return () -> {
-            LOGGER.info("start flush cache thread for {} ProxySink", 
inlongGroupId);
+            LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
             while (!shutdown) {
                 try {
                     cache.forEach((batchKey, packProxyMessage) -> {
@@ -159,7 +163,6 @@ public class ProxySink extends AbstractSink {
                                     batchProxyMessage.getDataList().size(), 
jobInstanceId, sourceName,
                                     batchProxyMessage.getDataTime());
                         }
-
                     });
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
@@ -169,6 +172,7 @@ public class ProxySink extends AbstractSink {
                     AgentUtils.silenceSleepInMs(batchFlushInterval);
                 }
             }
+            LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
         };
     }
 
@@ -182,7 +186,6 @@ public class ProxySink extends AbstractSink {
         executorService.execute(flushCache());
         senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
         try {
-            senderManager.addMessageSender();
             senderManager.Start();
         } catch (Throwable ex) {
             LOGGER.error("error while init sender for group id {}", 
inlongGroupId);
@@ -193,14 +196,15 @@ public class ProxySink extends AbstractSink {
 
     @Override
     public void destroy() {
-        LOGGER.info("destroy sink which sink from source name {}", sourceName);
+        LOGGER.info("destroy sink source name {}", sourceName);
         while (!sinkFinish()) {
-            LOGGER.info("job {} wait until cache all flushed to proxy", 
jobInstanceId);
+            LOGGER.info("sourceName {} wait until cache all flushed to proxy", 
sourceName);
             AgentUtils.silenceSleepInMs(batchFlushInterval);
         }
         shutdown = true;
         executorService.shutdown();
         senderManager.Stop();
+        LOGGER.info("destroy sink source name {} end", sourceName);
     }
 
     /**
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index d2801e4b6d..348cd1c4b7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.sinks;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.message.PackProxyMessage;
@@ -91,7 +91,7 @@ public class PulsarSink extends AbstractSink {
     private static final ExecutorService EXECUTOR_SERVICE = new 
ThreadPoolExecutor(0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new 
AgentThreadFactory("PulsarSink"));
     private final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
-    private TaskPositionManager taskPositionManager;
+    private PositionManager positionManager;
     private volatile boolean shutdown = false;
     private List<MQClusterInfo> mqClusterInfos;
     private String topic;
@@ -119,7 +119,7 @@ public class PulsarSink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        taskPositionManager = TaskPositionManager.getInstance();
+        positionManager = PositionManager.getInstance();
         // agentConf
         sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, 
DEFAULT_SEND_QUEUE_SIZE);
         sendQueueSemaphore = new Semaphore(sendQueueSize);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 615f4700bd..3447eb0058 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -21,7 +21,8 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -40,14 +41,10 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -56,6 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
@@ -73,18 +71,16 @@ public class SenderManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderManager.class);
     private static final SequentialID SEQUENTIAL_ID = 
SequentialID.getInstance();
-    private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
+    private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
     // cache for group and sender list, share the map cross agent lifecycle.
-    private static final ConcurrentHashMap<String, List<DefaultMessageSender>> 
SENDER_MAP =
-            new ConcurrentHashMap<>();
+    private DefaultMessageSender sender;
     private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
     private final ExecutorService resendExecutorService = new 
ThreadPoolExecutor(1, 1,
             0L, TimeUnit.MILLISECONDS,
             new LinkedBlockingQueue<>(), new 
AgentThreadFactory("SendManager-Resend"));
     // sharing worker threads between sender client
     // in case of thread abusing.
-    private static final ThreadFactory SHARED_FACTORY = new 
DefaultThreadFactory("agent-client-io",
-            Thread.currentThread().isDaemon());
+    private ThreadFactory SHARED_FACTORY;
     private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
     private final String managerHost;
     private final int managerPort;
@@ -104,14 +100,12 @@ public class SenderManager {
     private final String sourcePath;
     private final boolean proxySend;
     private volatile boolean shutdown = false;
-
     // metric
     private AgentMetricItemSet metricItemSet;
     private Map<String, String> dimensions;
-    private TaskPositionManager taskPositionManager;
+    private PositionManager positionManager;
     private int ioThreadNum;
     private boolean enableBusyWait;
-    private Semaphore semaphore;
     private String authSecretId;
     private String authSecretKey;
     protected int batchFlushInterval;
@@ -144,9 +138,7 @@ public class SenderManager {
         retrySleepTime = jobConf.getLong(
                 CommonConstants.PROXY_RETRY_SLEEP, 
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
         isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, 
CommonConstants.DEFAULT_IS_FILE);
-        taskPositionManager = TaskPositionManager.getInstance();
-        semaphore = new 
Semaphore(jobConf.getInt(CommonConstants.PROXY_MESSAGE_SEMAPHORE,
-                CommonConstants.DEFAULT_PROXY_MESSAGE_SEMAPHORE));
+        positionManager = PositionManager.getInstance();
         ioThreadNum = 
jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
                 CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
         enableBusyWait = 
jobConf.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -167,13 +159,15 @@ public class SenderManager {
         resendQueue = new LinkedBlockingQueue<>();
     }
 
-    public void Start() {
+    public void Start() throws Exception {
+        sender = createMessageSender(inlongGroupId);
         resendExecutorService.execute(flushResendQueue());
     }
 
     public void Stop() {
         shutdown = true;
         resendExecutorService.shutdown();
+        sender.close();
     }
 
     private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) 
{
@@ -190,26 +184,6 @@ public class SenderManager {
         return getMetricItem(dims);
     }
 
-    /**
-     * Select by group.
-     *
-     * @param group inlong group id
-     * @return default message sender
-     */
-    private DefaultMessageSender selectSender(String group) {
-        List<DefaultMessageSender> senderList = SENDER_MAP.get(group);
-        return senderList.get((SENDER_INDEX.getAndIncrement() & 0x7FFFFFFF) % 
senderList.size());
-    }
-
-    public void acquireSemaphore(int messageNum) {
-        try {
-            semaphore.acquire(messageNum);
-        } catch (Exception e) {
-            LOGGER.error("acquire messageNum {} fail, current semaphore {}",
-                    messageNum, semaphore.availablePermits());
-        }
-    }
-
     /**
      * sender
      *
@@ -228,28 +202,15 @@ public class SenderManager {
         proxyClientConfig.setEnableBusyWait(enableBusyWait);
         proxyClientConfig.setProtocolType(ProtocolType.TCP);
 
+        SHARED_FACTORY = new DefaultThreadFactory("agent-client-" + sourcePath,
+                Thread.currentThread().isDaemon());
+
         DefaultMessageSender sender = new 
DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
         sender.setMsgtype(msgType);
         sender.setCompress(isCompress);
         return sender;
     }
 
-    /**
-     * Add new sender for group id if max size is not satisfied.
-     */
-    public void addMessageSender() throws Exception {
-        List<DefaultMessageSender> tmpList = new ArrayList<>();
-        List<DefaultMessageSender> senderList = 
SENDER_MAP.putIfAbsent(inlongGroupId, tmpList);
-        if (senderList == null) {
-            senderList = tmpList;
-        }
-        if (senderList.size() > maxSenderPerGroup) {
-            return;
-        }
-        DefaultMessageSender sender = createMessageSender(inlongGroupId);
-        senderList.add(sender);
-    }
-
     public void sendBatch(BatchProxyMessage batchMessage) {
         sendBatchWithRetryCount(batchMessage, 0);
     }
@@ -261,7 +222,7 @@ public class SenderManager {
         boolean suc = false;
         while (!suc) {
             try {
-                selectSender(batchMessage.getGroupId()).asyncSendMessage(new 
AgentSenderCallback(batchMessage, retry),
+                sender.asyncSendMessage(new AgentSenderCallback(batchMessage, 
retry),
                         batchMessage.getDataList(), batchMessage.getGroupId(), 
batchMessage.getStreamId(),
                         batchMessage.getDataTime(), 
SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,
                         batchMessage.getExtraMap(), proxySend);
@@ -271,13 +232,14 @@ public class SenderManager {
             } catch (Exception exception) {
                 suc = false;
                 if (retry > maxSenderRetry) {
-                    LOGGER.warn("max retry reached, retry count is {}, sleep 
and send again", retry);
+                    if (retry % 10 == 0) {
+                        LOGGER.error("max retry reached, sample log Exception 
caught", exception);
+                    }
                 } else {
                     LOGGER.error("Exception caught", exception);
                 }
                 retry++;
                 AgentUtils.silenceSleepInMs(retrySleepTime);
-
             }
         }
     }
@@ -289,7 +251,7 @@ public class SenderManager {
      */
     private Runnable flushResendQueue() {
         return () -> {
-            LOGGER.info("start flush cache thread for {} ProxySink", 
inlongGroupId);
+            LOGGER.info("start flush resend queue {}:{}", inlongGroupId, 
sourcePath);
             while (!shutdown) {
                 try {
                     AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
@@ -304,6 +266,7 @@ public class SenderManager {
                     AgentUtils.silenceSleepInMs(batchFlushInterval);
                 }
             }
+            LOGGER.info("stop flush resend queue {}:{}", inlongGroupId, 
sourcePath);
         };
     }
 
@@ -342,10 +305,12 @@ public class SenderManager {
             String jobId = batchMessage.getJobId();
             long dataTime = batchMessage.getDataTime();
             if (result != null && result.equals(SendResult.OK)) {
-                semaphore.release(msgCnt);
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) 
batchMessage.getTotalSize());
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId, dataTime, msgCnt,
                         batchMessage.getTotalSize());
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+                PositionManager.getInstance()
+                        .updateSinkPosition(batchMessage.getJobId(), 
sourcePath, msgCnt, false);
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime 
{} fail with times {}, "
                         + "error {}", groupId, streamId, jobId, dataTime, 
retry, result);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index f288ae7b3a..d680d49f0b 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -70,7 +70,6 @@ public class TextFileSource extends AbstractSource {
             FileReaderOperator fileReader = new FileReaderOperator(file, 
startPosition);
             long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, 
DEFAULT_JOB_READ_WAIT_TIMEOUT);
             fileReader.setWaitMillisecond(waitTimeout);
-            addValidator(filterPattern, fileReader);
             result.add(fileReader);
         }
         // increment the count of successful sources
@@ -83,8 +82,4 @@ public class TextFileSource extends AbstractSource {
         seekPosition = jobConf.getInt(file.getAbsolutePath() + 
POSITION_SUFFIX, 0);
         return seekPosition;
     }
-
-    private void addValidator(String filterPattern, FileReaderOperator 
fileReader) {
-        fileReader.addPatternValidator(filterPattern);
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 82c3a40592..bcd4b59272 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -22,15 +22,14 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.JobConstants;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Validator;
 import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
 import org.apache.inlong.agent.plugin.utils.FileDataUtils;
-import org.apache.inlong.agent.plugin.validator.PatternValidator;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import com.google.gson.Gson;
@@ -54,7 +53,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +62,9 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
 import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
@@ -85,11 +86,12 @@ public class FileReaderOperator extends AbstractReader {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileReaderOperator.class);
 
     public static final int NEVER_STOP_SIGN = -1;
-    public static final int BATCH_READ_SIZE = 10000;
-    public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_SIZE;
-    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-    private static final SimpleDateFormat RECORD_TIME_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-    private static final Gson GSON = new Gson();
+    public static final int BATCH_READ_LINE_COUNT = 10000;
+    public static final int BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
+    public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
+    public static int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private final SimpleDateFormat RECORD_TIME_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+    private final Gson GSON = new Gson();
 
     public File file;
     public long position = 0;
@@ -104,11 +106,11 @@ public class FileReaderOperator extends AbstractReader {
     public String fileKey = null;
     private long timeout;
     private long waitTimeout;
+    public volatile long monitorUpdateTime;
     private long lastTime = 0;
-    private List<Validator> validators = new ArrayList<>();
-    private static final byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE];
-    private static int maxPackSize;
-
+    private final byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE];
+    private int maxPackSize;
+    private final long monitorActiveInterval = 60 * 1000;
     private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
     private final StringBuffer sb = new StringBuffer();
 
@@ -124,31 +126,67 @@ public class FileReaderOperator extends AbstractReader {
         this.metadata = new HashMap<>();
     }
 
-    public FileReaderOperator(File file) {
-        this(file, 0);
-    }
-
     @Override
     public Message read() {
         String data = null;
         try {
             data = queue.poll(DEFAULT_JOB_READ_WAIT_TIMEOUT, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            LOGGER.warn("poll {} data get interruptted.", file.getPath(), e);
+            LOGGER.warn("poll {} data get interrupted.", file.getPath(), e);
+        }
+        if (data == null) {
+            keepMonitorActive();
+            return null;
+        } else {
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
data.length());
+        }
+        Message finalMsg = createMessage(data);
+        if (finalMsg == null) {
+            return null;
         }
-        return Optional.ofNullable(data)
-                .map(this::metadataMessage)
-                .filter(this::filterMessage)
-                .map(message -> {
-                    AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
inlongGroupId, inlongStreamId,
-                            System.currentTimeMillis(), 1, message.length());
-                    readerMetric.pluginReadSuccessCount.incrementAndGet();
-                    readerMetric.pluginReadCount.incrementAndGet();
-                    String proxyPartitionKey = 
jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
-                    Map<String, String> header = new HashMap<>();
-                    header.put(PROXY_KEY_DATA, proxyPartitionKey);
-                    return new 
DefaultMessage(message.getBytes(StandardCharsets.UTF_8), header);
-                }).orElse(null);
+        boolean channelPermit = MemoryManager.getInstance()
+                .tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 
finalMsg.getBody().length);
+        if (channelPermit == false) {
+            LOGGER.warn("channel tryAcquire failed");
+            
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_CHANNEL_PERMIT);
+            AgentUtils.silenceSleepInSeconds(1);
+            return null;
+        }
+        return finalMsg;
+    }
+
+    private Message createMessage(String data) {
+        String msgWithMetaData = fillMetaData(data);
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
inlongStreamId,
+                System.currentTimeMillis(), 1, msgWithMetaData.length());
+        readerMetric.pluginReadSuccessCount.incrementAndGet();
+        readerMetric.pluginReadCount.incrementAndGet();
+        String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
+        Map<String, String> header = new HashMap<>();
+        header.put(PROXY_KEY_DATA, proxyPartitionKey);
+        Message finalMsg = new 
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
+        // if the message size is greater than max pack size,should drop it.
+        if (finalMsg.getBody().length > maxPackSize) {
+            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
+                    finalMsg.getBody().length, maxPackSize);
+            return null;
+        }
+        return finalMsg;
+    }
+
+    public void keepMonitorActive() {
+        if (!isMonitorActive()) {
+            LOGGER.error("monitor not active, create a new one");
+            MonitorTextFile.getInstance().monitor(this);
+        }
+    }
+
+    private boolean isMonitorActive() {
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - monitorUpdateTime > monitorActiveInterval) {
+            return false;
+        }
+        return true;
     }
 
     @Override
@@ -207,13 +245,6 @@ public class FileReaderOperator extends AbstractReader {
         return true;
     }
 
-    public void addPatternValidator(String pattern) {
-        if (pattern.isEmpty()) {
-            return;
-        }
-        validators.add(new PatternValidator(pattern));
-    }
-
     @Override
     public void init(JobProfile jobConf) {
         try {
@@ -228,22 +259,23 @@ public class FileReaderOperator extends AbstractReader {
                 LOGGER.warn("md5 is differ from origin, origin: {}, new {}", 
this.md5, md5);
             }
             LOGGER.info("file name for task is {}, md5 is {}", file, md5);
-
+            monitorUpdateTime = System.currentTimeMillis();
             MonitorTextFile.getInstance().monitor(this);
             if (!jobConf.get(JOB_FILE_MONITOR_STATUS, 
JOB_FILE_MONITOR_DEFAULT_STATUS)
                     .equals(JOB_FILE_MONITOR_DEFAULT_STATUS)) {
                 readEndpoint = Files.lines(file.toPath()).count();
             }
             try {
-                position = 
TaskPositionManager.getInstance().getPosition(getReadSource(), instanceId);
+                position = 
PositionManager.getInstance().getPosition(getReadSource(), instanceId);
             } catch (Exception ex) {
                 position = 0;
                 LOGGER.error("get position from position manager error, only 
occur in ut: {}", ex.getMessage());
             }
             this.bytePosition = getStartBytePosition(position);
-            LOGGER.info("FileReaderOperator init file {} instanceId {} history 
position {}", getReadSource(),
+            LOGGER.info("FileReaderOperator init file {} instanceId {} history 
position {} readEndpoint {}",
+                    getReadSource(),
                     instanceId,
-                    position);
+                    position, readEndpoint);
             if (isIncrement(jobConf)) {
                 LOGGER.info("FileReaderOperator DataCollectType INCREMENT: 
start bytePosition {},{}",
                         file.length(), file.getAbsolutePath());
@@ -251,8 +283,8 @@ public class FileReaderOperator extends AbstractReader {
                 try (LineNumberReader lineNumberReader = new 
LineNumberReader(new FileReader(file.getPath()))) {
                     lineNumberReader.skip(Long.MAX_VALUE);
                     position = lineNumberReader.getLineNumber();
-                    TaskPositionManager.getInstance().updateSinkPosition(
-                            getJobInstanceId(), getReadSource(), position);
+                    PositionManager.getInstance().updateSinkPosition(
+                            getJobInstanceId(), getReadSource(), position, 
true);
                     LOGGER.info("for increment update {}, position to {}", 
file.getAbsolutePath(), position);
 
                 } catch (IOException ex) {
@@ -260,7 +292,7 @@ public class FileReaderOperator extends AbstractReader {
                 }
             }
             try {
-                resiterMeta(jobConf);
+                registerMeta(jobConf);
             } catch (Exception ex) {
                 LOGGER.error("init metadata error", ex);
             }
@@ -278,8 +310,8 @@ public class FileReaderOperator extends AbstractReader {
             input = new RandomAccessFile(file, "r");
             while (readCount < lineNum) {
                 List<String> lines = new ArrayList<>();
-                pos = readLines(input, pos, lines,
-                        Math.min((int) (lineNum - readCount), 
FileReaderOperator.BATCH_READ_SIZE));
+                pos = readLines(input, pos, lines, Math.min((int) (lineNum - 
readCount), BATCH_READ_LINE_COUNT),
+                        BATCH_READ_LINE_TOTAL_LEN, true);
                 readCount += lines.size();
                 if (lines.size() == 0) {
                     LOGGER.error("getStartBytePosition LineNum {} larger than 
the real file");
@@ -293,7 +325,7 @@ public class FileReaderOperator extends AbstractReader {
                 input.close();
             }
         }
-        LOGGER.info("getStartBytePosition LineNum {} position {}", lineNum, 
pos);
+        LOGGER.info("getStartBytePosition {} LineNum {} position {}", 
getReadSource(), lineNum, pos);
         return pos;
     }
 
@@ -319,23 +351,26 @@ public class FileReaderOperator extends AbstractReader {
 
     @Override
     public void destroy() {
+        LOGGER.info("destroy read source name {}", getReadSource());
         finished = true;
+        while (!queue.isEmpty()) {
+            String data = null;
+            try {
+                data = queue.poll(DEFAULT_JOB_READ_WAIT_TIMEOUT, 
TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOGGER.warn("poll {} data get interrupted.", file.getPath(), 
e);
+            }
+            if (data != null) {
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
data.length());
+            }
+        }
         queue.clear();
+        LOGGER.info("destroy read source name {} end", getReadSource());
         LOGGER.info("destroy reader with read {} num {}",
                 metricName, readerMetric == null ? 0 : 
readerMetric.pluginReadCount.get());
     }
 
-    public boolean filterMessage(String message) {
-        if (StringUtils.isBlank(message)) {
-            return false;
-        }
-        if (validators.isEmpty()) {
-            return true;
-        }
-        return validators.stream().allMatch(v -> v.validate(message));
-    }
-
-    public String metadataMessage(String message) {
+    public String fillMetaData(String message) {
         long timestamp = System.currentTimeMillis();
         boolean isJson = FileDataUtils.isJSON(message);
         Map<String, String> mergeData = new HashMap<>(metadata);
@@ -348,7 +383,7 @@ public class FileReaderOperator extends AbstractReader {
         return !queue.isEmpty();
     }
 
-    public void resiterMeta(JobProfile jobConf) {
+    public void registerMeta(JobProfile jobConf) {
         if (!jobConf.hasKey(JOB_FILE_META_ENV_LIST)) {
             return;
         }
@@ -365,10 +400,16 @@ public class FileReaderOperator extends AbstractReader {
     }
 
     public void fetchData() throws IOException {
-        // todo: TaskPositionManager stored position should be changed to byte 
position.Now it store msg sent, so here
-        // every line (include empty line) should be sent, otherwise the read 
position will be offset when
-        // restarting and recovering. In the same time, Regex end line 
spiltted line also has this problem, because
-        // recovering is based on line position.
+        boolean readFromPosPermit = false;
+        while (readFromPosPermit == false) {
+            readFromPosPermit = MemoryManager.getInstance()
+                    .tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
+            if (readFromPosPermit == false) {
+                LOGGER.warn("fetchData tryAcquire failed");
+                
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
+                AgentUtils.silenceSleepInSeconds(1);
+            }
+        }
         List<String> lines = readFromPos(bytePosition);
         if (!lines.isEmpty()) {
             LOGGER.info("path is {}, line is {}, byte position is {}, reads 
data lines {}",
@@ -376,8 +417,17 @@ public class FileReaderOperator extends AbstractReader {
         }
         List<String> resultLines = lines;
         resultLines.forEach(line -> {
+            boolean offerPermit = false;
+            while (offerPermit != true) {
+                offerPermit = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
line.length());
+                if (offerPermit != true) {
+                    LOGGER.warn("offerPermit tryAcquire failed");
+                    
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+                    AgentUtils.silenceSleepInSeconds(1);
+                }
+            }
             try {
-                boolean offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
+                boolean offerSuc = false;
                 while (offerSuc != true) {
                     offerSuc = queue.offer(line, 1, TimeUnit.SECONDS);
                 }
@@ -386,7 +436,9 @@ public class FileReaderOperator extends AbstractReader {
                 LOGGER.error("fetchData offer failed {}", e.getMessage());
             }
         });
+        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
         if (position >= readEndpoint) {
+            LOGGER.info("read to the end, set finished position {} 
readEndpoint {}", position, readEndpoint);
             finished = true;
         }
     }
@@ -396,9 +448,8 @@ public class FileReaderOperator extends AbstractReader {
         RandomAccessFile input = null;
         try {
             input = new RandomAccessFile(file, "r");
-            bytePosition = readLines(input, pos, lines, 
FileReaderOperator.BATCH_READ_SIZE);
+            bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT, 
BATCH_READ_LINE_TOTAL_LEN, false);
             position += lines.size();
-            
TaskPositionManager.getInstance().updateSinkPosition(getJobInstanceId(), 
getReadSource(), position);
         } catch (Exception e) {
             LOGGER.error("readFromPos error {}", e.getMessage());
         } finally {
@@ -416,7 +467,8 @@ public class FileReaderOperator extends AbstractReader {
      * @return The new position after the lines have been read
      * @throws java.io.IOException if an I/O error occurs.
      */
-    private static long readLines(RandomAccessFile reader, long pos, 
List<String> lines, int maxLineCount)
+    private long readLines(RandomAccessFile reader, long pos, List<String> 
lines, int maxLineCount, int maxLineTotalLen,
+            boolean isCounting)
             throws IOException {
         if (maxLineCount == 0) {
             return pos;
@@ -425,6 +477,7 @@ public class FileReaderOperator extends AbstractReader {
         reader.seek(pos);
         long rePos = pos; // position to re-read
         int num;
+        int lineTotalLen = 0;
         LOGGER.debug("readLines from {}", pos);
         boolean overLen = false;
         while ((num = reader.read(inBuf)) != -1) {
@@ -433,7 +486,13 @@ public class FileReaderOperator extends AbstractReader {
                 byte ch = inBuf[i];
                 switch (ch) {
                     case '\n':
-                        lines.add(new String(baos.toByteArray()));
+                        if (isCounting) {
+                            lines.add(new String(""));
+                        } else {
+                            String temp = new String(baos.toByteArray(), 
StandardCharsets.UTF_8);
+                            lines.add(temp);
+                            lineTotalLen += temp.length();
+                        }
                         rePos = pos + i + 1;
                         if (overLen) {
                             LOGGER.warn("readLines over len finally string len 
{}",
@@ -451,11 +510,11 @@ public class FileReaderOperator extends AbstractReader {
                             overLen = true;
                         }
                 }
-                if (lines.size() >= maxLineCount) {
+                if (lines.size() >= maxLineCount || lineTotalLen >= 
maxLineTotalLen) {
                     break;
                 }
             }
-            if (lines.size() >= maxLineCount) {
+            if (lines.size() >= maxLineCount || lineTotalLen >= 
maxLineTotalLen) {
                 break;
             }
             if (i == num) {
@@ -477,7 +536,7 @@ public class FileReaderOperator extends AbstractReader {
                 isFirst = false;
             }
         }
-        LOGGER.info("is first store job {}, {}", file.getAbsolutePath(), 
isFirst);
+        LOGGER.info("isFirst {}, {}", file.getAbsolutePath(), isFirst);
         return isFirst;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 531b5d0821..33207f5bd2 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.agent.plugin.sources.reader.file;
 
 import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -43,7 +43,9 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INT
 public final class MonitorTextFile {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MonitorTextFile.class);
-    // monitor thread pool
+    /**
+     * monitor thread pool
+      */
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS,
@@ -114,13 +116,17 @@ public final class MonitorTextFile {
                     long currentTime = System.currentTimeMillis();
                     if (expireTime != 
Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE)
                             && currentTime - this.startTime > expireTime) {
+                        LOGGER.info("monitor expire in {}", expireTime);
                         break;
                     }
                     if (fileReaderOperator.inited) {
                         listen();
                     }
+                    fileReaderOperator.monitorUpdateTime = currentTime;
                     TimeUnit.MILLISECONDS.sleep(interval);
                 }
+                LOGGER.info("Job {} stop monitor {}",
+                        fileReaderOperator.instanceId, 
fileReaderOperator.file.getAbsolutePath());
             } catch (Exception e) {
                 LOGGER.error(String.format("monitor %s error", 
fileReaderOperator.file.getName()), e);
             }
@@ -164,7 +170,7 @@ public final class MonitorTextFile {
         }
 
         /**
-         * reset the position and bytePosition
+         * Reset the position and bytePosition
          */
         private void resetPosition() {
             LOGGER.info("reset position {}", fileReaderOperator.file.toPath());
@@ -173,26 +179,23 @@ public final class MonitorTextFile {
 
             String jobInstanceId = fileReaderOperator.getJobInstanceId();
             if (jobInstanceId != null) {
-                TaskPositionManager.getInstance().updateSinkPosition(
-                        jobInstanceId, fileReaderOperator.getReadSource(), 0);
+                PositionManager.getInstance().updateSinkPosition(
+                        jobInstanceId, fileReaderOperator.getReadSource(), 0, 
true);
             }
         }
 
         /**
          * Determine whether the inode has changed
          *
-         * @param currentFileKey
-         * @return
+         * @param currentFileKey current file key
+         * @return true if the inode changed, otherwise false
          */
         private boolean isInodeChanged(String currentFileKey) {
             if (fileReaderOperator.fileKey == null || currentFileKey == null) {
                 return false;
             }
 
-            if (fileReaderOperator.fileKey.equals(currentFileKey)) {
-                return false;
-            }
-            return true;
+            return !fileReaderOperator.fileKey.equals(currentFileKey);
         }
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
index ce02d1a21c..113ea65465 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
@@ -23,7 +23,7 @@ import org.apache.inlong.agent.conf.ProfileFetcher;
 import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.core.HeartbeatManager;
-import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.core.task.PositionManager;
 
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.api.support.membermodification.MemberModifier;
@@ -55,16 +55,16 @@ public class MiniAgent {
     }
 
     private void init() throws Exception {
-        TaskPositionManager taskPositionManager = 
PowerMockito.mock(TaskPositionManager.class);
+        PositionManager positionManager = 
PowerMockito.mock(PositionManager.class);
         HeartbeatManager heartbeatManager = 
PowerMockito.mock(HeartbeatManager.class);
         ProfileFetcher profileFetcher = 
PowerMockito.mock(ProfileFetcher.class);
-        PowerMockito.doNothing().when(taskPositionManager, "start");
-        PowerMockito.doNothing().when(taskPositionManager, "stop");
+        PowerMockito.doNothing().when(positionManager, "start");
+        PowerMockito.doNothing().when(positionManager, "stop");
         PowerMockito.doNothing().when(heartbeatManager, "start");
         PowerMockito.doNothing().when(heartbeatManager, "stop");
         PowerMockito.doNothing().when(profileFetcher, "start");
         PowerMockito.doNothing().when(profileFetcher, "stop");
-        MemberModifier.field(AgentManager.class, 
"taskPositionManager").set(manager, taskPositionManager);
+        MemberModifier.field(AgentManager.class, 
"positionManager").set(manager, positionManager);
         MemberModifier.field(AgentManager.class, 
"heartbeatManager").set(manager, heartbeatManager);
         MemberModifier.field(AgentManager.class, "fetcher").set(manager, 
profileFetcher);
     }
@@ -105,8 +105,8 @@ public class MiniAgent {
 
     public void cleanupTriggers() {
         triggerProfileCache
-                .forEach(triggerProfile -> 
manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(),
-                        false));
+                .forEach(triggerProfile -> manager.getTriggerManager()
+                        .deleteTrigger(triggerProfile.getTriggerId(), false));
         triggerProfileCache.clear();
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 547c132f00..13df91dc97 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -75,7 +75,7 @@ public class TestFileAgent {
             agent.start();
             testRootDir = helper.getTestRootDir();
         } catch (Exception e) {
-            LOGGER.error("setup failure");
+            LOGGER.error("setup failure", e);
         }
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
index 8ea308ec69..1e3a19f077 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
@@ -92,6 +92,7 @@ public class TestTriggerManager {
 
     @Test
     public void testRestartTriggerJobRestore() throws Exception {
+
         TriggerProfile triggerProfile1 = 
TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE);
         triggerProfile1.set(JobConstants.JOB_ID, "1");
         triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
@@ -100,16 +101,15 @@ public class TestTriggerManager {
 
         WATCH_FOLDER.newFolder("tmp");
         TestUtils.createHugeFiles("1.log", 
WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd");
-        System.out.println(" task size " + 
agent.getManager().getTaskManager().getTaskSize());
+        LOGGER.info("testRestartTriggerJobRestore 1 task size " + 
agent.getManager().getTaskManager().getTaskSize());
         await().atMost(10, TimeUnit.SECONDS).until(() -> 
agent.getManager().getTaskManager().getTaskSize() == 1);
-
+        LOGGER.info("testRestartTriggerJobRestore 2 task size " + 
agent.getManager().getTaskManager().getTaskSize());
         agent.restart();
-        await().atMost(10, TimeUnit.SECONDS).until(() -> 
agent.getManager().getTaskManager().getTaskSize() == 1);
-
+        LOGGER.info("testRestartTriggerJobRestore 3 task size " + 
agent.getManager().getTaskManager().getTaskSize());
+        await().atMost(30, TimeUnit.SECONDS).until(() -> 
agent.getManager().getTaskManager().getTaskSize() == 1);
+        LOGGER.info("testRestartTriggerJobRestore 4 task size " + 
agent.getManager().getTaskManager().getTaskSize());
         // cleanup
         TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + 
"/1.log");
-        TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + 
"/2.log");
-        TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + 
"/tmp/3.log");
     }
 
     @Test

Reply via email to