This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b91a38004f [INLONG-11624][Agent] Add SQL data source (#11625)
b91a38004f is described below

commit b91a38004fe1282ba897436e5696c41aa02621e7
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Dec 30 22:08:00 2024 +0800

    [INLONG-11624][Agent] Add SQL data source (#11625)
    
    * [INLONG-11624][Agent] Add SQL data source
    
    * [INLONG-11624][Agent] Add code comments
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/FileTask.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/FileTask.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * [INLONG-11624][Agent] Modify code based on comments
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
    
    Co-authored-by: fuweng11 <[email protected]>
    
    * [INLONG-11624][Agent] Modify thread name
    
    * [INLONG-11624][Agent] Rename variables to prevent misunderstandings
    
    ---------
    
    Co-authored-by: AloysZhang <[email protected]>
    Co-authored-by: fuweng11 <[email protected]>
---
 .../apache/inlong/agent/conf/InstanceProfile.java  |  94 +++-
 .../org/apache/inlong/agent/conf/TaskProfile.java  |  49 ++-
 .../inlong/agent/constant/TaskConstants.java       |  19 +-
 .../java/org/apache/inlong/agent/pojo/SQLTask.java |  62 +++
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  92 ++--
 .../inlong/agent/plugin/instance/SQLInstance.java  |  35 ++
 .../inlong/agent/plugin/sources/COSSource.java     |   2 +-
 .../inlong/agent/plugin/sources/SQLSource.java     | 291 ++++++++++++
 .../plugin/task/logcollection/LogAbstractTask.java |  42 +-
 .../agent/plugin/task/logcollection/SQLTask.java   | 142 ++++++
 .../plugin/task/logcollection/cos/COSTask.java     |  50 +--
 .../plugin/task/logcollection/local/FileTask.java  |  68 +--
 .../task/logcollection/local/WatchEntity.java      |   7 +-
 .../inlong/agent/plugin/utils/regex/DateUtils.java | 488 ++++++++++++++-------
 .../agent/plugin/utils/regex/NewDateUtils.java     | 466 --------------------
 .../inlong/agent/plugin/utils/regex/Scanner.java   |   6 +-
 .../inlong/agent/plugin/AgentBaseTestsHelper.java  |  33 ++
 .../inlong/agent/plugin/task/TestCOSTask.java      |  31 +-
 .../{TestLogFileTask.java => TestFileTask.java}    |  34 +-
 .../inlong/agent/plugin/utils/TestUtils.java       |   4 +-
 20 files changed, 1166 insertions(+), 849 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index c3b20a0581..2c5eaeb371 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -48,16 +48,30 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
  */
 public class InstanceProfile extends AbstractConfiguration implements 
Comparable<InstanceProfile> {
 
-    public static final String DEFAULT_FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
-    public static final String DEFAULT_COS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.COSInstance";
-    public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
-    public static final String DEFAULT_MONGODB_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
-    public static final String DEFAULT_MQTT_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MqttInstance";
-    public static final String DEFAULT_ORACLE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.OracleInstance";
-    public static final String DEFAULT_POSTGRES_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
-    public static final String DEFAULT_PULSAR_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
-    public static final String DEFAULT_REDIS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.RedisInstance";
-    public static final String DEFAULT_SQLSERVER_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+    public static final String FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
+    public static final String COS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.COSInstance";
+    public static final String KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+    public static final String MONGODB_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
+    public static final String MQTT_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MqttInstance";
+    public static final String ORACLE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.OracleInstance";
+    public static final String POSTGRES_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
+    public static final String PULSAR_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
+    public static final String REDIS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.RedisInstance";
+    public static final String SQLSERVER_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+    public static final String SQL_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.SQLInstance";
+
+    public static final String FILE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.LogFileSource";
+    public static final String BINLOG_SOURCE = 
"org.apache.inlong.agent.plugin.sources.BinlogSource";
+    public static final String KAFKA_SOURCE = 
"org.apache.inlong.agent.plugin.sources.KafkaSource";
+    public static final String PULSAR_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PulsarSource";
+    public static final String POSTGRESQL_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
+    public static final String MONGO_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MongoDBSource";
+    public static final String ORACLE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.OracleSource";
+    public static final String REDIS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.RedisSource";
+    public static final String MQTT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MqttSource";
+    public static final String SQLSERVER_SOURCE = 
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
+    public static final String COS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.COSSource";
+    public static final String SQL_SOURCE = 
"org.apache.inlong.agent.plugin.sources.SQLSource";
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceProfile.class);
     private static final Gson GSON = new Gson();
@@ -88,25 +102,27 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
         }
         switch (taskType) {
             case FILE:
-                return DEFAULT_FILE_INSTANCE;
+                return FILE_INSTANCE;
             case KAFKA:
-                return DEFAULT_KAFKA_INSTANCE;
+                return KAFKA_INSTANCE;
             case PULSAR:
-                return DEFAULT_PULSAR_INSTANCE;
+                return PULSAR_INSTANCE;
             case POSTGRES:
-                return DEFAULT_POSTGRES_INSTANCE;
+                return POSTGRES_INSTANCE;
             case ORACLE:
-                return DEFAULT_ORACLE_INSTANCE;
+                return ORACLE_INSTANCE;
             case SQLSERVER:
-                return DEFAULT_SQLSERVER_INSTANCE;
+                return SQLSERVER_INSTANCE;
             case MONGODB:
-                return DEFAULT_MONGODB_INSTANCE;
+                return MONGODB_INSTANCE;
             case REDIS:
-                return DEFAULT_REDIS_INSTANCE;
+                return REDIS_INSTANCE;
             case MQTT:
-                return DEFAULT_MQTT_INSTANCE;
+                return MQTT_INSTANCE;
             case COS:
-                return DEFAULT_COS_INSTANCE;
+                return COS_INSTANCE;
+            case SQL:
+                return SQL_INSTANCE;
             default:
                 LOGGER.error("invalid task type {}", taskType);
                 return null;
@@ -126,7 +142,43 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
     }
 
     public String getSourceClass() {
-        return get(TaskConstants.TASK_SOURCE);
+        TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, 
TaskTypeEnum.FILE.getType()));
+        return getSourceClassByTaskType(taskType);
+    }
+
+    public static String getSourceClassByTaskType(TaskTypeEnum taskType) {
+        if (taskType == null) {
+            return null;
+        }
+        switch (taskType) {
+            case BINLOG:
+                return BINLOG_SOURCE;
+            case FILE:
+                return FILE_SOURCE;
+            case KAFKA:
+                return KAFKA_SOURCE;
+            case PULSAR:
+                return PULSAR_SOURCE;
+            case POSTGRES:
+                return POSTGRESQL_SOURCE;
+            case ORACLE:
+                return ORACLE_SOURCE;
+            case SQLSERVER:
+                return SQLSERVER_SOURCE;
+            case MONGODB:
+                return MONGO_SOURCE;
+            case REDIS:
+                return REDIS_SOURCE;
+            case MQTT:
+                return MQTT_SOURCE;
+            case COS:
+                return COS_SOURCE;
+            case SQL:
+                return SQL_SOURCE;
+            default:
+                LOGGER.error("invalid task type {}", taskType);
+                return null;
+        }
     }
 
     public String getSinkClass() {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index c2a60a0598..06d7018a30 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -47,17 +47,18 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;
  */
 public class TaskProfile extends AbstractConfiguration {
 
-    public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
-    public static final String DEFAULT_COS_TASK = 
"org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
-    public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
-    public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
-    public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
-    public static final String DEFAULT_ORACLE_TASK = 
"org.apache.inlong.agent.plugin.task.OracleTask";
-    public static final String DEFAULT_REDIS_TASK = 
"org.apache.inlong.agent.plugin.task.RedisTask";
-    public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
-    public static final String DEFAULT_MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
-    public static final String DEFAULT_SQLSERVER_TASK = 
"org.apache.inlong.agent.plugin.task.SQLServerTask";
-    public static final String DEFAULT_MOCK_TASK = 
"org.apache.inlong.agent.plugin.task.MockTask";
+    public static final String SQL_TASK = 
"org.apache.inlong.agent.plugin.task.logcollection.SQLTask";
+    public static final String FILE_TASK = 
"org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
+    public static final String COS_TASK = 
"org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
+    public static final String KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
+    public static final String PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
+    public static final String MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
+    public static final String ORACLE_TASK = 
"org.apache.inlong.agent.plugin.task.OracleTask";
+    public static final String REDIS_TASK = 
"org.apache.inlong.agent.plugin.task.RedisTask";
+    public static final String POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
+    public static final String MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
+    public static final String SQLSERVER_TASK = 
"org.apache.inlong.agent.plugin.task.SQLServerTask";
+    public static final String MOCK_TASK = 
"org.apache.inlong.agent.plugin.task.MockTask";
 
     private static final Gson GSON = new Gson();
     private static final Logger logger = 
LoggerFactory.getLogger(TaskProfile.class);
@@ -75,28 +76,30 @@ public class TaskProfile extends AbstractConfiguration {
     public String getTaskClass() {
         TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, 
TaskTypeEnum.FILE.getType()));
         switch (requireNonNull(taskType)) {
+            case SQL:
+                return SQL_TASK;
             case FILE:
-                return DEFAULT_FILE_TASK;
+                return FILE_TASK;
             case KAFKA:
-                return DEFAULT_KAFKA_TASK;
+                return KAFKA_TASK;
             case PULSAR:
-                return DEFAULT_PULSAR_TASK;
+                return PULSAR_TASK;
             case POSTGRES:
-                return DEFAULT_POSTGRESQL_TASK;
+                return POSTGRESQL_TASK;
             case ORACLE:
-                return DEFAULT_ORACLE_TASK;
+                return ORACLE_TASK;
             case SQLSERVER:
-                return DEFAULT_SQLSERVER_TASK;
+                return SQLSERVER_TASK;
             case MONGODB:
-                return DEFAULT_MONGODB_TASK;
+                return MONGODB_TASK;
             case REDIS:
-                return DEFAULT_REDIS_TASK;
+                return REDIS_TASK;
             case MQTT:
-                return DEFAULT_MQTT_TASK;
+                return MQTT_TASK;
             case COS:
-                return DEFAULT_COS_TASK;
+                return COS_TASK;
             case MOCK:
-                return DEFAULT_MOCK_TASK;
+                return MOCK_TASK;
             default:
                 logger.error("invalid task type {}", taskType);
                 return null;
@@ -153,7 +156,7 @@ public class TaskProfile extends AbstractConfiguration {
      */
     @Override
     public boolean allRequiredKeyExist() {
-        return hasKey(TaskConstants.TASK_ID) && 
hasKey(TaskConstants.TASK_SOURCE)
+        return hasKey(TaskConstants.TASK_ID)
                 && hasKey(TaskConstants.TASK_SINK) && 
hasKey(TaskConstants.TASK_CHANNEL)
                 && hasKey(TaskConstants.TASK_GROUP_ID) && 
hasKey(TaskConstants.TASK_STREAM_ID);
     }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 201ec24713..da786b7cf6 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -29,9 +29,6 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_GROUP_ID = "task.groupId";
     public static final String TASK_STREAM_ID = "task.streamId";
     public static final String RESTORE_FROM_DB = "task.restoreFromDB";
-
-    public static final String TASK_SOURCE = "task.source";
-
     public static final String TASK_CHANNEL = "task.channel";
     public static final String TASK_TYPE = "task.taskType";
     public static final String TASK_FILE_TRIGGER = "task.fileTask.trigger";
@@ -75,7 +72,7 @@ public class TaskConstants extends CommonConstants {
     public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle";
     public static final String COS_MAX_NUM = "task.cosTask.maxFileCount";
     public static final String COS_TASK_PATTERN = "task.cosTask.pattern";
-    public static final String TASK_COS_TIME_OFFSET = 
"task.cosTask.timeOffset";
+    public static final String COS_TIME_OFFSET = "task.cosTask.timeOffset";
     public static final String COS_TASK_RETRY = "task.cosTask.retry";
     public static final String COS_TASK_TIME_FROM = 
"task.cosTask.dataTimeFrom";
     public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo";
@@ -86,6 +83,20 @@ public class TaskConstants extends CommonConstants {
     public static final String COS_DATA_SEPARATOR = 
"task.cosTask.dataSeparator";
     public static final String COS_FILTER_STREAMS = 
"task.cosTask.filterStreams";
 
+    // SQL task
+    public static final String SQL_TASK_CYCLE_UNIT = "task.sqlTask.cycleUnit";
+    public static final String SQL_MAX_NUM = "task.sqlTask.maxInstanceCount";
+    public static final String SQL_TASK_SQL = "task.sqlTask.sql";
+    public static final String SQL_TIME_OFFSET = "task.sqlTask.timeOffset";
+    public static final String SQL_TASK_RETRY = "task.sqlTask.retry";
+    public static final String SQL_TASK_TIME_FROM = 
"task.sqlTask.dataTimeFrom";
+    public static final String SQL_TASK_TIME_TO = "task.sqlTask.dataTimeTo";
+    public static final String SQL_TASK_JDBC_URL = "task.sqlTask.jdbcUrl";
+    public static final String SQL_TASK_USERNAME = "task.sqlTask.username";
+    public static final String SQL_TASK_PASSWORD = "task.sqlTask.jdbcPassword";
+    public static final String SQL_TASK_DATA_SEPARATOR = 
"task.sqlTask.dataSeparator";
+    public static final String SQL_TASK_FETCH_SIZE = "task.sqlTask.fetchSize";
+
     /**
      * delimiter to split offset for different task
      */
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SQLTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SQLTask.java
new file mode 100644
index 0000000000..4a3668fe70
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SQLTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class SQLTask {
+
+    private Integer id;
+    private String sql;
+    private String cycleUnit;
+    private Boolean retry;
+    private String dataTimeFrom;
+    private String dataTimeTo;
+    private String timeOffset;
+    // The number of instances that can run simultaneously for this task to
+    // prevent other problems caused by running too many instances 
simultaneously
+    private Integer maxInstanceCount;
+    private String jdbcUrl;
+    private String username;
+    private String jdbcPassword;
+    private String dataSeparator;
+    // The number of rows collected from the data source in each batch
+    private Integer fetchSize;
+
+    @Data
+    public static class SQLTaskConfig {
+
+        private String sql;
+        private String cycleUnit;
+        private Boolean retry;
+        private String dataTimeFrom;
+        private String dataTimeTo;
+        // '1m' means one minute after, '-1m' means one minute before
+        // '1h' means one hour after, '-1h' means one hour before
+        // '1d' means one day after, '-1d' means one day before
+        // Null means from current timestamp
+        private String timeOffset;
+        private Integer maxInstanceCount;
+        private String jdbcUrl;
+        private String username;
+        private String jdbcPassword;
+        private String dataSeparator;
+        private Integer fetchSize;
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index a30bf44ecc..2e4849a209 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -31,6 +31,7 @@ import 
org.apache.inlong.agent.pojo.OracleTask.OracleTaskConfig;
 import org.apache.inlong.agent.pojo.PostgreSQLTask.PostgreSQLTaskConfig;
 import org.apache.inlong.agent.pojo.PulsarTask.PulsarTaskConfig;
 import org.apache.inlong.agent.pojo.RedisTask.RedisTaskConfig;
+import org.apache.inlong.agent.pojo.SQLTask.SQLTaskConfig;
 import org.apache.inlong.agent.pojo.SqlServerTask.SqlserverTaskConfig;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.enums.TaskTypeEnum;
@@ -57,53 +58,8 @@ public class TaskProfileDto {
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
     public static final String PULSAR_SINK = 
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
     public static final String KAFKA_SINK = 
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
-    /**
-     * file source
-     */
-    public static final String DEFAULT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.LogFileSource";
-    /**
-     * binlog source
-     */
-    public static final String BINLOG_SOURCE = 
"org.apache.inlong.agent.plugin.sources.BinlogSource";
-    /**
-     * kafka source
-     */
-    public static final String KAFKA_SOURCE = 
"org.apache.inlong.agent.plugin.sources.KafkaSource";
-    // pulsar source
-    public static final String PULSAR_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PulsarSource";
-    /**
-     * PostgreSQL source
-     */
-    public static final String POSTGRESQL_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
-    /**
-     * mongo source
-     */
-    public static final String MONGO_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MongoDBSource";
-    /**
-     * oracle source
-     */
-    public static final String ORACLE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.OracleSource";
-    /**
-     * redis source
-     */
-    public static final String REDIS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.RedisSource";
-    /**
-     * mqtt source
-     */
-    public static final String MQTT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MqttSource";
-    /**
-     * sqlserver source
-     */
-    public static final String SQLSERVER_SOURCE = 
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
-    /**
-     * cos source
-     */
-    public static final String COS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.COSSource";
-
     private static final Gson GSON = new Gson();
-
     public static final String deafult_time_offset = "0";
-
     private static final String DEFAULT_AUDIT_VERSION = "0";
 
     private Task task;
@@ -154,7 +110,7 @@ public class TaskProfileDto {
                 FileTaskConfig.class);
 
         FileTask.Dir dir = new FileTask.Dir();
-        dir.setPatterns(taskConfig.getPattern());
+        dir.setPatterns(taskConfig.getPattern().trim());
         dir.setBlackList(taskConfig.getBlackList());
         fileTask.setDir(dir);
         fileTask.setCollectType(taskConfig.getCollectType());
@@ -200,7 +156,7 @@ public class TaskProfileDto {
         cosTask.setId(dataConfig.getTaskId());
         COSTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
                 COSTaskConfig.class);
-        cosTask.setPattern(taskConfig.getPattern());
+        cosTask.setPattern(taskConfig.getPattern().trim());
         cosTask.setCollectType(taskConfig.getCollectType());
         cosTask.setContentStyle(taskConfig.getContentStyle());
         cosTask.setDataSeparator(taskConfig.getDataSeparator());
@@ -224,6 +180,30 @@ public class TaskProfileDto {
         return cosTask;
     }
 
+    private static SQLTask getSQLTask(DataConfig dataConfig) {
+        SQLTask sqlTask = new SQLTask();
+        sqlTask.setId(dataConfig.getTaskId());
+        SQLTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
+                SQLTaskConfig.class);
+        sqlTask.setSql(taskConfig.getSql());
+        sqlTask.setMaxInstanceCount(taskConfig.getMaxInstanceCount());
+        sqlTask.setRetry(taskConfig.getRetry());
+        sqlTask.setCycleUnit(taskConfig.getCycleUnit());
+        sqlTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
+        sqlTask.setDataTimeTo(taskConfig.getDataTimeTo());
+        sqlTask.setUsername(taskConfig.getUsername());
+        sqlTask.setJdbcPassword(taskConfig.getJdbcPassword());
+        sqlTask.setDataSeparator(taskConfig.getDataSeparator());
+        sqlTask.setFetchSize(taskConfig.getFetchSize());
+        sqlTask.setJdbcUrl(taskConfig.getJdbcUrl());
+        if (taskConfig.getTimeOffset() != null) {
+            sqlTask.setTimeOffset(taskConfig.getTimeOffset());
+        } else {
+            sqlTask.setTimeOffset(deafult_time_offset + 
sqlTask.getCycleUnit());
+        }
+        return sqlTask;
+    }
+
     private static KafkaTask getKafkaTask(DataConfig dataConfigs) {
 
         KafkaTaskConfig kafkaTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
@@ -500,66 +480,62 @@ public class TaskProfileDto {
         TaskTypeEnum taskType = 
TaskTypeEnum.getTaskType(dataConfig.getTaskType());
         switch (requireNonNull(taskType)) {
             case SQL:
+                SQLTask sqlTask = getSQLTask(dataConfig);
+                task.setCycleUnit(sqlTask.getCycleUnit());
+                task.setSqlTask(sqlTask);
+                task.setRetry(sqlTask.getRetry());
+                profileDto.setTask(task);
+                break;
             case BINLOG:
                 BinlogTask binlogTask = getBinlogTask(dataConfig);
                 task.setBinlogTask(binlogTask);
-                task.setSource(BINLOG_SOURCE);
                 profileDto.setTask(task);
                 break;
             case FILE:
                 FileTask fileTask = getFileTask(dataConfig);
                 task.setCycleUnit(fileTask.getCycleUnit());
                 task.setFileTask(fileTask);
-                task.setSource(DEFAULT_SOURCE);
                 task.setRetry(fileTask.getRetry());
                 profileDto.setTask(task);
                 break;
             case KAFKA:
                 KafkaTask kafkaTask = getKafkaTask(dataConfig);
                 task.setKafkaTask(kafkaTask);
-                task.setSource(KAFKA_SOURCE);
                 profileDto.setTask(task);
                 break;
             case PULSAR:
                 PulsarTask pulsarTask = getPulsarTask(dataConfig);
                 task.setPulsarTask(pulsarTask);
-                task.setSource(PULSAR_SOURCE);
                 profileDto.setTask(task);
                 break;
             case POSTGRES:
                 PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig);
                 task.setPostgreSQLTask(postgreSQLTask);
-                task.setSource(POSTGRESQL_SOURCE);
                 profileDto.setTask(task);
                 break;
             case ORACLE:
                 OracleTask oracleTask = getOracleTask(dataConfig);
                 task.setOracleTask(oracleTask);
-                task.setSource(ORACLE_SOURCE);
                 profileDto.setTask(task);
                 break;
             case SQLSERVER:
                 SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
                 task.setSqlserverTask(sqlserverTask);
-                task.setSource(SQLSERVER_SOURCE);
                 profileDto.setTask(task);
                 break;
             case MONGODB:
                 MongoTask mongoTask = getMongoTask(dataConfig);
                 task.setMongoTask(mongoTask);
-                task.setSource(MONGO_SOURCE);
                 profileDto.setTask(task);
                 break;
             case REDIS:
                 RedisTask redisTask = getRedisTask(dataConfig);
                 task.setRedisTask(redisTask);
-                task.setSource(REDIS_SOURCE);
                 profileDto.setTask(task);
                 break;
             case MQTT:
                 MqttTask mqttTask = getMqttTask(dataConfig);
                 task.setMqttTask(mqttTask);
-                task.setSource(MQTT_SOURCE);
                 profileDto.setTask(task);
                 break;
             case MOCK:
@@ -569,7 +545,6 @@ public class TaskProfileDto {
                 COSTask cosTask = getCOSTask(dataConfig);
                 task.setCycleUnit(cosTask.getCycleUnit());
                 task.setCosTask(cosTask);
-                task.setSource(COS_SOURCE);
                 task.setRetry(cosTask.getRetry());
                 profileDto.setTask(task);
                 break;
@@ -618,6 +593,7 @@ public class TaskProfileDto {
         private MqttTask mqttTask;
         private SqlServerTask sqlserverTask;
         private COSTask cosTask;
+        private SQLTask sqlTask;
     }
 
     @Data
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
new file mode 100644
index 0000000000..eb79f1fa21
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+/**
+ * SQL instance contains source and sink.
+ * main job is to read from source and write to sink
+ */
+public class SQLInstance extends CommonInstance {
+
+    @Override
+    public void setInodeInfo(InstanceProfile profile) throws IOException {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
index b6792ac82d..5dd236f710 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
@@ -119,7 +119,7 @@ public class COSSource extends AbstractSource {
             if (offsetProfile != null) {
                 offset = offsetProfile.toJsonStr();
             }
-            LOGGER.info("LogFileSource init: {} offset: {}", 
profile.toJsonStr(), offset);
+            LOGGER.info("COS source init: {} offset: {}", profile.toJsonStr(), 
offset);
             AgentConfiguration conf = AgentConfiguration.getAgentConf();
             int permit = conf.getInt(AGENT_GLOBAL_COS_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT);
             
MemoryManager.getInstance().addSemaphore(AGENT_GLOBAL_COS_SOURCE_PERMIT, 
permit);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
new file mode 100755
index 0000000000..e73db1be90
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
+import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.agent.constant.TaskConstants.SQL_TASK_DATA_SEPARATOR;
+import static 
org.apache.inlong.agent.constant.TaskConstants.SQL_TASK_FETCH_SIZE;
+
+/**
+ * Read by SQL
+ */
+public class SQLSource extends AbstractSource {
+
+    public static final String AGENT_GLOBAL_SQL_SOURCE_PERMIT = 
"agent.global.sql.source.permit";
+    public static final int DEFAULT_AGENT_GLOBAL_SQL_SOURCE_PERMIT = 128 * 
1000 * 1000;
+    private int MAX_RECONNECT_TIMES = 3;
+    private int RECONNECT_INTERVAL_SECOND = 10;
+    private int DEFAULT_FETCH_SIZE = 1000;
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    protected static class FileOffset {
+
+        private Long lineOffset;
+        private Long byteOffset;
+        private boolean hasByteOffset;
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SQLSource.class);
+    public static final String OFFSET_SEP = ":";
+    protected final Integer WAIT_TIMEOUT_MS = 10;
+
+    private String finalSQL;
+    private String jdbcUrl;
+    private String username;
+    private String password;
+    private Connection conn;
+    protected BlockingQueue<SourceData> queue;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("sql-source-pool"));
+    private volatile boolean running = false;
+    private boolean isMysql = true;
+    private volatile boolean fileExist = true;
+
+    public SQLSource() {
+    }
+
+    @Override
+    protected void initExtendClass() {
+        extendClass = DefaultExtendedHandler.class.getCanonicalName();
+    }
+
+    @Override
+    protected void initSource(InstanceProfile profile) {
+        try {
+            LOGGER.info("sql source init: {}", profile.toJsonStr());
+            AgentConfiguration conf = AgentConfiguration.getAgentConf();
+            int permit = conf.getInt(AGENT_GLOBAL_SQL_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_SQL_SOURCE_PERMIT);
+            
MemoryManager.getInstance().addSemaphore(AGENT_GLOBAL_SQL_SOURCE_PERMIT, 
permit);
+            finalSQL = profile.getInstanceId();
+            jdbcUrl = 
profile.get(TaskConstants.SQL_TASK_JDBC_URL).trim().replace("\r", "")
+                    .replace("\n", "");
+            if (jdbcUrl.startsWith("jdbc:mysql:")) {
+                isMysql = true;
+            }
+            username = profile.get(TaskConstants.SQL_TASK_USERNAME);
+            password = profile.get(TaskConstants.SQL_TASK_PASSWORD);
+
+            queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+            initConn();
+            EXECUTOR_SERVICE.execute(run());
+        } catch (Exception ex) {
+            stopRunning();
+            throw new FileException("error init stream for " + finalSQL, ex);
+        }
+    }
+
+    private void initConn() throws SQLException {
+        int retryTimes = 0;
+        while (conn == null) {
+            try {
+                conn = DriverManager.getConnection(jdbcUrl, username, 
password);
+            } catch (Exception e) {
+                retryTimes++;
+                if (retryTimes >= MAX_RECONNECT_TIMES) {
+                    throw new SQLException(
+                            "Failed to connect database after retry " + 
retryTimes + " times.", e);
+                }
+                LOGGER.warn("Reconnect database after {} seconds due to the 
following error: {}",
+                        RECONNECT_INTERVAL_SECOND, e.getMessage());
+                AgentUtils.silenceSleepInSeconds(RECONNECT_INTERVAL_SECOND);
+            }
+        }
+
+    }
+
+    @Override
+    protected boolean doPrepareToRead() {
+
+        return true;
+    }
+
+    @Override
+    protected List<SourceData> readFromSource() {
+        if (queue.isEmpty()) {
+            return null;
+        }
+        int count = 0;
+        int len = 0;
+        List<SourceData> lines = new ArrayList<>();
+        while (!queue.isEmpty() && count < BATCH_READ_LINE_COUNT && len < 
BATCH_READ_LINE_TOTAL_LEN) {
+            if (len + queue.peek().getData().length > 
BATCH_READ_LINE_TOTAL_LEN) {
+                break;
+            }
+            len += queue.peek().getData().length;
+            count++;
+            lines.add(queue.poll());
+        }
+        MemoryManager.getInstance().release(AGENT_GLOBAL_SQL_SOURCE_PERMIT, 
len);
+        return lines;
+    }
+
+    @Override
+    protected void printCurrentState() {
+        LOGGER.info("sql source running, sql: {}", instanceId);
+    }
+
+    @Override
+    protected String getThreadName() {
+        return "sql-source-" + taskId + "-" + finalSQL;
+    }
+
+    private Runnable run() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName());
+            running = true;
+            try {
+                doRun();
+            } catch (Throwable e) {
+                fileExist = false;
+                LOGGER.error("do run error maybe connect broken: ", e);
+                ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+            }
+            running = false;
+        };
+    }
+
+    protected void doRun() throws SQLException, IOException {
+        try (Statement stmt = 
conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            if (isMysql) {
+                stmt.setFetchSize(Integer.MIN_VALUE);
+            } else {
+                stmt.setFetchSize(profile.getInt(SQL_TASK_FETCH_SIZE, 
DEFAULT_FETCH_SIZE));
+            }
+            try (ResultSet rs = stmt.executeQuery(profile.getInstanceId())) {
+                ResultSetMetaData metaData = rs.getMetaData();
+                int columnCount = metaData.getColumnCount();
+                ByteArrayOutputStream bas = new ByteArrayOutputStream();
+                byte[] sep = 
profile.get(SQL_TASK_DATA_SEPARATOR).getBytes(StandardCharsets.UTF_8);
+                while (rs.next()) {
+                    for (int i = 1; i <= columnCount; i++) {
+                        if (i > 1) {
+                            bas.write(sep);
+                        }
+                        bas.write(rs.getBytes(i));
+                    }
+                    SourceData sourceData = new SourceData(bas.toByteArray(),
+                            getOffsetString(0L, 0L));
+                    boolean suc = false;
+                    while (isRunnable() && !suc) {
+                        boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_SQL_SOURCE_PERMIT, sourceData.getData().length);
+                        if (!suc4Queue) {
+                            break;
+                        }
+                        suc = queue.offer(sourceData);
+                        if (!suc) {
+                            MemoryManager.getInstance()
+                                    .release(AGENT_GLOBAL_SQL_SOURCE_PERMIT, 
sourceData.getData().length);
+                            AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+                        }
+                    }
+                    bas.reset();
+                }
+            }
+        } catch (SQLException e) {
+            LOGGER.error("read from result set error: ", e);
+            throw e;
+        } catch (IOException e) {
+            LOGGER.error("read from result io error: ", e);
+            throw e;
+        }
+    }
+
+    private String getOffsetString(Long lineOffset, Long byteOffset) {
+        return lineOffset + OFFSET_SEP + byteOffset;
+    }
+
+    @Override
+    protected boolean isRunnable() {
+        return runnable && fileExist;
+    }
+
+    @Override
+    public boolean sourceExist() {
+        return fileExist;
+    }
+
+    @Override
+    protected void releaseSource() {
+        while (running) {
+            AgentUtils.silenceSleepInMs(1);
+        }
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                LOGGER.error("close connect error: ", e);
+            }
+        }
+        while (!queue.isEmpty()) {
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_SQL_SOURCE_PERMIT, 
queue.poll().getData().length);
+        }
+    }
+
+    private boolean waitForPermit(String permitName, int permitLen) {
+        boolean suc = false;
+        while (!suc) {
+            suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
+            if (!suc) {
+                MemoryManager.getInstance().printDetail(permitName, 
"sql_source");
+                if (!isRunnable()) {
+                    return false;
+                }
+                AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
index e234042524..4c7b729a4d 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.task.logcollection;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.core.task.TaskAction;
 import org.apache.inlong.agent.plugin.task.AbstractTask;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
 import org.apache.inlong.agent.plugin.utils.regex.Scanner;
 import org.apache.inlong.agent.state.State;
 
@@ -156,18 +156,50 @@ public abstract class LogAbstractTask extends 
AbstractTask {
      * offset
      */
     private boolean shouldStartNow(String dataTime) {
-        String shouldStartTime = NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), timeOffset);
+        String shouldStartTime = DateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), timeOffset);
         String currentTime = getCurrentTime();
         return currentTime.compareTo(shouldStartTime) >= 0;
     }
 
     private String getCurrentTime() {
-        SimpleDateFormat dateFormat = new 
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
-        TimeZone timeZone = 
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
+        SimpleDateFormat dateFormat = new 
SimpleDateFormat(DateUtils.DEFAULT_FORMAT);
+        TimeZone timeZone = TimeZone.getTimeZone(DateUtils.DEFAULT_TIME_ZONE);
         dateFormat.setTimeZone(timeZone);
         return dateFormat.format(new Date(System.currentTimeMillis()));
     }
 
+    protected void addToEvenMap(String fileName, String dataTime, Long 
fileUpdateTime, String cycleUnit) {
+        if (isInEventMap(fileName, dataTime)) {
+            LOGGER.info("add to evenMap isInEventMap returns true skip taskId 
{} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
+            return;
+        }
+        if (!shouldAddAgain(fileName, fileUpdateTime)) {
+            LOGGER.info("add to evenMap shouldAddAgain returns false skip 
taskId {} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
+            return;
+        }
+        Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.computeIfAbsent(dataTime,
+                mapKey -> new ConcurrentHashMap<>());
+        boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
+        if (containsInMemory) {
+            LOGGER.error("should not happen! may be {} has been deleted and 
add again", fileName);
+            return;
+        }
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
+                fileUpdateTime);
+        sameDataTimeEvents.put(fileName, instanceProfile);
+        LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", 
taskProfile.getTaskId(), dataTime, fileName);
+    }
+
+    private boolean isInEventMap(String fileName, String dataTime) {
+        Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
+        if (fileToProfile == null) {
+            return false;
+        }
+        return fileToProfile.get(fileName) != null;
+    }
+
     protected void removeTimeoutEvent(Map<String, Map<String, 
InstanceProfile>> eventMap, boolean isRetry) {
         if (isRetry) {
             return;
@@ -175,7 +207,7 @@ public abstract class LogAbstractTask extends AbstractTask {
         for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
             /* If the data time of the event is within 2 days before (after) 
the current time, it is valid */
             String dataTime = entry.getKey();
-            if (!NewDateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
+            if (!DateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
                 /* Remove it from memory map. */
                 eventMap.remove(dataTime);
                 LOGGER.warn("remove too old event from event map. dataTime 
{}", dataTime);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/SQLTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/SQLTask.java
new file mode 100644
index 0000000000..ba6995bf03
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/SQLTask.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.logcollection;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.List;
+
+public class SQLTask extends LogAbstractTask {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SQLTask.class);
+    private String originPattern;
+    private long lastScanTime = 0;
+    public final long SCAN_INTERVAL = 1 * 60 * 1000;
+
+    @Override
+    protected int getInstanceLimit() {
+        return taskProfile.getInt(TaskConstants.SQL_MAX_NUM);
+    }
+
+    @Override
+    protected void initTask() {
+        super.initTask();
+        timeOffset = taskProfile.get(TaskConstants.SQL_TIME_OFFSET, "");
+        retry = taskProfile.getBoolean(TaskConstants.SQL_TASK_RETRY, false);
+        originPattern = taskProfile.get(TaskConstants.SQL_TASK_SQL);
+        if (retry) {
+            initRetryTask(taskProfile);
+        }
+    }
+
+    private boolean initRetryTask(TaskProfile profile) {
+        String dataTimeFrom = profile.get(TaskConstants.SQL_TASK_TIME_FROM, 
"");
+        String dataTimeTo = profile.get(TaskConstants.SQL_TASK_TIME_TO, "");
+        try {
+            startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom, 
profile.getCycleUnit());
+            endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo, 
profile.getCycleUnit());
+        } catch (ParseException e) {
+            LOGGER.error("retry task time error start {} end {}", 
dataTimeFrom, dataTimeTo, e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.SQL_TASK_CYCLE_UNIT)) {
+            LOGGER.error("task profile needs sql cycle unit");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_CYCLE_UNIT)) {
+            LOGGER.error("task profile needs cycle unit");
+            return false;
+        }
+        if (profile.get(TaskConstants.TASK_CYCLE_UNIT)
+                .compareTo(profile.get(TaskConstants.SQL_TASK_CYCLE_UNIT)) != 
0) {
+            LOGGER.error("task profile cycle unit must be consistent");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_TIME_ZONE)) {
+            LOGGER.error("task profile needs time zone");
+            return false;
+        }
+        boolean ret = profile.hasKey(TaskConstants.SQL_TASK_SQL)
+                && profile.hasKey(TaskConstants.SQL_MAX_NUM);
+        if (!ret) {
+            LOGGER.error("task profile needs file keys");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.SQL_TIME_OFFSET)) {
+            LOGGER.error("task profile needs time offset");
+            return false;
+        }
+        if (profile.getBoolean(TaskConstants.SQL_TASK_RETRY, false)) {
+            if (!initRetryTask(profile)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    protected void releaseTask() {
+    }
+
+    @Override
+    protected void runForNormal() {
+        if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
+            scanExistingFile();
+            lastScanTime = AgentUtils.getCurrentTime();
+        }
+        dealWithEventMap();
+    }
+
+    @Override
+    protected void scanExistingFile() {
+        List<FinalPatternInfo> finalPatternInfos = 
Scanner.getFinalPatternInfos(originPattern,
+                taskProfile.getCycleUnit(), timeOffset, startTime, endTime, 
retry);
+        LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), 
originPattern, finalPatternInfos.size());
+        finalPatternInfos.forEach((fileInfo) -> {
+            String dataTime = 
DateTransUtils.millSecConvertToTimeStr(fileInfo.dataTime, 
taskProfile.getCycleUnit());
+            addToEvenMap(fileInfo.finalPattern, dataTime, 0L, 
taskProfile.getCycleUnit());
+            if (retry) {
+                instanceCount++;
+            }
+        });
+    }
+
+    @Override
+    protected void dealWithEventMap() {
+        removeTimeoutEvent(eventMap, retry);
+        dealWithEventMapWithCycle();
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
index 86210157ed..c7bb5c9309 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/cos/COSTask.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.agent.plugin.task.logcollection.cos;
 
-import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
@@ -33,12 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
-/**
- * Watch directory, if new valid files are created, create instance 
correspondingly.
- */
 public class COSTask extends LogAbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(COSTask.class);
@@ -59,7 +53,7 @@ public class COSTask extends LogAbstractTask {
     @Override
     protected void initTask() {
         super.initTask();
-        timeOffset = taskProfile.get(TaskConstants.TASK_COS_TIME_OFFSET, "");
+        timeOffset = taskProfile.get(TaskConstants.COS_TIME_OFFSET, "");
         retry = taskProfile.getBoolean(TaskConstants.COS_TASK_RETRY, false);
         originPattern = taskProfile.get(TaskConstants.COS_TASK_PATTERN);
         bucketName = taskProfile.get(TaskConstants.COS_TASK_BUCKET_NAME);
@@ -114,7 +108,7 @@ public class COSTask extends LogAbstractTask {
             LOGGER.error("task profile needs file keys");
             return false;
         }
-        if (!profile.hasKey(TaskConstants.TASK_COS_TIME_OFFSET)) {
+        if (!profile.hasKey(TaskConstants.COS_TIME_OFFSET)) {
             LOGGER.error("task profile needs time offset");
             return false;
         }
@@ -146,51 +140,19 @@ public class COSTask extends LogAbstractTask {
                 taskProfile.getCycleUnit(), timeOffset, startTime, endTime, 
retry);
         LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), 
originPattern, fileInfos.size());
         fileInfos.forEach((fileInfo) -> {
-            addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+            String fileName = fileInfo.fileName;
+            String dataTime = fileInfo.dataTime;
+            ObjectMetadata meta = cosClient.getObjectMetadata(bucketName, 
fileName);
+            addToEvenMap(fileName, dataTime, meta.getLastModified().getTime(), 
taskProfile.getCycleUnit());
             if (retry) {
                 instanceCount++;
             }
         });
     }
 
-    private boolean isInEventMap(String fileName, String dataTime) {
-        Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
-        if (fileToProfile == null) {
-            return false;
-        }
-        return fileToProfile.get(fileName) != null;
-    }
-
     @Override
     protected void dealWithEventMap() {
         removeTimeoutEvent(eventMap, retry);
         dealWithEventMapWithCycle();
     }
-
-    private void addToEvenMap(String fileName, String dataTime) {
-        if (isInEventMap(fileName, dataTime)) {
-            LOGGER.info("add to evenMap isInEventMap returns true skip taskId 
{} dataTime {} fileName {}",
-                    taskProfile.getTaskId(), dataTime, fileName);
-            return;
-        }
-        ObjectMetadata meta = cosClient.getObjectMetadata(bucketName, 
fileName);
-        Long fileUpdateTime = meta.getLastModified().getTime();
-        if (!shouldAddAgain(fileName, fileUpdateTime)) {
-            LOGGER.info("add to evenMap shouldAddAgain returns false skip 
taskId {} dataTime {} fileName {}",
-                    taskProfile.getTaskId(), dataTime, fileName);
-            return;
-        }
-        Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.computeIfAbsent(dataTime,
-                mapKey -> new ConcurrentHashMap<>());
-        boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
-        if (containsInMemory) {
-            LOGGER.error("should not happen! may be {} has been deleted and 
add again", fileName);
-            return;
-        }
-        String cycleUnit = taskProfile.getCycleUnit();
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
-                fileUpdateTime);
-        sameDataTimeEvents.put(fileName, instanceProfile);
-        LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", 
taskProfile.getTaskId(), dataTime, fileName);
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
index ef50d1fc62..82fca57109 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/FileTask.java
@@ -17,13 +17,12 @@
 
 package org.apache.inlong.agent.plugin.task.logcollection.local;
 
-import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.plugin.task.logcollection.LogAbstractTask;
 import 
org.apache.inlong.agent.plugin.task.logcollection.local.FileScanner.BasicFileInfo;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
 import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
 import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -222,7 +221,9 @@ public class FileTask extends LogAbstractTask {
             List<BasicFileInfo> fileInfos = 
scanExistingFileByPattern(originPattern);
             LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), 
originPattern, fileInfos.size());
             fileInfos.forEach((fileInfo) -> {
-                addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+                String fileName = fileInfo.fileName;
+                Long fileUpdateTime = 
FileUtils.getFileLastModifyTime(fileName);
+                addToEvenMap(fileName, fileInfo.dataTime, fileUpdateTime, 
taskProfile.getCycleUnit());
                 if (retry) {
                     instanceCount++;
                 }
@@ -230,17 +231,6 @@ public class FileTask extends LogAbstractTask {
         });
     }
 
-    private boolean isInEventMap(String fileName, String dataTime) {
-        Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
-        if (fileToProfile == null) {
-            return false;
-        }
-        if (fileToProfile.get(fileName) == null) {
-            return false;
-        }
-        return true;
-    }
-
     private List<BasicFileInfo> scanExistingFileByPattern(String 
originPattern) {
         if (realTime) {
             return FileScanner.scanTaskBetweenTimes(originPattern, 
CycleUnitType.HOUR, timeOffset,
@@ -306,15 +296,14 @@ public class FileTask extends LogAbstractTask {
 
     private void dealWithWatchKey(WatchEntity entity, WatchKey key) throws 
IOException {
         Path contextPath = entity.getPath(key);
-        LOGGER.info("Find creation events in path: " + 
contextPath.toAbsolutePath());
+        LOGGER.info("Find creation events in path: {}", 
contextPath.toAbsolutePath());
         for (WatchEvent<?> watchEvent : key.pollEvents()) {
             Path child = resolvePathFromEvent(watchEvent, contextPath);
             if (child == null) {
                 continue;
             }
             if (Files.isDirectory(child)) {
-                LOGGER.info("The find creation event is triggered by a 
directory: " + child
-                        .getFileName());
+                LOGGER.info("The find creation event is triggered by a 
directory: {}", child.getFileName());
                 entity.registerRecursively(child);
                 continue;
             }
@@ -349,61 +338,30 @@ public class FileTask extends LogAbstractTask {
         Matcher matcher = entity.getPattern().matcher(newFileName);
         if (matcher.matches() || matcher.lookingAt()) {
             LOGGER.info("matched file {} {}", newFileName, 
entity.getPattern());
-            String dataTime = getDataTimeFromFileName(newFileName, 
entity.getOriginPattern(),
-                    entity.getDateExpression());
+            String dataTime = getDataTimeFromFileName(newFileName, 
entity.getDateExpression());
             if (!checkFileNameForTime(newFileName, entity)) {
                 LOGGER.error("File Timeout {} {}", newFileName, dataTime);
                 return;
             }
-            addToEvenMap(newFileName, dataTime);
-        }
-    }
-
-    private void addToEvenMap(String fileName, String dataTime) {
-        if (isInEventMap(fileName, dataTime)) {
-            LOGGER.info("addToEvenMap isInEventMap returns true skip taskId {} 
dataTime {} fileName {}",
-                    taskProfile.getTaskId(), dataTime, fileName);
-            return;
-        }
-        Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
-        if (!shouldAddAgain(fileName, fileUpdateTime)) {
-            LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId 
{} dataTime {} fileName {}",
-                    taskProfile.getTaskId(), dataTime, fileName);
-            return;
-        }
-        Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.computeIfAbsent(dataTime,
-                mapKey -> new ConcurrentHashMap<>());
-        boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
-        if (containsInMemory) {
-            LOGGER.error("should not happen! may be {} has been deleted and 
add again", fileName);
-            return;
-        }
-        String cycleUnit = "";
-        if (realTime) {
-            cycleUnit = CycleUnitType.HOUR;
-        } else {
-            cycleUnit = taskProfile.getCycleUnit();
+            Long fileUpdateTime = FileUtils.getFileLastModifyTime(newFileName);
+            addToEvenMap(newFileName, dataTime, fileUpdateTime, 
taskProfile.getCycleUnit());
         }
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(fileName, cycleUnit, dataTime,
-                fileUpdateTime);
-        sameDataTimeEvents.put(fileName, instanceProfile);
-        LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", 
taskProfile.getTaskId(), dataTime, fileName);
     }
 
     private boolean checkFileNameForTime(String newFileName, WatchEntity 
entity) {
         /* Get the data time for this file. */
         PathDateExpression dateExpression = entity.getDateExpression();
         if (dateExpression.getLongestDatePattern().length() != 0) {
-            String dataTime = getDataTimeFromFileName(newFileName, 
entity.getOriginPattern(), dateExpression);
+            String dataTime = getDataTimeFromFileName(newFileName, 
dateExpression);
             LOGGER.info("file {}, fileTime {}", newFileName, dataTime);
-            if (!NewDateUtils.isValidCreationTime(dataTime, 
entity.getCycleUnit(), timeOffset)) {
+            if (!DateUtils.isValidCreationTime(dataTime, 
entity.getCycleUnit(), timeOffset)) {
                 return false;
             }
         }
         return true;
     }
 
-    private String getDataTimeFromFileName(String fileName, String 
originPattern, PathDateExpression dateExpression) {
+    private String getDataTimeFromFileName(String fileName, PathDateExpression 
dateExpression) {
         /*
          * TODO: what if this file doesn't have any date pattern regex.
          *
@@ -411,7 +369,7 @@ public class FileTask extends LogAbstractTask {
          * reading and start reading this new file.
          */
         // Extract data time from file name
-        String fileTime = NewDateUtils.getDateTime(fileName, originPattern, 
dateExpression);
+        String fileTime = DateUtils.getDateTime(fileName, dateExpression);
 
         /**
          * Replace any non-numeric characters in the file time
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
index 47d052e81b..b539334039 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/local/WatchEntity.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.agent.plugin.task.logcollection.local;
 
 import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
 import org.apache.inlong.agent.plugin.utils.regex.NonRegexPatternPosition;
 import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
 import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
@@ -71,12 +70,12 @@ public class WatchEntity {
         this.originPattern = originPattern;
         ArrayList<String> directoryLayers = 
PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
         this.basicStaticPath = directoryLayers.get(0);
-        this.regexPattern = 
NewDateUtils.replaceDateExpressionWithRegex(originPattern);
+        this.regexPattern = 
DateUtils.replaceDateExpressionWithRegex(originPattern);
         pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL | Pattern.MULTILINE);
         ArrayList<String> directories = 
PatternUtil.cutDirectoryByWildcard(originPattern);
         this.originPatternWithoutFileName = directories.get(0);
         this.patternWithoutFileName = Pattern
-                
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
+                
.compile(DateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
                         Pattern.CASE_INSENSITIVE | Pattern.DOTALL | 
Pattern.MULTILINE);
         /*
          * Get the longest data regex from the data name, it's used if we want 
to get out the data time from the file
@@ -219,7 +218,7 @@ public class WatchEntity {
         } else {
             dirPattern = originPatternWithoutFileName.substring(0, index);
         }
-        Pattern pattern = 
Pattern.compile(NewDateUtils.replaceDateExpressionWithRegex(dirPattern),
+        Pattern pattern = 
Pattern.compile(DateUtils.replaceDateExpressionWithRegex(dirPattern),
                 Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
         return pattern;
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
index 2366dbae06..aad912b46f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
@@ -17,11 +17,20 @@
 
 package org.apache.inlong.agent.plugin.utils.regex;
 
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
 import hirondelle.date4j.DateTime;
 import org.apache.commons.lang.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
 import java.util.Objects;
 import java.util.TimeZone;
 import java.util.regex.Matcher;
@@ -29,31 +38,343 @@ import java.util.regex.Pattern;
 
 public class DateUtils {
 
-    private static final Logger logger = LogManager.getLogger(DateUtils.class);
+    private static final String YEAR = "YYYY";
+    private static final String YEAR_LOWERCASE = "yyyy";
+    private static final String MONTH = "MM";
+    private static final String DAY = "DD";
+    private static final String DAY_LOWERCASE = "dd";
+    private static final String HOUR = "HH";
+    private static final String HOUR_LOWERCASE = "hh";
+    private static final String MINUTE = "mm";
+    public static final String DEFAULT_FORMAT = "yyyyMMddHHmm";
+    public static final String DEFAULT_TIME_ZONE = "Asia/Shanghai";
+    private static final Logger logger = 
LoggerFactory.getLogger(DateUtils.class);
     private static final String TIME_REGEX = 
"YYYY(?:.MM|MM)?(?:.DD|DD)?(?:.hh|hh)?(?:.mm|mm)?(?:"
             + ".ss|ss)?";
-    private static final String LIMIT_SEP = "(?<=[a-zA-Z])";
-    private static final String LETTER_STR = "\\D+";
-    private static final String DIGIT_STR = "[0-9]+";
     private static final Pattern pattern = Pattern.compile(TIME_REGEX,
             Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
-    private String dateFormat = "YYYYMMDDhhmmss";
+    private static final Pattern bracePatt = Pattern.compile("\\{(.*?)\\}");
+    private static final int DEFAULT_LENGTH = "yyyyMMddHHmm".length();
+    public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
+    public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
 
-    public static String getSubTimeFormat(String format, int length) {
-        // format may be "YYYYMMDDhhmmss" | "YYYY_MM_DD_hh_mm_ss"
-        int formatLen = format.length();
-        StringBuffer sb = new StringBuffer();
+    public static String getShouldStartTime(String dataTime, String cycleUnit,
+            String offset) {
+        if (dataTime == null || dataTime.length() > 12) {
+            return null;
+        }
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
+        TimeZone timeZone = TimeZone.getTimeZone(DateUtils.DEFAULT_TIME_ZONE);
+        dateFormat.setTimeZone(timeZone);
+
+        if (dataTime.length() < DEFAULT_LENGTH) {
+            StringBuffer sb = new StringBuffer();
+            for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
+                sb.append("0");
+            }
+            dataTime = dataTime + sb.toString();
+        }
+
+        Calendar calendar = Calendar.getInstance();
+        try {
+            calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
+        } catch (ParseException e) {
+            return null;
+        }
+
+        /*
+         * The delay should be added to the data time, so remove the - from 
offset.
+         */
+        if (offset.startsWith("-")) {
+            offset = offset.substring(1, offset.length());
+        } else { // positive,read file earlier
+            offset = "-" + offset;
+        }
+
+        return dateFormat
+                .format(new Date(getDateTime(calendar, cycleUnit, 
offset).getTimeInMillis()));
+    }
+
+    private static Calendar getDateTime(Calendar calendar, String cycleUnit, 
String offset) {
+        int cycleNumber = (cycleUnit.length() <= 1 ? 1
+                : Integer.parseInt(cycleUnit.substring(0, cycleUnit.length() - 
1)));
+
+        String offsetUnit = offset.substring(offset.length() - 1, 
offset.length());
+        int offsetNumber = Integer.parseInt(offset.substring(0, 
offset.length() - 1));
+
+        /*
+         * For day task, the offset cycle unit can only be day; for hourly 
task, the offset can't be minute; for
+         * minutely task, the offset cycle unit can be day, hour and minute, 
but if the offset cycle unit is minute, the
+         * offset must be divided by cycle number.
+         */
+        if (cycleUnit.length() > 1 && 
(StringUtils.endsWithIgnoreCase(cycleUnit, "M"))) {
+            calendar.set(Calendar.SECOND, 0);
+            int minTime = calendar.get(Calendar.MINUTE);
+
+            int leftMin = minTime % cycleNumber;
+            minTime = minTime - leftMin;
+            calendar.set(Calendar.MINUTE, minTime);
+
+            /* Calculate the offset. */
+            if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
+                calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
+            }
+
+            if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
+                calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
+            }
+        } else if (cycleUnit.length() == 1) {
+            if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
+                calendar.set(Calendar.HOUR_OF_DAY, 0);
+                calendar.set(Calendar.MINUTE, 0);
+                calendar.set(Calendar.SECOND, 0);
+            } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
+                calendar.set(Calendar.MINUTE, 0);
+                calendar.set(Calendar.SECOND, 0);
+            }
+        }
+
+        /* Calculate the offset. */
+        if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
+            calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
+        }
+
+        if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
+            calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
+        }
+
+        if (CycleUnitType.MINUTE.equals(offsetUnit)) {
+            calendar.add(Calendar.MINUTE, offsetNumber);
+        }
+
+        return calendar;
+    }
+
+    public static boolean isValidCreationTime(String dataTime, String 
cycleUnit,
+            String timeOffset) {
+        long timeInterval = 0;
+        if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
+            timeInterval = DAY_TIMEOUT_INTERVAL;
+        } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
+            timeInterval = HOUR_TIMEOUT_INTERVAL;
+        } else if (cycleUnit.endsWith(CycleUnitType.MINUTE)) {
+            timeInterval = HOUR_TIMEOUT_INTERVAL;
+        } else {
+            logger.error("cycleUnit {} can't parse!", cycleUnit);
+            timeInterval = DAY_TIMEOUT_INTERVAL;
+        }
+
+        if (timeOffset.startsWith("-")) {
+            timeInterval -= DateTransUtils.calcOffset(timeOffset);
+        } else {
+            timeInterval += DateTransUtils.calcOffset(timeOffset);
+        }
+
+        return isValidCreationTime(dataTime, timeInterval);
+    }
+
+    /*
+     * Check whether the data time is between curTime - interval and curTime + 
interval.
+     */
+    public static boolean isValidCreationTime(String dataTime, long 
timeInterval) {
+        long currentTime = System.currentTimeMillis();
 
-        for (int i = 0; i < formatLen && length > 0; ++i) {
-            if (Character.isLetter(format.charAt(i))
-                    || Character.isDigit(format.charAt(i))) {
-                length--;
+        long minTime = currentTime - timeInterval;
+        long maxTime = currentTime + timeInterval;
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
+        if (dataTime.length() < DEFAULT_LENGTH) {
+            StringBuffer sb = new StringBuffer();
+            for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
+                sb.append("0");
             }
-            sb.append(format.charAt(i));
+            dataTime = dataTime + sb.toString();
+        }
+
+        Calendar calendar = Calendar.getInstance();
+        try {
+            calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
+        } catch (ParseException e) {
+            return false;
+        }
+
+        return calendar.getTimeInMillis() >= minTime
+                && calendar.getTimeInMillis() <= maxTime;
+    }
+
+    public static String getDateTime(String fileName, PathDateExpression 
dateExpression) {
+        if (fileName == null || dateExpression == null
+                || dateExpression.getLongestDatePattern() == null) {
+            return null;
+        }
+
+        String longestDatePattern = DateUtils
+                
.replaceDateExpressionWithRegex(dateExpression.getLongestDatePattern());
+        NonRegexPatternPosition patternPosition = 
dateExpression.getPatternPosition();
+
+        Matcher mat = Pattern.compile(longestDatePattern).matcher(fileName);
+        boolean find = mat.find();
+        // TODO : more than one part match the time regex in file name 
("/data/joox_logs/2000701106/201602170040.log"
+        // YYYYMMDDhh)
+        if (!find) {
+            logger.error("Can't find the pattern {} for file name {}", 
longestDatePattern,
+                    fileName);
+            return null;
+        }
+
+        String dateTime = fileName.substring(mat.start(), mat.end());
+        if (patternPosition == NonRegexPatternPosition.PREFIX) {
+            dateTime = dateTime.substring(1, dateTime.length());
+        } else if (patternPosition == NonRegexPatternPosition.SUFFIX) {
+            dateTime = dateTime.substring(0, dateTime.length() - 1);
+        } else if (patternPosition == NonRegexPatternPosition.BOTH) {
+            dateTime = dateTime.substring(1, dateTime.length() - 1);
+        } else if (patternPosition == NonRegexPatternPosition.END) {
+            dateTime = dateTime.substring(0, dateTime.length());
+        } else if (patternPosition == NonRegexPatternPosition.ENDSUFFIX) {
+            dateTime = dateTime.substring(1, dateTime.length());
+        } else if (patternPosition == NonRegexPatternPosition.NONE) {
+            logger.error("The data path configuration is invalid");
+            dateTime = null;
+        }
+
+        return dateTime;
+    }
+
+    public static ArrayList<MatchPoint> extractAllTimeRegex(String src) {
+        // TODO : time regex error
+        Matcher m = pattern.matcher(src);
+        ArrayList<MatchPoint> arr = new ArrayList<MatchPoint>();
+        while (m.find()) {
+            String oneMatch = m.group(0);
+            arr.add(new MatchPoint(oneMatch, m.start(), m.end()));
+        }
+        return arr;
+    }
+
+    public static String replaceDateExpressionWithRegex(String dataPath) {
+        if (dataPath == null) {
+            return null;
+        }
+
+        StringBuffer sb = new StringBuffer();
+
+        // find longest DATEPATTERN
+        ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
+
+        if (mp == null || mp.size() == 0) {
+            return dataPath;
+        }
+
+        int lastIndex = 0;
+        for (MatchPoint m : mp) {
+            lastIndex = replacePattern(dataPath, sb, m, "\\d{4}", "\\d{2}", 
"\\d{2}", "\\d{2}", "\\d{2}", lastIndex);
         }
+
+        sb.append(dataPath.substring(lastIndex));
+
         return sb.toString();
     }
 
+    private static int replacePattern(String dataPath, StringBuffer sb, 
MatchPoint m, String year, String month,
+            String day, String hour, String minute, int lastIndex) {
+        sb.append(dataPath, lastIndex, m.getStart());
+
+        String longestPattern = m.getStr();
+        int hhIndex = longestPattern.indexOf(HOUR);
+        if (hhIndex == -1) {
+            hhIndex = longestPattern.indexOf(HOUR_LOWERCASE);
+        }
+        int mmIndex = longestPattern.indexOf(MINUTE);
+        longestPattern = longestPattern.replace(YEAR, year);
+        longestPattern = longestPattern.replace(YEAR_LOWERCASE, year);
+        longestPattern = longestPattern.replace(MONTH, month);
+        longestPattern = longestPattern.replace(DAY, day);
+        longestPattern = longestPattern.replace(DAY_LOWERCASE, day);
+        longestPattern = longestPattern.replace(HOUR, hour);
+        longestPattern = longestPattern.replace(HOUR_LOWERCASE, hour);
+
+        if (hhIndex != -1 && mmIndex != -1
+                && mmIndex >= hhIndex + 2 && mmIndex < hhIndex + 4) {
+            longestPattern = longestPattern.replace(MINUTE, minute);
+        }
+
+        sb.append(longestPattern);
+        return m.getEnd();
+    }
+
+    public static String replaceDateExpression(Calendar dateTime, String 
dataPath) {
+        if (dataPath == null) {
+            return null;
+        }
+
+        String year = String.valueOf(dateTime.get(Calendar.YEAR));
+        String month = String.valueOf(dateTime.get(Calendar.MONTH) + 1);
+        String day = String.valueOf(dateTime.get(Calendar.DAY_OF_MONTH));
+        String hour = String.valueOf(dateTime.get(Calendar.HOUR_OF_DAY));
+        String minute = String.valueOf(dateTime.get(Calendar.MINUTE));
+
+        StringBuffer sb = new StringBuffer();
+
+        // find longest DATEPATTERN
+        ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
+        if (mp == null || mp.size() == 0) {
+            return dataPath;
+        }
+
+        int lastIndex = 0;
+        for (MatchPoint m : mp) {
+            lastIndex = replacePattern(dataPath, sb, m, year, 
externDate(month), externDate(day), externDate(hour),
+                    externDate(minute), lastIndex);
+        }
+
+        sb.append(dataPath.substring(lastIndex));
+
+        return sb.toString();
+    }
+
+    public static String externDate(String time) {
+        if (time.length() == 1) {
+            return "0" + time;
+        } else {
+            return time;
+        }
+    }
+
+    public static List<Long> getDateRegion(long startTime, long endTime, 
String cycleUnit) {
+        List<Long> ret = new ArrayList<Long>();
+        DateTime dtStart = DateTime.forInstant(startTime, 
TimeZone.getDefault());
+        DateTime dtEnd = DateTime.forInstant(endTime, TimeZone.getDefault());
+
+        if (cycleUnit.equals(CycleUnitType.DAY)) {
+            dtEnd = dtEnd.getEndOfDay();
+        }
+
+        int year = 0;
+        int month = 0;
+        int day = 0;
+        int hour = 0;
+        int minute = 0;
+        int second = 0;
+        if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY)) {
+            day = 1;
+        } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR)) {
+            hour = 1;
+        } else if (cycleUnit.equals(CycleUnitType.MINUTE)) {
+            minute = 1;
+        } else {
+            logger.error("cycleUnit {} is error: ", cycleUnit);
+            return ret;
+        }
+        while (dtStart.lteq(dtEnd)) {
+            ret.add(dtStart.getMilliseconds(TimeZone.getDefault()));
+            dtStart = dtStart.plus(year, month, day, hour, minute, second, 0,
+                    DateTime.DayOverflow.LastDay);
+        }
+        return ret;
+    }
+
     // only return the first matched
     public static String extractLongestTimeRegex(String src)
             throws IllegalArgumentException {
@@ -124,139 +445,4 @@ public class DateUtils {
         return ((position == NonRegexPatternPosition.NONE) ? null
                 : new PathDateExpression(longestPattern, position));
     }
-
-    public void init(String timeFormat) {
-        if (timeFormat != null && !timeFormat.isEmpty()) {
-            dateFormat = timeFormat;
-        }
-    }
-
-    // 20120812010203 ---> 2012-08-12 01:02:03
-    private String normalizeDateStr(String src) {
-        src = src.replaceAll("[^a-zA-Z0-9]", "");
-        int len = src.length();
-        // if (!isTimeStrValid(src)) {
-        // return "";
-        // }
-        StringBuffer sb = new StringBuffer();
-        // year
-        sb.append(src.substring(0, 4));
-        sb.append("-");
-        if (len > 4) {
-            // month
-            sb.append(src.substring(4, 6));
-            if (len > 6) {
-                sb.append("-");
-                // day
-                sb.append(src.substring(6, 8));
-                if (len > 8) {
-                    sb.append(" ");
-                    // hour
-                    sb.append(src.substring(8, 10));
-                    if (len > 10) {
-                        sb.append(":");
-                        // minute
-                        sb.append(src.substring(10, 12));
-                        if (len > 12) {
-                            sb.append(":");
-                            // seconds
-                            sb.append(src.substring(12, 14));
-                        } else {
-                            sb.append(":00");
-                        }
-                    } else {
-                        sb.append(":00:00");
-                    }
-                } else {
-                    sb.append(" 00:00:00");
-                }
-            } else {
-                sb.append("-01 00:00:00");
-            }
-        } else {
-            sb.append("-01-01 00:00:00");
-        }
-        return sb.toString();
-    }
-
-    public String getDate(String src, String limit) {
-        if (src == null || src.trim().isEmpty()) {
-            return "";
-        }
-
-        // TODO : verify format str
-        int year = 0;
-        int month = 0;
-        int day = 0;
-        int hour = 0;
-        int minute = 0;
-        int second = 0;
-
-        // TODO : timezone
-        TimeZone tz = TimeZone.getTimeZone("GMT+8:00");
-        DateTime dt = null;
-        String outputFormat = null;
-        if (src.matches(LETTER_STR)) {
-            // format str
-            // TODO : data format verify
-            dt = DateTime.now(tz);
-            outputFormat = src;
-        } else {
-            // time str
-            src = src.replaceAll("[^0-9]", "");
-            outputFormat = getSubTimeFormat(dateFormat, src.length());
-            src = normalizeDateStr(src);
-            if (src.isEmpty()) {
-                return "";
-            }
-            dt = new DateTime(src);
-        }
-
-        // System.out.println("outputformat: " + outputFormat);
-
-        limit = limit.trim();
-        String[] limitArr = limit.split(LIMIT_SEP);
-
-        for (String onelimit : limitArr) {
-            year = 0;
-            month = 0;
-            day = 0;
-            hour = 0;
-            minute = 0;
-            second = 0;
-            // System.out.println("onelimit: " + onelimit);
-            int limitLen = onelimit.length();
-            String type = onelimit.substring(limitLen - 1, limitLen);
-            int offset = Integer.parseInt(onelimit.substring(0, limitLen - 1));
-            // System.out.println("type: " + type + ". offset: " + offset);
-            int sign = 1;
-            if (offset < 0) {
-                sign = -1;
-            } else {
-                sign = 1;
-            }
-            if (type.equalsIgnoreCase("Y")) {
-                year = sign * offset;
-            } else if (type.equals("M")) {
-                month = sign * offset;
-            } else if (type.equalsIgnoreCase("D")) {
-                day = sign * offset;
-            } else if (type.equalsIgnoreCase("h")) {
-                hour = sign * offset;
-            } else if (type.equals("m")) {
-                minute = sign * offset;
-            } else if (type.equalsIgnoreCase("s")) {
-                second = sign * offset;
-            }
-            if (sign < 0) {
-                dt = dt.minus(year, month, day, hour, minute, second, 0,
-                        DateTime.DayOverflow.LastDay);
-            } else {
-                dt = dt.plus(year, month, day, hour, minute, second, 0,
-                        DateTime.DayOverflow.LastDay);
-            }
-
-        }
-        return dt.format(outputFormat);
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
deleted file mode 100644
index a80f088e0d..0000000000
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.utils.regex;
-
-import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.utils.DateTransUtils;
-
-import hirondelle.date4j.DateTime;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class NewDateUtils {
-
-    public static final String FULL_FORMAT = "yyyyMMddHHmmss";
-    public static final String NULL_DATA_TIME = "000000000000";
-    public static final String DEFAULT_FORMAT = "yyyyMMddHHmm";
-    public static final String DEFAULT_TIME_ZONE = "Asia/Shanghai";
-    private static final Logger logger = 
LoggerFactory.getLogger(NewDateUtils.class);
-    private static final String TIME_REGEX = 
"YYYY(?:.MM|MM)?(?:.DD|DD)?(?:.hh|hh)?(?:.mm|mm)?(?:"
-            + ".ss|ss)?";
-    private static final String LIMIT_SEP = "(?<=[a-zA-Z])";
-    private static final String LETTER_STR = "\\D+";
-    private static final String DIGIT_STR = "[0-9]+";
-    private static final Pattern pattern = Pattern.compile(TIME_REGEX,
-            Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
-    private static final Pattern bracePatt = Pattern.compile("\\{(.*?)\\}");
-    private static final int DEFAULT_LENGTH = "yyyyMMddHHmm".length();
-    public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
-    public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
-    // data source config error */
-    public static final String DATA_SOURCE_CONFIG_ERROR = 
"ERROR-0-INLONG_AGENT|10001|ERROR"
-            + "|ERROR_DATA_SOURCE_CONFIG|";
-
-    /* Return the time in milliseconds for a data time. */
-    /*
-     * public static long getTimeInMillis(String dataTime) { if (dataTime == 
null) { return 0; }
-     *
-     * try { SimpleDateFormat dateFormat = new 
SimpleDateFormat(DEFAULT_FORMAT); Date date = dateFormat.parse(dataTime);
-     *
-     * return date.getTime(); } catch (ParseException e) { return 0; } }
-     */
-    /* Return the format data time string from milliseconds. */
-    /*
-     * public static String getDataTimeFromTimeMillis(long dataTime) { 
SimpleDateFormat dateFormat = new
-     * SimpleDateFormat(DEFAULT_FORMAT); return dateFormat.format(new 
Date(dataTime)); }
-     */
-    /* Return the should start time for a data time log file. */
-    public static String getShouldStartTime(String dataTime, String cycleUnit,
-            String offset) {
-        if (dataTime == null || dataTime.length() > 12) {
-            return null;
-        }
-
-        SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
-        TimeZone timeZone = 
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
-        dateFormat.setTimeZone(timeZone);
-
-        if (dataTime.length() < DEFAULT_LENGTH) {
-            StringBuffer sb = new StringBuffer();
-            for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
-                sb.append("0");
-            }
-            dataTime = dataTime + sb.toString();
-        }
-
-        Calendar calendar = Calendar.getInstance();
-        try {
-            calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
-        } catch (ParseException e) {
-            return null;
-        }
-
-        /*
-         * The delay should be added to the data time, so remove the - from 
offset.
-         */
-        if (offset.startsWith("-")) {
-            offset = offset.substring(1, offset.length());
-        } else { // positive,read file earlier
-            offset = "-" + offset;
-        }
-
-        return dateFormat
-                .format(new Date(getDateTime(calendar, cycleUnit, 
offset).getTimeInMillis()));
-    }
-
-    private static Calendar getDateTime(Calendar calendar, String cycleUnit, 
String offset) {
-        int cycleNumber = (cycleUnit.length() <= 1
-                ? 1
-                : Integer.parseInt(cycleUnit.substring(0, cycleUnit.length() - 
1)));
-
-        String offsetUnit = offset.substring(offset.length() - 1, 
offset.length());
-        int offsetNumber = Integer.parseInt(offset.substring(0, 
offset.length() - 1));
-
-        /*
-         * For day task, the offset cycle unit can only be day; for hourly 
task, the offset can't be minute; for
-         * minutely task, the offset cycle unit can be day, hour and minute, 
but if the offset cycle unit is minute, the
-         * offset must be divided by cycle number.
-         */
-        if (cycleUnit.length() > 1 && 
(StringUtils.endsWithIgnoreCase(cycleUnit, "M"))) {
-            calendar.set(Calendar.SECOND, 0);
-            int minTime = calendar.get(Calendar.MINUTE);
-
-            int leftMin = minTime % cycleNumber;
-            minTime = minTime - leftMin;
-            calendar.set(Calendar.MINUTE, minTime);
-
-            /* Calculate the offset. */
-            if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
-                calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
-            }
-
-            if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
-                calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
-            }
-        } else if (cycleUnit.length() == 1) {
-            if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
-                calendar.set(Calendar.HOUR_OF_DAY, 0);
-                calendar.set(Calendar.MINUTE, 0);
-                calendar.set(Calendar.SECOND, 0);
-            } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
-                calendar.set(Calendar.MINUTE, 0);
-                calendar.set(Calendar.SECOND, 0);
-            }
-        }
-
-        /* Calculate the offset. */
-        if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
-            calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
-        }
-
-        if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
-            calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
-        }
-
-        if (CycleUnitType.MINUTE.equals(offsetUnit)) {
-            calendar.add(Calendar.MINUTE, offsetNumber);
-        }
-
-        return calendar;
-    }
-
-    public static boolean isValidCreationTime(String dataTime, String 
cycleUnit,
-            String timeOffset) {
-        long timeInterval = 0;
-        if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
-            timeInterval = DAY_TIMEOUT_INTERVAL;
-        } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
-            timeInterval = HOUR_TIMEOUT_INTERVAL;
-        } else if (cycleUnit.endsWith(CycleUnitType.MINUTE)) {
-            timeInterval = HOUR_TIMEOUT_INTERVAL;
-        } else {
-            logger.error("cycleUnit {} can't parse!", cycleUnit);
-            timeInterval = DAY_TIMEOUT_INTERVAL;
-        }
-
-        if (timeOffset.startsWith("-")) {
-            timeInterval -= DateTransUtils.calcOffset(timeOffset);
-        } else {
-            timeInterval += DateTransUtils.calcOffset(timeOffset);
-        }
-
-        return isValidCreationTime(dataTime, timeInterval);
-    }
-
-    /*
-     * Check whether the data time is between curTime - interval and curTime + 
interval.
-     */
-    public static boolean isValidCreationTime(String dataTime, long 
timeInterval) {
-        long currentTime = System.currentTimeMillis();
-
-        long minTime = currentTime - timeInterval;
-        long maxTime = currentTime + timeInterval;
-
-        SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_FORMAT);
-        if (dataTime.length() < DEFAULT_LENGTH) {
-            StringBuffer sb = new StringBuffer();
-            for (int i = 0; i < DEFAULT_LENGTH - dataTime.length(); i++) {
-                sb.append("0");
-            }
-            dataTime = dataTime + sb.toString();
-        }
-
-        Calendar calendar = Calendar.getInstance();
-        try {
-            calendar.setTimeInMillis(dateFormat.parse(dataTime).getTime());
-        } catch (ParseException e) {
-            return false;
-        }
-
-        return calendar.getTimeInMillis() >= minTime
-                && calendar.getTimeInMillis() <= maxTime;
-    }
-
-    public static boolean isBraceContain(String dataName) {
-        Matcher matcher = bracePatt.matcher(dataName);
-        return matcher.find();
-    }
-
-    public static String getDateTime(String fileName, String dataName, 
PathDateExpression dateExpression) {
-        String dataTime = null;
-
-        if (isBraceContain(dataName)) {
-            String fullRegx = replaceDateExpressionWithRegex(dataName, 
"dataTime");
-            Pattern fullPattern = Pattern.compile(fullRegx);
-            Matcher matcher = fullPattern.matcher(fileName);
-            if (matcher.find()) {
-                dataTime = matcher.group("dataTime");
-            }
-        } else {
-            dataTime = getDateTime(fileName, dateExpression);
-        }
-        return dataTime;
-    }
-
-    public static String getDateTime(String fileName, PathDateExpression 
dateExpression) {
-        if (fileName == null || dateExpression == null
-                || dateExpression.getLongestDatePattern() == null) {
-            return null;
-        }
-
-        String longestDatePattern = NewDateUtils
-                
.replaceDateExpressionWithRegex(dateExpression.getLongestDatePattern());
-        NonRegexPatternPosition patternPosition = 
dateExpression.getPatternPosition();
-
-        Matcher mat = Pattern.compile(longestDatePattern).matcher(fileName);
-        boolean find = mat.find();
-        // TODO : more than one part match the time regex in file name 
("/data/joox_logs/2000701106/201602170040.log"
-        // YYYYMMDDhh)
-        if (!find) {
-            logger.error("Can't find the pattern {} for file name {}", 
longestDatePattern,
-                    fileName);
-            return null;
-        }
-
-        String dateTime = fileName.substring(mat.start(), mat.end());
-        if (patternPosition == NonRegexPatternPosition.PREFIX) {
-            dateTime = dateTime.substring(1, dateTime.length());
-        } else if (patternPosition == NonRegexPatternPosition.SUFFIX) {
-            dateTime = dateTime.substring(0, dateTime.length() - 1);
-        } else if (patternPosition == NonRegexPatternPosition.BOTH) {
-            dateTime = dateTime.substring(1, dateTime.length() - 1);
-        } else if (patternPosition == NonRegexPatternPosition.END) {
-            dateTime = dateTime.substring(0, dateTime.length());
-        } else if (patternPosition == NonRegexPatternPosition.ENDSUFFIX) {
-            dateTime = dateTime.substring(1, dateTime.length());
-        } else if (patternPosition == NonRegexPatternPosition.NONE) {
-            logger.error("The data path configuration is invalid");
-            dateTime = null;
-        }
-
-        return dateTime;
-    }
-
-    public static ArrayList<MatchPoint> extractAllTimeRegex(String src) {
-        // TODO : time regex error
-        Matcher m = pattern.matcher(src);
-        ArrayList<MatchPoint> arr = new ArrayList<MatchPoint>();
-        while (m.find()) {
-            String oneMatch = m.group(0);
-            arr.add(new MatchPoint(oneMatch, m.start(), m.end()));
-        }
-        return arr;
-    }
-
-    public static String replaceDateExpressionWithRegex(String dataPath) {
-        if (dataPath == null) {
-            return null;
-        }
-        StringBuffer sb = new StringBuffer();
-
-        // find longest DATEPATTERN
-        ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
-
-        if (mp == null || mp.size() == 0) {
-            return dataPath;
-        }
-
-        int lastIndex = 0;
-        for (MatchPoint m : mp) {
-            sb.append(dataPath.substring(lastIndex, m.getStart()));
-
-            String longestPattern = m.getStr();
-            int hhIndex = longestPattern.indexOf("hh");
-            int mmIndex = longestPattern.indexOf("mm");
-            longestPattern = longestPattern.replace("YYYY", "\\d{4}");
-            longestPattern = longestPattern.replace("MM", "\\d{2}");
-            longestPattern = longestPattern.replace("DD", "\\d{2}");
-            longestPattern = longestPattern.replace("hh", "\\d{2}");
-
-            if (hhIndex != -1 && mmIndex != -1
-                    && mmIndex >= hhIndex + 2 && mmIndex < hhIndex + 4) {
-                longestPattern = longestPattern.replace("mm", "\\d{2}");
-            }
-            sb.append(longestPattern);
-            lastIndex = m.getEnd();
-        }
-
-        sb.append(dataPath.substring(lastIndex));
-
-        return sb.toString();
-    }
-
-    public static String replaceDateExpressionWithRegex(String dataPath, 
String dateTimeGroupName) {
-        if (dataPath == null) {
-            return null;
-        }
-
-        // \\d{4}\\d{2}\\d{2}\\d{2} --> (?<GroupName>\\d{4}\\d{2}\\d{2}\\d{2})
-        if (isBraceContain(dataPath)) {
-            StringBuilder sb = new StringBuilder();
-            sb.append(dataPath.substring(0, dataPath.indexOf('{')));
-            sb.append("(?<").append(dateTimeGroupName).append('>');
-            sb.append(dataPath.substring(dataPath.indexOf('{') + 1, 
dataPath.indexOf('}')));
-            sb.append(')').append(dataPath.substring(dataPath.indexOf('}') + 
1));
-            dataPath = sb.toString();
-        }
-
-        StringBuffer sb = new StringBuffer();
-
-        // find longest DATEPATTERN
-        ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
-
-        if (mp == null || mp.size() == 0) {
-            return dataPath;
-        }
-
-        int lastIndex = 0;
-        for (int i = 0; i < mp.size(); i++) {
-            MatchPoint m = mp.get(i);
-            sb.append(dataPath.substring(lastIndex, m.getStart()));
-
-            String longestPattern = m.getStr();
-            int hhIndex = longestPattern.indexOf("hh");
-            int mmIndex = longestPattern.indexOf("mm");
-            longestPattern = longestPattern.replace("YYYY", "\\d{4}");
-            longestPattern = longestPattern.replace("MM", "\\d{2}");
-            longestPattern = longestPattern.replace("DD", "\\d{2}");
-            longestPattern = longestPattern.replace("hh", "\\d{2}");
-
-            if (hhIndex != -1 && mmIndex != -1
-                    && mmIndex >= hhIndex + 2 && mmIndex < hhIndex + 4) {
-                longestPattern = longestPattern.replace("mm", "\\d{2}");
-            }
-
-            sb.append(longestPattern);
-            lastIndex = m.getEnd();
-        }
-
-        sb.append(dataPath.substring(lastIndex));
-
-        return sb.toString();
-    }
-
-    public static String replaceDateExpression(Calendar dateTime, String 
dataPath) {
-        if (dataPath == null) {
-            return null;
-        }
-
-        String year = String.valueOf(dateTime.get(Calendar.YEAR));
-        String month = String.valueOf(dateTime.get(Calendar.MONTH) + 1);
-        String day = String.valueOf(dateTime.get(Calendar.DAY_OF_MONTH));
-        String hour = String.valueOf(dateTime.get(Calendar.HOUR_OF_DAY));
-        String minute = String.valueOf(dateTime.get(Calendar.MINUTE));
-
-        StringBuffer sb = new StringBuffer();
-
-        // find longest DATEPATTERN
-        ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
-        if (mp == null || mp.size() == 0) {
-            return dataPath;
-        }
-
-        int lastIndex = 0;
-        for (MatchPoint m : mp) {
-            sb.append(dataPath.substring(lastIndex, m.getStart()));
-
-            String longestPattern = m.getStr();
-            int hhIndex = longestPattern.indexOf("hh");
-            int mmIndex = longestPattern.indexOf("mm");
-
-            longestPattern = longestPattern.replaceAll("YYYY", year);
-            longestPattern = longestPattern.replaceAll("MM", 
externDate(month));
-            longestPattern = longestPattern.replaceAll("DD", externDate(day));
-            longestPattern = longestPattern.replaceAll("hh", externDate(hour));
-
-            if (hhIndex != -1 && mmIndex != -1 && mmIndex >= hhIndex + 2
-                    && mmIndex < hhIndex + 4) {
-                longestPattern = longestPattern.replaceAll("mm", 
externDate(minute));
-            }
-
-            sb.append(longestPattern);
-            lastIndex = m.getEnd();
-        }
-
-        sb.append(dataPath.substring(lastIndex));
-
-        return sb.toString();
-    }
-
-    public static String externDate(String time) {
-        if (time.length() == 1) {
-            return "0" + time;
-        } else {
-            return time;
-        }
-    }
-
-    public static List<Long> getDateRegion(long startTime, long endTime, 
String cycleUnit) {
-        List<Long> ret = new ArrayList<Long>();
-        DateTime dtStart = DateTime.forInstant(startTime, 
TimeZone.getDefault());
-        DateTime dtEnd = DateTime.forInstant(endTime, TimeZone.getDefault());
-
-        if (cycleUnit.equals(CycleUnitType.DAY)) {
-            dtEnd = dtEnd.getEndOfDay();
-        }
-
-        int year = 0;
-        int month = 0;
-        int day = 0;
-        int hour = 0;
-        int minute = 0;
-        int second = 0;
-        if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY)) {
-            day = 1;
-        } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR)) {
-            hour = 1;
-        } else if (cycleUnit.equals(CycleUnitType.MINUTE)) {
-            minute = 1;
-        } else {
-            logger.error("cycleUnit {} is error: ", cycleUnit);
-            return ret;
-        }
-        while (dtStart.lteq(dtEnd)) {
-            ret.add(dtStart.getMilliseconds(TimeZone.getDefault()));
-            dtStart = dtStart.plus(year, month, day, hour, minute, second, 0,
-                    DateTime.DayOverflow.LastDay);
-        }
-        return ret;
-    }
-}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
index b3153e81fe..ecddccce9f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
@@ -59,13 +59,13 @@ public class Scanner {
         String strStartTime = 
DateTransUtils.millSecConvertToTimeStr(range.startTime, cycleUnit);
         String strEndTime = 
DateTransUtils.millSecConvertToTimeStr(range.endTime, cycleUnit);
         LOGGER.info("{} scan time is between {} and {}", originPattern, 
strStartTime, strEndTime);
-        List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime, 
range.endTime, cycleUnit);
+        List<Long> dateRegion = DateUtils.getDateRegion(range.startTime, 
range.endTime, cycleUnit);
         List<FinalPatternInfo> finalPatternList = new ArrayList<>();
         for (Long time : dateRegion) {
             Calendar calendar = Calendar.getInstance();
             calendar.setTimeInMillis(time);
             FinalPatternInfo finalPatternInfo = new FinalPatternInfo(
-                    NewDateUtils.replaceDateExpression(calendar, 
originPattern), time);
+                    DateUtils.replaceDateExpression(calendar, originPattern), 
time);
             finalPatternList.add(finalPatternInfo);
         }
         return finalPatternList;
