This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d597adee0 [INLONG-11591][Agent] Reduce duplicate code for log collection type tasks (#11592)
4d597adee0 is described below

commit 4d597adee042f3154f7ad416cb6af6a3f06020ce
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Dec 10 18:08:08 2024 +0800

    [INLONG-11591][Agent] Reduce duplicate code for log collection type tasks (#11592)
    
    * [INLONG-11591][Agent] Reduce duplicate code for log collection type tasks
    
    * Update inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * [INLONG-11591][Agent] Delete useless code
    
    ---------
    
    Co-authored-by: AloysZhang <[email protected]>
---
 .../apache/inlong/agent/conf/InstanceProfile.java  |  49 +++++-
 .../org/apache/inlong/agent/conf/TaskProfile.java  |  58 +++++--
 .../inlong/agent/constant/TaskConstants.java       |   7 +-
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  23 +--
 .../agent/core/instance/InstanceManager.java       |   2 +-
 .../inlong/agent/plugin/instance/FileInstance.java |   2 +-
 .../inlong/agent/plugin/sources/LogFileSource.java |   2 +-
 .../agent/plugin/task/FormatDateLogFileTask.java   |  27 ---
 .../apache/inlong/agent/plugin/task/KafkaTask.java |   5 +-
 .../inlong/agent/plugin/task/MongoDBTask.java      |   5 +-
 .../apache/inlong/agent/plugin/task/MqttTask.java  |   6 +-
 .../inlong/agent/plugin/task/OracleTask.java       |   9 +-
 .../inlong/agent/plugin/task/PostgreSQLTask.java   |   5 +-
 .../inlong/agent/plugin/task/PulsarTask.java       |   5 +-
 .../apache/inlong/agent/plugin/task/RedisTask.java |   5 +-
 .../inlong/agent/plugin/task/SQLServerTask.java    |   5 +-
 .../plugin/task/logcollection/LogAbstractTask.java | 185 +++++++++++++++++++++
 .../task/{ => logcollection}/cos/COSTask.java      | 166 ++----------------
 .../task/{ => logcollection}/cos/FileScanner.java  |   2 +-
 .../local}/FileDataUtils.java                      |   2 +-
 .../{file => logcollection/local}/FileScanner.java |   2 +-
 .../local/FileTask.java}                           | 164 ++----------------
 .../local}/FileTimeComparator.java                 |   2 +-
 .../task/{file => logcollection/local}/Files.java  |   2 +-
 .../{file => logcollection/local}/WatchEntity.java |   2 +-
 .../agent/plugin/instance/TestInstanceManager.java |  33 +++-
 .../inlong/agent/plugin/sinks/KafkaSinkTest.java   |   4 +-
 .../inlong/agent/plugin/sinks/PulsarSinkTest.java  |   4 +-
 .../sinks/filecollect/TestSenderManager.java       |   6 +-
 .../agent/plugin/sources/TestLogFileSource.java    |   6 +-
 .../agent/plugin/sources/TestRedisSource.java      |   4 +-
 .../agent/plugin/sources/TestSQLServerSource.java  |   4 +-
 .../inlong/agent/plugin/task/TestCOSTask.java      |  16 +-
 .../inlong/agent/plugin/task/TestLogFileTask.java  |   8 +-
 .../inlong/agent/plugin/task/TestTaskManager.java  |   8 +-
 35 files changed, 389 insertions(+), 446 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 9e85872ff5..c3b20a0581 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.conf;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.utils.file.FileUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 
@@ -40,12 +41,24 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
 
 /**
  * job profile which contains details describing properties of one job.
  */
 public class InstanceProfile extends AbstractConfiguration implements 
Comparable<InstanceProfile> {
 
+    public static final String DEFAULT_FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
+    public static final String DEFAULT_COS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.COSInstance";
+    public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+    public static final String DEFAULT_MONGODB_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
+    public static final String DEFAULT_MQTT_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MqttInstance";
+    public static final String DEFAULT_ORACLE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.OracleInstance";
+    public static final String DEFAULT_POSTGRES_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
+    public static final String DEFAULT_PULSAR_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
+    public static final String DEFAULT_REDIS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.RedisInstance";
+    public static final String DEFAULT_SQLSERVER_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceProfile.class);
     private static final Gson GSON = new Gson();
 
@@ -64,12 +77,40 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
         return GSON.toJson(getConfigStorage());
     }
 
-    public void setInstanceClass(String className) {
-        set(TaskConstants.INSTANCE_CLASS, className);
+    public String getInstanceClass() {
+        TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, 
TaskTypeEnum.FILE.getType()));
+        return getInstanceClassByTaskType(taskType);
     }
 