@@ -75,7 +75,7 @@ public class Scanner {
             boolean isRetry) {
         TimeRange range = getTimeRange(startTime, endTime, cycleUnit, 
timeOffset, isRetry);
         List<String> dataTimeList = new ArrayList<>();
-        List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime, 
range.endTime, cycleUnit);
+        List<Long> dateRegion = DateUtils.getDateRegion(range.startTime, 
range.endTime, cycleUnit);
         for (Long time : dateRegion) {
             String dataTime = DateTransUtils.millSecConvertToTimeStr(time, 
cycleUnit);
             dataTimeList.add(dataTime);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 214a29b24e..2506191ca6 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.FetcherConstants;
 import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig;
 import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
+import org.apache.inlong.agent.pojo.SQLTask.SQLTaskConfig;
 import org.apache.inlong.common.enums.TaskStateEnum;
 import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
@@ -164,4 +165,36 @@ public class AgentBaseTestsHelper {
         return dataConfig;
     }
 
+    public TaskProfile getSQLTaskProfile(int taskId, String sql, String 
contentStyle, boolean retry,
+            String startTime, String endTime, TaskStateEnum state, String 
cycleUnit, String timeZone) {
+        DataConfig dataConfig = getSQLDataConfig(taskId, sql, contentStyle, 
retry, startTime, endTime,
+                state, cycleUnit, timeZone);
+        TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
+        return profile;
+    }
+
+    private DataConfig getSQLDataConfig(int taskId, String sql, String 
contentStyle, boolean retry,
+            String startTime, String endTime, TaskStateEnum state, String 
cycleUnit, String timeZone) {
+        DataConfig dataConfig = new DataConfig();
+        dataConfig.setInlongGroupId("testGroupId");
+        dataConfig.setInlongStreamId("testStreamId");
+        dataConfig.setDataReportType(1);
+        dataConfig.setTaskType(TaskTypeEnum.SQL.getType());
+        dataConfig.setTaskId(taskId);
+        dataConfig.setTimeZone(timeZone);
+        dataConfig.setState(state.ordinal());
+        SQLTaskConfig sqlTaskConfig = new SQLTaskConfig();
+        sqlTaskConfig.setUsername("testUserName");
+        sqlTaskConfig.setJdbcPassword("testPassword");
+        sqlTaskConfig.setSql(sql);
+        sqlTaskConfig.setTimeOffset("0d");
+        // GMT-8:00 same with Asia/Shanghai
+        sqlTaskConfig.setMaxInstanceCount(100);
+        sqlTaskConfig.setCycleUnit(cycleUnit);
+        sqlTaskConfig.setRetry(retry);
+        sqlTaskConfig.setDataTimeFrom(startTime);
+        sqlTaskConfig.setDataTimeTo(endTime);
+        dataConfig.setExtParams(GSON.toJson(sqlTaskConfig));
+        return dataConfig;
+    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
index 4d3bc0508d..8facaa32ec 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
@@ -30,6 +30,7 @@ import com.qcloud.cos.COSClient;
 import com.qcloud.cos.model.COSObjectSummary;
 import com.qcloud.cos.model.ListObjectsRequest;
 import com.qcloud.cos.model.ObjectListing;
+import com.qcloud.cos.model.ObjectMetadata;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -74,6 +76,11 @@ public class TestCOSTask {
         helper = new 
AgentBaseTestsHelper(TestCOSTask.class.getName()).setupAgentHome();
         manager = new TaskManager();
         cosClient = Mockito.mock(COSClient.class);
+        when(cosClient.getObjectMetadata(Mockito.anyString(), 
Mockito.anyString())).thenAnswer(mock -> {
+            ObjectMetadata metadata = new ObjectMetadata();
+            metadata.setLastModified(new Date());
+            return metadata;
+        });
         PowerMockito.mockStatic(COSUtils.class);
         Mockito.when(COSUtils.createCli(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString()))
                 .thenReturn(cosClient);
@@ -88,7 +95,8 @@ public class TestCOSTask {
         ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class);
         when(objectListing1_1.getCommonPrefixes()).thenReturn(
                 Arrays.asList("some/20230928_0/", "some/20230928_1/", 
"some/20230928_aaa/"));
-        
when(objectListing1_1.getObjectSummaries()).thenReturn(getSummaries(Arrays.asList("some/20230928_test_0.txt")));
+        when(objectListing1_1.getObjectSummaries()).thenReturn(
+                getSummaries(Arrays.asList("some/20230928_test_0.txt")));
 
         ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class);
         when(objectListing1_2.getCommonPrefixes()).thenReturn(
@@ -185,13 +193,27 @@ public class TestCOSTask {
     public void testScan() {
         mockDay(cosClient);
         doTest(1, "some/YYYYMMDD_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY,
-                Arrays.asList("some/20230928_0/test_0.txt", 
"some/20230928_0/test_1.txt", "some/20230929_1/test_0.txt",
+                Arrays.asList("some/20230928_0/test_0.txt", 
"some/20230928_0/test_1.txt",
+                        "some/20230929_1/test_0.txt",
+                        "some/20230929_1/test_1.txt"),
+                Arrays.asList("20230928", "20230928", "20230929", "20230929"),
+                "20230928",
+                "20230930");
+        doTest(2, "some/yyyyMMdd_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY,
+                Arrays.asList("some/20230928_0/test_0.txt", 
"some/20230928_0/test_1.txt",
+                        "some/20230929_1/test_0.txt",
                         "some/20230929_1/test_1.txt"),
                 Arrays.asList("20230928", "20230928", "20230929", "20230929"),
                 "20230928",
                 "20230930");
         mockHour(cosClient);
-        doTest(2, "some/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
+        doTest(3, "some/YYYYMMDDHH_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
+                Arrays.asList("some/2023092800_0/test_0.txt", 
"some/2023092800_0/test_1.txt",
+                        "some/2023092901_1/test_0.txt",
+                        "some/2023092901_1/test_1.txt"),
+                Arrays.asList("2023092800", "2023092800", "2023092901", 
"2023092901"), "2023092800",
+                "2023093023");
+        doTest(4, "some/yyyyMMddhh_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
                 Arrays.asList("some/2023092800_0/test_0.txt", 
"some/2023092800_0/test_1.txt",
                         "some/2023092901_1/test_0.txt",
                         "some/2023092901_1/test_1.txt"),
@@ -213,7 +235,8 @@ public class TestCOSTask {
                 fileName.add(invocation.getArgument(0));
                 dataTime.add(invocation.getArgument(1));
                 return null;
-            }).when(fileTask, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString());
+            }).when(fileTask, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(),
+                    Mockito.anyString());
             Assert.assertTrue(fileTask.isProfileValid(taskProfile));
             manager.getTaskStore().storeTask(taskProfile);
             fileTask.init(manager, taskProfile, 
manager.getInstanceBasicStore());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestFileTask.java
similarity index 78%
rename from 
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
rename to 
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestFileTask.java
index d2c8e8a342..f5eda88e10 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestFileTask.java
@@ -51,10 +51,10 @@ import static org.awaitility.Awaitility.await;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(FileTask.class)
 @PowerMockIgnore({"javax.management.*"})
-public class TestLogFileTask {
+public class TestFileTask {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestLogFileTask.class);
-    private static final ClassLoader LOADER = 
TestLogFileTask.class.getClassLoader();
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestFileTask.class);
+    private static final ClassLoader LOADER = 
TestFileTask.class.getClassLoader();
     private static AgentBaseTestsHelper helper;
     private static TaskManager manager;
     private static String resourceParentPath;
@@ -66,7 +66,7 @@ public class TestLogFileTask {
 
     @BeforeClass
     public static void setup() throws Exception {
-        helper = new 
AgentBaseTestsHelper(TestLogFileTask.class.getName()).setupAgentHome();
+        helper = new 
AgentBaseTestsHelper(TestFileTask.class.getName()).setupAgentHome();
         resourceParentPath = new 
File(LOADER.getResource("testScan/temp.txt").getPath()).getParent();
         manager = new TaskManager();
     }
@@ -82,14 +82,31 @@ public class TestLogFileTask {
                 resourceParentPath + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt", 
CycleUnitType.DAY, Arrays.asList("20230928"),
                 "20230928", "20230930");
         doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"),
-                resourceParentPath + "/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt",
+                resourceParentPath + "/YYYYMMDDHH_[0-9]+/test_[0-9]+.txt",
                 CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023092800", 
"2023093023");
         doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt", 
"testScan/202309301059_1/test_1.txt"),
-                resourceParentPath + "/YYYYMMDDhhmm_[0-9]+/test_[0-9]+.txt",
+                resourceParentPath + "/YYYYMMDDHHmm_[0-9]+/test_[0-9]+.txt",
                 CycleUnitType.MINUTE, Arrays.asList("202309281030", 
"202309301059"), "202309280000",
                 "202309302300");
         doTest(4, Arrays.asList("testScan/20241030/23/59.txt"),
-                resourceParentPath + "/YYYYMMDD/hh/mm.txt",
+                resourceParentPath + "/YYYYMMDD/HH/mm.txt",
+                CycleUnitType.MINUTE, Arrays.asList("202410302359"), 
"202410300000", "202410310000");
+    }
+
+    @Test
+    public void testScanLowercase() throws Exception {
+        doTest(1, Arrays.asList("testScan/20230928_1/test_1.txt"),
+                resourceParentPath + "/yyyyMMdd_[0-9]+/test_[0-9]+.txt", 
CycleUnitType.DAY, Arrays.asList("20230928"),
+                "20230928", "20230930");
+        doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"),
+                resourceParentPath + "/yyyyMMddhh_[0-9]+/test_[0-9]+.txt",
+                CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023092800", 
"2023093023");
+        doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt", 
"testScan/202309301059_1/test_1.txt"),
+                resourceParentPath + "/yyyyMMddhhmm_[0-9]+/test_[0-9]+.txt",
+                CycleUnitType.MINUTE, Arrays.asList("202309281030", 
"202309301059"), "202309280000",
+                "202309302300");
+        doTest(4, Arrays.asList("testScan/20241030/23/59.txt"),
+                resourceParentPath + "/yyyyMMdd/hh/mm.txt",
                 CycleUnitType.MINUTE, Arrays.asList("202410302359"), 
"202410300000", "202410310000");
     }
 
@@ -112,7 +129,8 @@ public class TestLogFileTask {
                 fileName.add(invocation.getArgument(0));
                 dataTime.add(invocation.getArgument(1));
                 return null;
-            }).when(dayTask, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString());
+            }).when(dayTask, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(),
+                    Mockito.anyString());
             Assert.assertTrue(dayTask.isProfileValid(taskProfile));
             manager.getTaskStore().storeTask(taskProfile);
             dayTask.init(manager, taskProfile, 
manager.getInstanceBasicStore());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index a1f13122e3..9dd8ea88e0 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.agent.plugin.utils;
 
-import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
 import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.metric.MetricRegister;
@@ -125,7 +125,7 @@ public class TestUtils {
         Calendar calendar = Calendar.getInstance();
         Long dataTime = DateTransUtils.timeStrConvertToMillSec("202406251007", 
"m");
         calendar.setTimeInMillis(dataTime);
-        Assert.assertEquals(NewDateUtils.replaceDateExpression(calendar, src), 
dst);
+        Assert.assertEquals(DateUtils.replaceDateExpression(calendar, src), 
dst);
     }
 
     private void testCutDirectoryByWildcard(String src, List<String> dst) {

Reply via email to