-    public String getInstanceClass() {
-        return get(TaskConstants.INSTANCE_CLASS);
+    public static String getInstanceClassByTaskType(TaskTypeEnum taskType) {
+        if (taskType == null) {
+            return null;
+        }
+        switch (taskType) {
+            case FILE:
+                return DEFAULT_FILE_INSTANCE;
+            case KAFKA:
+                return DEFAULT_KAFKA_INSTANCE;
+            case PULSAR:
+                return DEFAULT_PULSAR_INSTANCE;
+            case POSTGRES:
+                return DEFAULT_POSTGRES_INSTANCE;
+            case ORACLE:
+                return DEFAULT_ORACLE_INSTANCE;
+            case SQLSERVER:
+                return DEFAULT_SQLSERVER_INSTANCE;
+            case MONGODB:
+                return DEFAULT_MONGODB_INSTANCE;
+            case REDIS:
+                return DEFAULT_REDIS_INSTANCE;
+            case MQTT:
+                return DEFAULT_MQTT_INSTANCE;
+            case COS:
+                return DEFAULT_COS_INSTANCE;
+            default:
+                LOGGER.error("invalid task type {}", taskType);
+                return null;
+        }
     }
 
     public String getTaskId() {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 8c509e4240..c2a60a0598 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
 import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 
 import com.google.gson.Gson;
@@ -32,18 +33,32 @@ import org.slf4j.LoggerFactory;
 import java.text.ParseException;
 import java.util.TimeZone;
 
+import static java.util.Objects.requireNonNull;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
 
 /**
  * job profile which contains details describing properties of one job.
  */
 public class TaskProfile extends AbstractConfiguration {
 
+    public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
+    public static final String DEFAULT_COS_TASK = 
"org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
+    public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
+    public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
+    public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
+    public static final String DEFAULT_ORACLE_TASK = 
"org.apache.inlong.agent.plugin.task.OracleTask";
+    public static final String DEFAULT_REDIS_TASK = 
"org.apache.inlong.agent.plugin.task.RedisTask";
+    public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
+    public static final String DEFAULT_MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
+    public static final String DEFAULT_SQLSERVER_TASK = 
"org.apache.inlong.agent.plugin.task.SQLServerTask";
+    public static final String DEFAULT_MOCK_TASK = 
"org.apache.inlong.agent.plugin.task.MockTask";
+
     private static final Gson GSON = new Gson();
     private static final Logger logger = 
LoggerFactory.getLogger(TaskProfile.class);
 
@@ -57,6 +72,37 @@ public class TaskProfile extends AbstractConfiguration {
         return TaskProfileDto.convertToTaskProfile(dataConfig);
     }
 
+    public String getTaskClass() {
+        TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, 
TaskTypeEnum.FILE.getType()));
+        switch (requireNonNull(taskType)) {
+            case FILE:
+                return DEFAULT_FILE_TASK;
+            case KAFKA:
+                return DEFAULT_KAFKA_TASK;
+            case PULSAR:
+                return DEFAULT_PULSAR_TASK;
+            case POSTGRES:
+                return DEFAULT_POSTGRESQL_TASK;
+            case ORACLE:
+                return DEFAULT_ORACLE_TASK;
+            case SQLSERVER:
+                return DEFAULT_SQLSERVER_TASK;
+            case MONGODB:
+                return DEFAULT_MONGODB_TASK;
+            case REDIS:
+                return DEFAULT_REDIS_TASK;
+            case MQTT:
+                return DEFAULT_MQTT_TASK;
+            case COS:
+                return DEFAULT_COS_TASK;
+            case MOCK:
+                return DEFAULT_MOCK_TASK;
+            default:
+                logger.error("invalid task type {}", taskType);
+                return null;
+        }
+    }
+
     public String getTaskId() {
         return get(TaskConstants.TASK_ID);
     }
@@ -81,14 +127,6 @@ public class TaskProfile extends AbstractConfiguration {
         return getBoolean(TASK_RETRY, false);
     }
 
-    public String getTaskClass() {
-        return get(TaskConstants.TASK_CLASS);
-    }
-
-    public void setTaskClass(String className) {
-        set(TaskConstants.TASK_CLASS, className);
-    }
-
     public String getInlongGroupId() {
         return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
     }
@@ -124,11 +162,9 @@ public class TaskProfile extends AbstractConfiguration {
         return GSON.toJson(getConfigStorage());
     }
 
-    public InstanceProfile createInstanceProfile(String instanceClass, String 
fileName, String cycleUnit,
-            String dataTime,
+    public InstanceProfile createInstanceProfile(String fileName, String 
cycleUnit, String dataTime,
             long fileUpdateTime) {
         InstanceProfile instanceProfile = 
InstanceProfile.parseJsonStr(toJsonStr());
-        instanceProfile.setInstanceClass(instanceClass);
         instanceProfile.setInstanceId(fileName);
         instanceProfile.setSourceDataTime(dataTime);
         Long sinkDataTime = 0L;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index a4f9156577..201ec24713 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -22,11 +22,8 @@ package org.apache.inlong.agent.constant;
  */
 public class TaskConstants extends CommonConstants {
 
-    // job id
-    // public static final String JOB_ID = "job.id";
     public static final String TASK_ID = "task.id";
     public static final String INSTANCE_ID = "instance.id";
-    public static final String JOB_INSTANCE_ID = "job.instance.id";
     public static final String INSTANCE_CREATE_TIME = "instance.createTime";
     public static final String INSTANCE_MODIFY_TIME = "instance.modifyTime";
     public static final String TASK_GROUP_ID = "task.groupId";
@@ -36,9 +33,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_SOURCE = "task.source";
 
     public static final String TASK_CHANNEL = "task.channel";
-
-    public static final String TASK_CLASS = "task.taskClass";
-    public static final String INSTANCE_CLASS = "task.instance.class";
+    public static final String TASK_TYPE = "task.taskType";
     public static final String TASK_FILE_TRIGGER = "task.fileTask.trigger";
 
     // sink config
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index a9134de3a8..a30bf44ecc 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -52,22 +52,11 @@ import static 
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_D
 public class TaskProfileDto {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TaskProfileDto.class);
-
-    public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.file.LogFileTask";
-    public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
-    public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
-    public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
-    public static final String DEFAULT_ORACLE_TASK = 
"org.apache.inlong.agent.plugin.task.OracleTask";
-    public static final String DEFAULT_REDIS_TASK = 
"org.apache.inlong.agent.plugin.task.RedisTask";
-    public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
-    public static final String DEFAULT_MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
-    public static final String DEFAULT_SQLSERVER_TASK = 
"org.apache.inlong.agent.plugin.task.SQLServerTask";
     public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
     public static final String PULSAR_SINK = 
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
     public static final String KAFKA_SINK = 
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
-    public static final String DEFAULT_COS_TASK = 
"org.apache.inlong.agent.plugin.task.cos.COSTask";
     /**
      * file source
      */
@@ -470,6 +459,7 @@ public class TaskProfileDto {
 
         // common attribute
         task.setId(String.valueOf(dataConfig.getTaskId()));
+        task.setTaskType(dataConfig.getTaskType());
         task.setGroupId(dataConfig.getInlongGroupId());
         task.setStreamId(dataConfig.getInlongStreamId());
         task.setChannel(DEFAULT_CHANNEL);
@@ -517,7 +507,6 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case FILE:
-                task.setTaskClass(DEFAULT_FILE_TASK);
                 FileTask fileTask = getFileTask(dataConfig);
                 task.setCycleUnit(fileTask.getCycleUnit());
                 task.setFileTask(fileTask);
@@ -526,56 +515,48 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case KAFKA:
-                task.setTaskClass(DEFAULT_KAFKA_TASK);
                 KafkaTask kafkaTask = getKafkaTask(dataConfig);
                 task.setKafkaTask(kafkaTask);
                 task.setSource(KAFKA_SOURCE);
                 profileDto.setTask(task);
                 break;
             case PULSAR:
-                task.setTaskClass(DEFAULT_PULSAR_TASK);
                 PulsarTask pulsarTask = getPulsarTask(dataConfig);
                 task.setPulsarTask(pulsarTask);
                 task.setSource(PULSAR_SOURCE);
                 profileDto.setTask(task);
                 break;
             case POSTGRES:
-                task.setTaskClass(DEFAULT_POSTGRESQL_TASK);
                 PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig);
                 task.setPostgreSQLTask(postgreSQLTask);
                 task.setSource(POSTGRESQL_SOURCE);
                 profileDto.setTask(task);
                 break;
             case ORACLE:
-                task.setTaskClass(DEFAULT_ORACLE_TASK);
                 OracleTask oracleTask = getOracleTask(dataConfig);
                 task.setOracleTask(oracleTask);
                 task.setSource(ORACLE_SOURCE);
                 profileDto.setTask(task);
                 break;
             case SQLSERVER:
-                task.setTaskClass(DEFAULT_SQLSERVER_TASK);
                 SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
                 task.setSqlserverTask(sqlserverTask);
                 task.setSource(SQLSERVER_SOURCE);
                 profileDto.setTask(task);
                 break;
             case MONGODB:
-                task.setTaskClass(DEFAULT_MONGODB_TASK);
                 MongoTask mongoTask = getMongoTask(dataConfig);
                 task.setMongoTask(mongoTask);
                 task.setSource(MONGO_SOURCE);
                 profileDto.setTask(task);
                 break;
             case REDIS:
-                task.setTaskClass(DEFAULT_REDIS_TASK);
                 RedisTask redisTask = getRedisTask(dataConfig);
                 task.setRedisTask(redisTask);
                 task.setSource(REDIS_SOURCE);
                 profileDto.setTask(task);
                 break;
             case MQTT:
-                task.setTaskClass(DEFAULT_MQTT_TASK);
                 MqttTask mqttTask = getMqttTask(dataConfig);
                 task.setMqttTask(mqttTask);
                 task.setSource(MQTT_SOURCE);
@@ -585,7 +566,6 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case COS:
-                task.setTaskClass(DEFAULT_COS_TASK);
                 COSTask cosTask = getCOSTask(dataConfig);
                 task.setCycleUnit(cosTask.getCycleUnit());
                 task.setCosTask(cosTask);
@@ -619,6 +599,7 @@ public class TaskProfileDto {
         private String mqClusters;
         private String topicInfo;
         private String taskClass;
+        private Integer taskType;
         private String predefinedFields;
         private Integer state;
         private String cycleUnit;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 1b9dba8191..3396c3a591 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -453,7 +453,7 @@ public class InstanceManager extends AbstractDaemon {
                         instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
             }
         } catch (Throwable t) {
-            LOGGER.error("add instance error {}", t.getMessage());
+            LOGGER.error("add instance error.", t);
         }
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 38cb969376..3a4225abdf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.plugin.instance;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
 
 import java.io.IOException;
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index ecaa53196b..6774751a34 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -28,7 +28,7 @@ import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import lombok.AllArgsConstructor;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileTask.java
deleted file mode 100644
index 697fad2861..0000000000
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileTask.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.task;
-
-import org.apache.inlong.agent.plugin.task.file.LogFileTask;
-
-/**
- * Directory trigger with format date.
- */
-public class FormatDateLogFileTask extends LogFileTask {
-
-}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
index 86b06079ab..79d5534e5c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
@@ -35,7 +35,6 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
 public class KafkaTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaTask.class);
-    public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
     private boolean isAdded = false;
     private String topic;
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
@@ -58,8 +57,8 @@ public class KafkaTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(topic, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
         list.add(instanceProfile);
         this.isAdded = true;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
index 4a3ef07e26..bc1bcdc649 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
@@ -34,7 +34,6 @@ import java.util.List;
 public class MongoDBTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MongoDBTask.class);
-    public static final String DEFAULT_MONGODB_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
     private boolean isAdded = false;
     private String collection;
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
@@ -66,8 +65,8 @@ public class MongoDBTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(collection, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
         list.add(instanceProfile);
         this.isAdded = true;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
index 1d7d9a3dc2..50de90a315 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
@@ -42,8 +42,6 @@ public class MqttTask extends AbstractTask {
 
     private AtomicBoolean isAdded = new AtomicBoolean(false);
 
-    public static final String DEFAULT_MQTT_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MqttInstance";
-
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
     @Override
@@ -93,8 +91,8 @@ public class MqttTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_MQTT_INSTANCE, topic,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(topic, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile(mqtt): {}", 
instanceProfile.toJsonStr());
         list.add(instanceProfile);
         isAdded.set(true);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
index 34b064d48d..4db4c0da21 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
@@ -32,13 +32,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.inlong.agent.constant.TaskConstants.*;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_DBNAME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_TABLE_INCLUDE_LIST;
 
 public class OracleTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OracleTask.class);
 
-    public static final String DEFAULT_ORACLE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.OracleInstance";
     private AtomicBoolean isAdded = new AtomicBoolean(false);
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
@@ -103,9 +103,8 @@ public class OracleTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile =
-                taskProfile.createInstanceProfile(DEFAULT_ORACLE_INSTANCE, 
instanceId,
-                        CycleUnitType.HOUR, dataTime, 
AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
         list.add(instanceProfile);
         this.isAdded.set(true);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
index 7cf382fbd6..db675c7e9f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
@@ -41,7 +41,6 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER;
 public class PostgreSQLTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PostgreSQLTask.class);
-    public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
     private boolean isAdded = false;
     public static final int DEFAULT_INSTANCE_LIMIT = 1;
@@ -98,8 +97,8 @@ public class PostgreSQLTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, instanceId,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         list.add(instanceProfile);
         this.isAdded = true;
         return list;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
index 4586da7bb1..89da06bd1b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
@@ -37,7 +37,6 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TOPIC;
 public class PulsarTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarTask.class);
-    public static final String DEFAULT_PULSAR_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
     private boolean isAdded = false;
     private String tenant;
     private String namespace;
@@ -75,8 +74,8 @@ public class PulsarTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_PULSAR_INSTANCE, instanceId,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
         list.add(instanceProfile);
         this.isAdded = true;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
index b9f7449ecd..e36d5b02ee 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
@@ -34,7 +34,6 @@ import java.util.List;
 public class RedisTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisTask.class);
-    public static final String DEFAULT_REDIS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.RedisInstance";
     private boolean isAdded = false;
     private String taskId;
 
@@ -68,8 +67,8 @@ public class RedisTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_REDIS_INSTANCE, taskId,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(taskId, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
         list.add(instanceProfile);
         this.isAdded = true;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
index dd1446b301..f892c80e33 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
@@ -34,7 +34,6 @@ import java.util.List;
 public class SQLServerTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SQLServerTask.class);
-    public static final String DEFAULT_SQLSERVER_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
     private boolean isAdded = false;
 
     private String dbName;
@@ -102,8 +101,8 @@ public class SQLServerTask extends AbstractTask {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_SQLSERVER_INSTANCE, instanceId,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(instanceId, CycleUnitType.HOUR, dataTime,
+                AgentUtils.getCurrentTime());
         list.add(instanceProfile);
         this.isAdded = true;
         return list;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
new file mode 100644
index 0000000000..e234042524
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.logcollection;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.core.task.TaskAction;
+import org.apache.inlong.agent.plugin.task.AbstractTask;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.state.State;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public abstract class LogAbstractTask extends AbstractTask {
+
+    private static final int INSTANCE_QUEUE_CAPACITY = 10;
+    public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LogAbstractTask.class);
+    protected boolean retry;
+    protected BlockingQueue<InstanceProfile> instanceQueue;
+    private volatile boolean runAtLeastOneTime = false;
+    protected long startTime;
+    protected long endTime;
+    protected String timeOffset = "";
+    protected final Map<String/* dataTime */, Map<String/* fileName */, 
InstanceProfile>> eventMap =
+            new ConcurrentHashMap<>();
+
+    @Override
+    protected void initTask() {
+        instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
+    }
+
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        if (retry) {
+            runForRetry();
+        } else {
+            runForNormal();
+        }
+        List<InstanceProfile> list = new ArrayList<>();
+        while (list.size() < INSTANCE_QUEUE_CAPACITY && 
!instanceQueue.isEmpty()) {
+            InstanceProfile profile = instanceQueue.poll();
+            if (profile != null) {
+                list.add(profile);
+            }
+        }
+        return list;
+    }
+
+    abstract protected void runForNormal();
+
+    abstract protected void dealWithEventMap();
+
+    abstract protected void scanExistingFile();
+
+    private void runForRetry() {
+        if (!runAtLeastOneTime) {
+            scanExistingFile();
+            runAtLeastOneTime = true;
+        }
+        dealWithEventMap();
+        if (allInstanceFinished()) {
+            LOGGER.info("retry task finished, send action to task manager, 
taskId {}", getTaskId());
+            TaskAction action = new 
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
+            taskManager.submitAction(action);
+            doChangeState(State.SUCCEEDED);
+        }
+    }
+
+    protected void dealWithEventMapWithCycle() {
+        long startScanTime = startTime;
+        long endScanTime = endTime;
+        List<String> dataTimeList = Scanner.getDataTimeList(startScanTime, 
endScanTime, taskProfile.getCycleUnit(),
+                timeOffset, retry);
+        if (dataTimeList.isEmpty()) {
+            LOGGER.error("getDataTimeList get empty list");
+            return;
+        }
+        Set<String> dealtDataTime = new HashSet<>();
+        // normal task first handle current data time
+        if (!retry) {
+            String current = dataTimeList.remove(dataTimeList.size() - 1);
+            dealtDataTime.add(current);
+            if (!dealEventMapByDataTime(current, true)) {
+                return;
+            }
+        }
+        dataTimeList.forEach(dataTime -> {
+            dealtDataTime.add(dataTime);
+            if (!dealEventMapByDataTime(dataTime, false)) {
+                return;
+            }
+        });
+        for (String dataTime : eventMap.keySet()) {
+            if (!dealtDataTime.contains(dataTime)) {
+                dealEventMapByDataTime(dataTime, false);
+            }
+        }
+    }
+
+    protected boolean dealEventMapByDataTime(String dataTime, boolean 
isCurrentDataTime) {
+        Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.get(dataTime);
+        if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
+            return true;
+        }
+        if (shouldStartNow(dataTime)) {
+            Set<InstanceProfile> sortedEvents = new 
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
+            sortedEvents.addAll(sameDataTimeEvents.values());
+            for (InstanceProfile sortEvent : sortedEvents) {
+                String fileName = sortEvent.getInstanceId();
+                InstanceProfile profile = sameDataTimeEvents.get(fileName);
+                if (!isCurrentDataTime && isFull()) {
+                    return false;
+                }
+                if (!instanceQueue.offer(profile)) {
+                    return false;
+                }
+                sameDataTimeEvents.remove(fileName);
+            }
+        }
+        return true;
+    }
+
+    /*
+     * Calculate whether the event needs to be processed at the current time 
based on its data time, business cycle, and
+     * offset
+     */
+    private boolean shouldStartNow(String dataTime) {
+        String shouldStartTime = NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), timeOffset);
+        String currentTime = getCurrentTime();
+        return currentTime.compareTo(shouldStartTime) >= 0;
+    }
+
+    private String getCurrentTime() {
+        SimpleDateFormat dateFormat = new 
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
+        TimeZone timeZone = 
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
+        dateFormat.setTimeZone(timeZone);
+        return dateFormat.format(new Date(System.currentTimeMillis()));
+    }
+
+    protected void removeTimeoutEvent(Map<String, Map<String, 
InstanceProfile>> eventMap, boolean isRetry) {
+        if (isRetry) {
+            return;
+        }
+        for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
+            /* If the data time of the event is within 2 days before (after) 
the current time, it is valid */
+            String dataTime = entry.getKey();
+            if (!NewDateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
+                /* Remove it from memory map. */
+                eventMap.remove(dataTime);
+                LOGGER.warn("remove too old event from event map. dataTime 
{}", dataTime);
+            }
+        }
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
similarity index 54%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
index de2beb7cc0..86210157ed 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
@@ -15,18 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.cos;
+package org.apache.inlong.agent.plugin.task.logcollection.cos;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.task.TaskAction;
-import org.apache.inlong.agent.plugin.task.AbstractTask;
-import org.apache.inlong.agent.plugin.task.cos.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
+import 
org.apache.inlong.agent.plugin.task.logcollection.cos.FileScanner.BasicFileInfo;
 import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
-import org.apache.inlong.agent.plugin.utils.regex.Scanner;
-import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 
@@ -36,45 +32,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Watch directory, if new valid files are created, create instance 
correspondingly.
  */
-public class COSTask extends AbstractTask {
+public class COSTask extends LogAbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(COSTask.class);
-    public static final String DEFAULT_COS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.COSInstance";
-    private static final int INSTANCE_QUEUE_CAPACITY = 10;
-    private final Map<String/* dataTime */, Map<String/* fileName */, 
InstanceProfile>> eventMap =
-            new ConcurrentHashMap<>();
-    public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
-    private boolean retry;
-    private long startTime;
-    private long endTime;
     private String originPattern;
     private long lastScanTime = 0;
     public final long SCAN_INTERVAL = 1 * 60 * 1000;
-    private volatile boolean runAtLeastOneTime = false;
-    private BlockingQueue<InstanceProfile> instanceQueue;
     private COSClient cosClient;
     private String bucketName;
     private String secretId;
     private String secretKey;
     private String strRegion;
-    private String timeOffset = "";
 
     @Override
     protected int getInstanceLimit() {
@@ -83,8 +58,8 @@ public class COSTask extends AbstractTask {
 
     @Override
     protected void initTask() {
+        super.initTask();
         timeOffset = taskProfile.get(TaskConstants.TASK_COS_TIME_OFFSET, "");
-        instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
         retry = taskProfile.getBoolean(TaskConstants.COS_TASK_RETRY, false);
         originPattern = taskProfile.get(TaskConstants.COS_TASK_PATTERN);
         bucketName = taskProfile.get(TaskConstants.COS_TASK_BUCKET_NAME);
@@ -110,23 +85,6 @@ public class COSTask extends AbstractTask {
         return true;
     }
 
-    @Override
-    protected List<InstanceProfile> getNewInstanceList() {
-        if (retry) {
-            runForRetry();
-        } else {
-            runForNormal();
-        }
-        List<InstanceProfile> list = new ArrayList<>();
-        while (list.size() < INSTANCE_QUEUE_CAPACITY && 
!instanceQueue.isEmpty()) {
-            InstanceProfile profile = instanceQueue.poll();
-            if (profile != null) {
-                list.add(profile);
-            }
-        }
-        return list;
-    }
-
     @Override
     public boolean isProfileValid(TaskProfile profile) {
         if (!profile.allRequiredKeyExist()) {
@@ -173,21 +131,8 @@ public class COSTask extends AbstractTask {
         cosClient.shutdown();
     }
 
-    private void runForRetry() {
-        if (!runAtLeastOneTime) {
-            scanExistingFile();
-            runAtLeastOneTime = true;
-        }
-        dealWithEventMap();
-        if (allInstanceFinished()) {
-            LOGGER.info("retry task finished, send action to task manager, 
taskId {}", getTaskId());
-            TaskAction action = new 
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
-            taskManager.submitAction(action);
-            doChangeState(State.SUCCEEDED);
-        }
-    }
-
-    private void runForNormal() {
+    @Override
+    protected void runForNormal() {
         if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
             scanExistingFile();
             lastScanTime = AgentUtils.getCurrentTime();
@@ -195,7 +140,8 @@ public class COSTask extends AbstractTask {
         dealWithEventMap();
     }
 
-    private void scanExistingFile() {
+    @Override
+    protected void scanExistingFile() {
         List<BasicFileInfo> fileInfos = 
FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern,
                 taskProfile.getCycleUnit(), timeOffset, startTime, endTime, 
retry);
         LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), 
originPattern, fileInfos.size());
@@ -215,98 +161,12 @@ public class COSTask extends AbstractTask {
         return fileToProfile.get(fileName) != null;
     }
 
-    private void dealWithEventMap() {
+    @Override
+    protected void dealWithEventMap() {
         removeTimeoutEvent(eventMap, retry);
         dealWithEventMapWithCycle();
     }
 
-    private void dealWithEventMapWithCycle() {
-        long startScanTime = startTime;
-        long endScanTime = endTime;
-        List<String> dataTimeList = Scanner.getDataTimeList(startScanTime, 
endScanTime, taskProfile.getCycleUnit(),
-                timeOffset, retry);
-        if (dataTimeList.isEmpty()) {
-            LOGGER.error("get dataTimeList return empty list");
-            return;
-        }
-        Set<String> dealtDataTime = new HashSet<>();
-        // normal task first handle current data time
-        if (!retry) {
-            String current = dataTimeList.remove(dataTimeList.size() - 1);
-            dealtDataTime.add(current);
-            if (!dealEventMapByDataTime(current, true)) {
-                return;
-            }
-        }
-        dataTimeList.forEach(dataTime -> {
-            dealtDataTime.add(dataTime);
-            if (!dealEventMapByDataTime(dataTime, false)) {
-                return;
-            }
-        });
-        for (String dataTime : eventMap.keySet()) {
-            if (!dealtDataTime.contains(dataTime)) {
-                dealEventMapByDataTime(dataTime, false);
-            }
-        }
-    }
-
-    private boolean dealEventMapByDataTime(String dataTime, boolean 
isCurrentDataTime) {
-        Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.get(dataTime);
-        if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
-            return true;
-        }
-        if (shouldStartNow(dataTime)) {
-            Set<InstanceProfile> sortedEvents = new 
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
-            sortedEvents.addAll(sameDataTimeEvents.values());
-            for (InstanceProfile sortEvent : sortedEvents) {
-                String fileName = sortEvent.getInstanceId();
-                InstanceProfile profile = sameDataTimeEvents.get(fileName);
-                if (!isCurrentDataTime && isFull()) {
-                    return false;
-                }
-                if (!instanceQueue.offer(profile)) {
-                    return false;
-                }
-                sameDataTimeEvents.remove(fileName);
-            }
-        }
-        return true;
-    }
-
-    /*
-     * Calculate whether the event needs to be processed at the current time 
based on its data time, business cycle, and
-     * offset
-     */
-    private boolean shouldStartNow(String dataTime) {
-        String shouldStartTime =
-                NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), timeOffset);
-        String currentTime = getCurrentTime();
-        return currentTime.compareTo(shouldStartTime) >= 0;
-    }
-
-    private void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>> 
eventMap, boolean isRetry) {
-        if (isRetry) {
-            return;
-        }
-        for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
-            /* If the data time of the event is within 2 days before (after) 
the current time, it is valid */
-            String dataTime = entry.getKey();
-            if (!NewDateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
-                /* Remove it from memory map. */
-                eventMap.remove(dataTime);
-                LOGGER.warn("remove too old event from event map. dataTime 
{}", dataTime);
-            }
-        }
-    }
-
-    private String getCurrentTime() {
-        SimpleDateFormat dateFormat = new 
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
-        TimeZone timeZone = 
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
-        dateFormat.setTimeZone(timeZone);
-        return dateFormat.format(new Date(System.currentTimeMillis()));
-    }
-
     private void addToEvenMap(String fileName, String dataTime) {
         if (isInEventMap(fileName, dataTime)) {
             LOGGER.info("add to evenMap isInEventMap returns true skip taskId 
{} dataTime {} fileName {}",
@@ -328,8 +188,8 @@ public class COSTask extends AbstractTask {
             return;
         }
         String cycleUnit = taskProfile.getCycleUnit();
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_COS_INSTANCE,
-                fileName, cycleUnit, dataTime, fileUpdateTime);
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
+                fileUpdateTime);
         sameDataTimeEvents.put(fileName, instanceProfile);
         LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", 
taskProfile.getTaskId(), dataTime, fileName);
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/FileScanner.java
similarity index 99%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/FileScanner.java
index 1019d34135..e3185722dd 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/FileScanner.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.cos;
+package org.apache.inlong.agent.plugin.task.logcollection.cos;
 
 import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
 import org.apache.inlong.agent.plugin.utils.regex.Scanner;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileDataUtils.java
similarity index 95%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileDataUtils.java
index d65960ab17..5848ad563d 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileDataUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
 
 import java.io.IOException;
 import java.nio.file.Files;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileScanner.java
similarity index 98%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileScanner.java
index 7fa759a9d2..70003d65ab 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileScanner.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
 
 import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
 import org.apache.inlong.agent.plugin.utils.regex.Scanner;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
similarity index 73%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
index 4eb3b69525..ef50d1fc62 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.task.TaskAction;
-import org.apache.inlong.agent.plugin.task.AbstractTask;
-import org.apache.inlong.agent.plugin.task.file.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
+import 
org.apache.inlong.agent.plugin.task.logcollection.local.FileScanner.BasicFileInfo;
 import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
 import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
 import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
-import org.apache.inlong.agent.plugin.utils.regex.Scanner;
-import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.agent.utils.file.FileUtils;
@@ -47,19 +44,12 @@ import java.nio.file.WatchEvent.Kind;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
 import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -67,28 +57,17 @@ import java.util.stream.Stream;
 /**
  * Watch directory, if new valid files are created, create instance 
correspondingly.
  */
-public class LogFileTask extends AbstractTask {
+public class FileTask extends LogAbstractTask {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(LogFileTask.class);
-    public static final String DEFAULT_FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
-    private static final int INSTANCE_QUEUE_CAPACITY = 10;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileTask.class);
     private final Map<String, WatchEntity> watchers = new 
ConcurrentHashMap<>();
     private final Set<String> watchFailedDirs = new HashSet<>();
-    private final Map<String/* dataTime */, Map<String/* fileName */, 
InstanceProfile>> eventMap =
-            new ConcurrentHashMap<>();
-    public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
     public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
-    private boolean retry;
-    private volatile long startTime;
-    private volatile long endTime;
     private boolean realTime = false;
     private Set<String> originPatterns;
     private long lastScanTime = 0;
     public final long SCAN_INTERVAL = 1 * 60 * 1000;
-    private volatile boolean runAtLeastOneTime = false;
     private volatile long coreThreadUpdateTime = 0;
-    private String timeOffset = "";
-    private BlockingQueue<InstanceProfile> instanceQueue;
 
     @Override
     protected int getInstanceLimit() {
@@ -97,8 +76,8 @@ public class LogFileTask extends AbstractTask {
 
     @Override
     protected void initTask() {
+        super.initTask();
         timeOffset = taskProfile.get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
-        instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
         retry = taskProfile.isRetry();
         originPatterns = 
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
                 .collect(Collectors.toSet());
@@ -125,23 +104,6 @@ public class LogFileTask extends AbstractTask {
         return true;
     }
 
-    @Override
-    protected List<InstanceProfile> getNewInstanceList() {
-        if (retry) {
-            runForRetry();
-        } else {
-            runForNormal();
-        }
-        List<InstanceProfile> list = new ArrayList<>();
-        while (list.size() < INSTANCE_QUEUE_CAPACITY && 
!instanceQueue.isEmpty()) {
-            InstanceProfile profile = instanceQueue.poll();
-            if (profile != null) {
-                list.add(profile);
-            }
-        }
-        return list;
-    }
-
     @Override
     public boolean isProfileValid(TaskProfile profile) {
         if (!profile.allRequiredKeyExist()) {
@@ -244,21 +206,8 @@ public class LogFileTask extends AbstractTask {
         });
     }
 
-    private void runForRetry() {
-        if (!runAtLeastOneTime) {
-            scanExistingFile();
-            runAtLeastOneTime = true;
-        }
-        dealWithEventMap();
-        if (allInstanceFinished()) {
-            LOGGER.info("retry task finished, send action to task manager, 
taskId {}", getTaskId());
-            TaskAction action = new 
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
-            taskManager.submitAction(action);
-            doChangeState(State.SUCCEEDED);
-        }
-    }
-
-    private void runForNormal() {
+    @Override
+    protected void runForNormal() {
         if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
             scanExistingFile();
             lastScanTime = AgentUtils.getCurrentTime();
@@ -267,7 +216,8 @@ public class LogFileTask extends AbstractTask {
         dealWithEventMap();
     }
 
-    private void scanExistingFile() {
+    @Override
+    protected void scanExistingFile() {
         originPatterns.forEach((originPattern) -> {
             List<BasicFileInfo> fileInfos = 
scanExistingFileByPattern(originPattern);
             LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), 
originPattern, fileInfos.size());
@@ -316,7 +266,8 @@ public class LogFileTask extends AbstractTask {
         }
     }
 
-    private void dealWithEventMap() {
+    @Override
+    protected void dealWithEventMap() {
         removeTimeoutEvent(eventMap, retry);
         if (realTime) {
             dealWithEventMapRealTime();
@@ -325,99 +276,12 @@ public class LogFileTask extends AbstractTask {
         }
     }
 
-    private void dealWithEventMapWithCycle() {
-        long startScanTime = startTime;
-        long endScanTime = endTime;
-        List<String> dataTimeList = Scanner.getDataTimeList(startScanTime, 
endScanTime, taskProfile.getCycleUnit(),
-                timeOffset, retry);
-        if (dataTimeList.isEmpty()) {
-            LOGGER.error("getDataTimeList get empty list");
-            return;
-        }
-        Set<String> dealtDataTime = new HashSet<>();
-        // normal task first handle current data time
-        if (!retry) {
-            String current = dataTimeList.remove(dataTimeList.size() - 1);
-            dealtDataTime.add(current);
-            if (!dealEventMapByDataTime(current, true)) {
-                return;
-            }
-        }
-        dataTimeList.forEach(dataTime -> {
-            dealtDataTime.add(dataTime);
-            if (!dealEventMapByDataTime(dataTime, false)) {
-                return;
-            }
-        });
-        for (String dataTime : eventMap.keySet()) {
-            if (!dealtDataTime.contains(dataTime)) {
-                dealEventMapByDataTime(dataTime, false);
-            }
-        }
-    }
-
     private void dealWithEventMapRealTime() {
         for (String dataTime : eventMap.keySet()) {
             dealEventMapByDataTime(dataTime, true);
         }
     }
 
-    private boolean dealEventMapByDataTime(String dataTime, boolean 
isCurrentDataTime) {
-        Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.get(dataTime);
-        if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
-            return true;
-        }
-        if (realTime || shouldStartNow(dataTime)) {
-            Set<InstanceProfile> sortedEvents = new 
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
-            sortedEvents.addAll(sameDataTimeEvents.values());
-            for (InstanceProfile sortEvent : sortedEvents) {
-                String fileName = sortEvent.getInstanceId();
-                InstanceProfile profile = sameDataTimeEvents.get(fileName);
-                if (!isCurrentDataTime && isFull()) {
-                    return false;
-                }
-                if (!instanceQueue.offer(profile)) {
-                    return false;
-                }
-                sameDataTimeEvents.remove(fileName);
-            }
-        }
-        return true;
-    }
-
-    /*
-     * Calculate whether the event needs to be processed at the current time based on its data time, business cycle, and
-     * offset
-     */
-    private boolean shouldStartNow(String dataTime) {
-        String shouldStartTime =
-                NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), timeOffset);
-        String currentTime = getCurrentTime();
-        return currentTime.compareTo(shouldStartTime) >= 0;
-    }
-
-    private void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>> 
eventMap, boolean isRetry) {
-        if (isRetry || realTime) {
-            return;
-        }
-        for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
-            /* If the data time of the event is within 2 days before (after) the current time, it is valid */
-            String dataTime = entry.getKey();
-            if (!NewDateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
-                /* Remove it from memory map. */
-                eventMap.remove(dataTime);
-                LOGGER.warn("remove too old event from event map. dataTime 
{}", dataTime);
-            }
-        }
-    }
-
-    private String getCurrentTime() {
-        SimpleDateFormat dateFormat = new 
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
-        TimeZone timeZone = 
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
-        dateFormat.setTimeZone(timeZone);
-        return dateFormat.format(new Date(System.currentTimeMillis()));
-    }
-
     public synchronized void dealWithWatchEntity(String originPattern) {
         WatchEntity entity = watchers.get(originPattern);
         if (entity == null) {
@@ -520,8 +384,8 @@ public class LogFileTask extends AbstractTask {
         } else {
             cycleUnit = taskProfile.getCycleUnit();
         }
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
-                fileName, cycleUnit, dataTime, fileUpdateTime);
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
+                fileUpdateTime);
         sameDataTimeEvents.put(fileName, instanceProfile);
         LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", 
taskProfile.getTaskId(), dataTime, fileName);
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTimeComparator.java
similarity index 94%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTimeComparator.java
index 1fbbde3b56..aef1c9fd9e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTimeComparator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
 
 import java.io.File;
 import java.util.Comparator;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/Files.java
similarity index 97%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/Files.java
index 3c3bca9d59..12b085226d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/Files.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
 
 import org.apache.inlong.agent.utils.file.FileFinder;
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
similarity index 99%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
index b33eaff818..47d052e81b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.task.file;
+package org.apache.inlong.agent.plugin.task.logcollection.local;
 
 import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
 import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index 9901f29023..4340bd6176 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -38,6 +38,12 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +53,9 @@ import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(InstanceProfile.class)
+@PowerMockIgnore({"javax.management.*"})
 public class TestInstanceManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TestInstanceManager.class);
@@ -56,6 +65,15 @@ public class TestInstanceManager {
 
     @BeforeClass
     public static void setup() {
+        PowerMockito.mockStatic(InstanceProfile.class);
+        Mockito.when(InstanceProfile.getInstanceClassByTaskType(Mockito.any()))
+                .thenAnswer(mock -> MockInstance.class.getCanonicalName());
+        
Mockito.when(InstanceProfile.parseJsonStr(Mockito.anyString())).thenAnswer(mock 
-> {
+            String jsonStr = mock.getArgument(0);
+            InstanceProfile conf = new InstanceProfile();
+            conf.loadJsonStrResource(jsonStr);
+            return conf;
+        });
         helper = new 
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
         Store basicInstanceStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
@@ -79,9 +97,8 @@ public class TestInstanceManager {
     public void testInstanceManager() {
         InstanceStore instanceStore = manager.getInstanceStore();
         for (int i = 1; i <= 10; i++) {
-            InstanceProfile profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                    String.valueOf(i), taskProfile.getCycleUnit(), 
"2023092710",
-                    AgentUtils.getCurrentTime());
+            InstanceProfile profile = 
taskProfile.createInstanceProfile(String.valueOf(i), taskProfile.getCycleUnit(),
+                    "2023092710", AgentUtils.getCurrentTime());
             instanceStore.storeInstance(profile);
         }
         manager.start();
@@ -91,9 +108,8 @@ public class TestInstanceManager {
             
Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == 
InstanceStateEnum.DEFAULT);
         }
         long timeBefore = AgentUtils.getCurrentTime();
-        InstanceProfile profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                helper.getTestRootDir() + "/2023092710_1.txt", 
taskProfile.getCycleUnit(), "2023092710",
-                AgentUtils.getCurrentTime());
+        InstanceProfile profile = 
taskProfile.createInstanceProfile(helper.getTestRootDir() + "/2023092710_1.txt",
+                taskProfile.getCycleUnit(), "2023092710", 
AgentUtils.getCurrentTime());
         String sinkDataTime = String.valueOf(profile.getSinkDataTime());
         try {
             String add2TimeZone = String.valueOf(
@@ -122,9 +138,8 @@ public class TestInstanceManager {
         Assert.assertTrue(manager.shouldAddAgain(profile.getInstanceId(), 
AgentUtils.getCurrentTime()));
 
         // test continue
-        profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                helper.getTestRootDir() + "/2023092710_1.txt", 
taskProfile.getCycleUnit(), "2023092710",
-                AgentUtils.getCurrentTime());
+        profile = taskProfile.createInstanceProfile(helper.getTestRootDir() + 
"/2023092710_1.txt",
+                taskProfile.getCycleUnit(), "2023092710", 
AgentUtils.getCurrentTime());
         action = new InstanceAction();
         action.setActionType(ActionType.ADD);
         action.setProfile(profile);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index b8703c7056..56f46d74a0 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -50,8 +50,8 @@ public class KafkaSinkTest {
         TaskProfile taskProfile =
                 helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
                         "GMT+8:00", null);
-        profile = taskProfile.createInstanceProfile("", fileName,
-                taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
+        profile = taskProfile.createInstanceProfile(fileName, 
taskProfile.getCycleUnit(), "20230927",
+                AgentUtils.getCurrentTime());
         kafkaSink = new MockSink();
         kafkaSink.init(profile);
     }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index 43e3115dcb..c8cf365850 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -50,8 +50,8 @@ public class PulsarSinkTest {
         TaskProfile taskProfile =
                 helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
                         "GMT+8:00", null);
-        profile = taskProfile.createInstanceProfile("", fileName,
-                taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
+        profile = taskProfile.createInstanceProfile(fileName, 
taskProfile.getCycleUnit(), "20230927",
+                AgentUtils.getCurrentTime());
         pulsarSink = new MockSink();
         pulsarSink.init(profile);
     }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 1c9e623b9b..9655e757ef 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -24,7 +24,7 @@ import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.message.file.OffsetAckInfo;
 import org.apache.inlong.agent.message.file.SenderMessage;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
@@ -73,8 +73,8 @@ public class TestSenderManager {
         TaskProfile taskProfile =
                 helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
                         "GMT+8:00", null);
-        profile = taskProfile.createInstanceProfile("", fileName,
-                taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
+        profile = taskProfile.createInstanceProfile(fileName, 
taskProfile.getCycleUnit(), "20230927",
+                AgentUtils.getCurrentTime());
     }
 
     @AfterClass
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index d7a93a0df8..2bbe41859a 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -27,7 +27,7 @@ import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
 import org.apache.inlong.agent.store.Store;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
@@ -87,8 +87,8 @@ public class TestLogFileSource {
             TaskProfile taskProfile = helper.getFileTaskProfile(taskId, 
pattern, dataContentStyle, retry, "", "",
                     TaskStateEnum.RUNNING, "D",
                     "GMT+8:00", Arrays.asList("ok"));
-            InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile("",
-                    fileName, taskProfile.getCycleUnit(), "20230928", 
AgentUtils.getCurrentTime());
+            InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(fileName, taskProfile.getCycleUnit(),
+                    "20230928", AgentUtils.getCurrentTime());
             instanceProfile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
             LogFileSource source = new LogFileSource();
             Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
index 14518078f2..8fb551a083 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -123,8 +123,8 @@ public class TestRedisSource {
 
         TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
-        profile = taskProfile.createInstanceProfile("",
-                "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
+        profile = taskProfile.createInstanceProfile("", 
taskProfile.getCycleUnit(), "20240725",
+                AgentUtils.getCurrentTime());
         profile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
         profile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
         profile.set(TaskConstants.TASK_REDIS_AUTHUSER, username);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 410acd8e77..960f2a5144 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -138,8 +138,8 @@ public class TestSQLServerSource {
 
         TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
-        instanceProfile = taskProfile.createInstanceProfile("",
-                "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
+        instanceProfile = taskProfile.createInstanceProfile("", 
taskProfile.getCycleUnit(), "20240725",
+                AgentUtils.getCurrentTime());
         instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
         instanceProfile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
         instanceProfile.set(TaskConstants.TASK_SQLSERVER_USER, username);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
index f66e4845b1..4d3bc0508d 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.task.cos.COSTask;
+import org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask;
 import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
@@ -204,20 +204,20 @@ public class TestCOSTask {
         TaskProfile taskProfile = helper.getCOSTaskProfile(taskId, pattern, 
"csv", true, startTime, endTime,
                 TaskStateEnum.RUNNING,
                 cycle, "GMT+8:00", null);
-        COSTask task = null;
+        COSTask fileTask = null;
         final List<String> fileName = new ArrayList();
         final List<String> dataTime = new ArrayList();
         try {
-            task = PowerMockito.spy(new COSTask());
+            fileTask = PowerMockito.spy(new COSTask());
             PowerMockito.doAnswer(invocation -> {
                 fileName.add(invocation.getArgument(0));
                 dataTime.add(invocation.getArgument(1));
                 return null;
-            }).when(task, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString());
-            Assert.assertTrue(task.isProfileValid(taskProfile));
+            }).when(fileTask, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString());
+            Assert.assertTrue(fileTask.isProfileValid(taskProfile));
             manager.getTaskStore().storeTask(taskProfile);
-            task.init(manager, taskProfile, manager.getInstanceBasicStore());
-            EXECUTOR_SERVICE.submit(task);
+            fileTask.init(manager, taskProfile, 
manager.getInstanceBasicStore());
+            EXECUTOR_SERVICE.submit(fileTask);
         } catch (Exception e) {
             LOGGER.error("source init error", e);
             Assert.assertTrue("source init error", false);
@@ -228,6 +228,6 @@ public class TestCOSTask {
             Assert.assertEquals(0, fileName.get(i).compareTo(srcKeys.get(i)));
             Assert.assertEquals(0, 
dataTime.get(i).compareTo(srcDataTimes.get(i)));
         }
-        task.destroy();
+        fileTask.destroy();
     }
 }
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 7d1962c314..d2c8e8a342 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.task.file.LogFileTask;
+import org.apache.inlong.agent.plugin.task.logcollection.local.FileTask;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
 import org.junit.AfterClass;
@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
 import static org.awaitility.Awaitility.await;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(LogFileTask.class)
+@PrepareForTest(FileTask.class)
 @PowerMockIgnore({"javax.management.*"})
 public class TestLogFileTask {
 
@@ -103,11 +103,11 @@ public class TestLogFileTask {
         TaskProfile taskProfile =
                 helper.getFileTaskProfile(taskId, pattern, "csv", true, 
startTime, endTime, TaskStateEnum.RUNNING,
                         cycle, "GMT+8:00", null);
-        LogFileTask dayTask = null;
+        FileTask dayTask = null;
         final List<String> fileName = new ArrayList();
         final List<String> dataTime = new ArrayList();
         try {
-            dayTask = PowerMockito.spy(new LogFileTask());
+            dayTask = PowerMockito.spy(new FileTask());
             PowerMockito.doAnswer(invocation -> {
                 fileName.add(invocation.getArgument(0));
                 dataTime.add(invocation.getArgument(1));
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index 4cab9fa8f0..c6ba4a449b 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -34,6 +35,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
 import static org.awaitility.Awaitility.await;
 
 public class TestTaskManager {
@@ -61,7 +63,7 @@ public class TestTaskManager {
                 TaskProfile taskProfile =
                         helper.getFileTaskProfile(i, pattern, "csv", false, 
"", "", TaskStateEnum.RUNNING,
                                 "D", "GMT+8:00", null);
-                taskProfile.setTaskClass(MockTask.class.getCanonicalName());
+                taskProfile.setInt(TASK_TYPE, TaskTypeEnum.MOCK.getType());
                 taskStore.storeTask(taskProfile);
             }
             manager.start();
@@ -78,7 +80,7 @@ public class TestTaskManager {
         TaskProfile taskProfile1 = helper.getFileTaskProfile(100, pattern, 
"csv", false, "", "", TaskStateEnum.RUNNING,
                 "D", "GMT+8:00", null);
         String taskId1 = taskProfile1.getTaskId();
-        taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
+        taskProfile1.setInt(TASK_TYPE, TaskTypeEnum.MOCK.getType());
         List<TaskProfile> taskProfiles1 = new ArrayList<>();
         taskProfiles1.add(taskProfile1);
         // test add
@@ -102,7 +104,7 @@ public class TestTaskManager {
         // test delete
         TaskProfile taskProfile2 = helper.getFileTaskProfile(200, pattern, 
"csv", false, "", "", TaskStateEnum.RUNNING,
                 "D", "GMT+8:00", null);
-        taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
+        taskProfile2.setInt(TASK_TYPE, TaskTypeEnum.MOCK.getType());
         List<TaskProfile> taskProfiles2 = new ArrayList<>();
         taskProfiles2.add(taskProfile2);
         manager.submitTaskProfiles(taskProfiles2);


Reply via email to