This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b206b830a1 [INLONG-10012][Agent] Adjust task encapsulation to place 
common logic in the base class (#10013)
b206b830a1 is described below

commit b206b830a1eee70c966628352e31b87c9a20a231
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Apr 19 09:42:03 2024 +0800

    [INLONG-10012][Agent] Adjust task encapsulation to place common logic in the 
base class (#10013)
---
 .../task/{KafkaTask.java => AbstractTask.java}     | 121 ++++++++----------
 .../apache/inlong/agent/plugin/task/KafkaTask.java | 128 +++----------------
 .../inlong/agent/plugin/task/MongoDBTask.java      | 121 ++----------------
 .../inlong/agent/plugin/task/PulsarTask.java       | 121 ++----------------
 .../inlong/agent/plugin/task/file/LogFileTask.java | 140 ++++++---------------
 5 files changed, 130 insertions(+), 501 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
similarity index 51%
copy from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
copy to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 78ad200db0..56a786c6e5 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.plugin.task;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.instance.ActionType;
 import org.apache.inlong.agent.core.instance.InstanceAction;
 import org.apache.inlong.agent.core.instance.InstanceManager;
@@ -34,51 +34,40 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
+import java.util.List;
 
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
+public abstract class AbstractTask extends Task {
 
-public class KafkaTask extends Task {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaTask.class);
-    public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
-    public static final int CORE_THREAD_SLEEP_TIME = 5000;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractTask.class);
+    public static final int CORE_THREAD_SLEEP_TIME = 1000;
     public static final int CORE_THREAD_PRINT_TIME = 10000;
-
-    private TaskProfile taskProfile;
-    private Db basicDb;
-    private TaskManager taskManager;
+    protected TaskProfile taskProfile;
+    protected Db basicDb;
+    protected TaskManager taskManager;
     private InstanceManager instanceManager;
-    private long lastPrintTime = 0;
-    private boolean initOK = false;
-    private volatile boolean running = false;
-    private boolean isAdded = false;
-    private boolean isRestoreFromDB = false;
-
-    private String topic;
+    protected volatile boolean running = false;
+    protected boolean initOK = false;
+    protected long lastPrintTime = 0;
 
     @Override
     public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
         taskManager = (TaskManager) srcManager;
-        commonInit(taskProfile, basicDb);
-        initOK = true;
-    }
-
-    private void commonInit(TaskProfile taskProfile, Db basicDb) {
-        LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
         this.taskProfile = taskProfile;
         this.basicDb = basicDb;
-        this.topic = taskProfile.get(TASK_KAFKA_TOPIC);
-        this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+        instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
                 basicDb, taskManager.getTaskDb());
         try {
             instanceManager.start();
         } catch (Exception e) {
             LOGGER.error("start instance manager error: ", e);
         }
+        initTask();
+        initOK = true;
+    }
+
+    protected abstract void initTask();
+
+    protected void releaseTask() {
     }
 
     @Override
@@ -87,6 +76,7 @@ public class KafkaTask extends Task {
         if (instanceManager != null) {
             instanceManager.stop();
         }
+        releaseTask();
     }
 
     @Override
@@ -102,16 +92,6 @@ public class KafkaTask extends Task {
         return taskProfile.getTaskId();
     }
 
-    @Override
-    public boolean isProfileValid(TaskProfile profile) {
-        if (!profile.allRequiredKeyExist()) {
-            LOGGER.error("task profile needs all required key");
-            return false;
-        }
-
-        return true;
-    }
-
     @Override
     public void addCallbacks() {
 
@@ -119,7 +99,7 @@ public class KafkaTask extends Task {
 
     @Override
     public void run() {
-        Thread.currentThread().setName("kafka-task-core-" + getTaskId());
+        Thread.currentThread().setName("task-core-" + getTaskId());
         running = true;
         try {
             doRun();
@@ -129,40 +109,49 @@ public class KafkaTask extends Task {
         running = false;
     }
 
-    private void doRun() {
+    protected void doRun() {
         while (!isFinished()) {
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
-                LOGGER.info("kafka task running! taskId {}", getTaskId());
-                lastPrintTime = AgentUtils.getCurrentTime();
-            }
+            taskPrint();
             AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
             if (!initOK) {
                 continue;
             }
+            List<InstanceProfile> profileList = getNewInstanceList();
+            for (InstanceProfile profile : profileList) {
+                InstanceAction action = new InstanceAction(ActionType.ADD, 
profile);
+                while (!isFinished() && !instanceManager.submitAction(action)) 
{
+                    LOGGER.error("instance manager action queue is full: 
taskId {}", getTaskId());
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                }
+            }
+            taskHeartbeat();
+        }
+    }
 
-            // Add instance profile to instance manager
-            addInstanceProfile();
+    protected abstract List<InstanceProfile> getNewInstanceList();
+
+    protected void taskHeartbeat() {
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
taskProfile.getInlongGroupId(),
+                taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 
1, 1);
 
-            String inlongGroupId = taskProfile.getInlongGroupId();
-            String inlongStreamId = taskProfile.getInlongStreamId();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-        }
     }
 
-    private void addInstanceProfile() {
-        if (isAdded) {
-            return;
-        }
-        String dataTime = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
-        LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
-        InstanceAction action = new InstanceAction(ActionType.ADD, 
instanceProfile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+    protected void taskPrint() {
+        if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
+            LOGGER.info("task running! taskId {}", getTaskId());
+            lastPrintTime = AgentUtils.getCurrentTime();
         }
-        this.isAdded = true;
+    }
+
+    protected boolean allInstanceFinished() {
+        return instanceManager.allInstanceFinished();
+    }
+
+    protected boolean shouldAddAgain(String fileName, long lastModifyTime) {
+        return instanceManager.shouldAddAgain(fileName, lastModifyTime);
+    }
+
+    protected boolean isFull() {
+        return instanceManager.isFull();
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
index 78ad200db0..f83104e8a7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
@@ -20,86 +20,45 @@ package org.apache.inlong.agent.plugin.task;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
 
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
 
-public class KafkaTask extends Task {
+public class KafkaTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaTask.class);
     public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
-    public static final int CORE_THREAD_SLEEP_TIME = 5000;
-    public static final int CORE_THREAD_PRINT_TIME = 10000;
-
-    private TaskProfile taskProfile;
-    private Db basicDb;
-    private TaskManager taskManager;
-    private InstanceManager instanceManager;
-    private long lastPrintTime = 0;
-    private boolean initOK = false;
-    private volatile boolean running = false;
     private boolean isAdded = false;
-    private boolean isRestoreFromDB = false;
-
     private String topic;
+    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
     @Override
-    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
-        taskManager = (TaskManager) srcManager;
-        commonInit(taskProfile, basicDb);
-        initOK = true;
-    }
-
-    private void commonInit(TaskProfile taskProfile, Db basicDb) {
+    protected void initTask() {
         LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
-        this.taskProfile = taskProfile;
-        this.basicDb = basicDb;
         this.topic = taskProfile.get(TASK_KAFKA_TOPIC);
-        this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
-                basicDb, taskManager.getTaskDb());
-        try {
-            instanceManager.start();
-        } catch (Exception e) {
-            LOGGER.error("start instance manager error: ", e);
-        }
-    }
-
-    @Override
-    public void destroy() {
-        doChangeState(State.SUCCEEDED);
-        if (instanceManager != null) {
-            instanceManager.stop();
-        }
-    }
-
-    @Override
-    public TaskProfile getProfile() {
-        return taskProfile;
     }
 
     @Override
-    public String getTaskId() {
-        if (taskProfile == null) {
-            return "";
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> list = new ArrayList<>();
+        if (isAdded) {
+            return list;
         }
-        return taskProfile.getTaskId();
+        String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
+        list.add(instanceProfile);
+        this.isAdded = true;
+        return list;
     }
 
     @Override
@@ -108,61 +67,6 @@ public class KafkaTask extends Task {
             LOGGER.error("task profile needs all required key");
             return false;
         }
-
         return true;
     }
-
-    @Override
-    public void addCallbacks() {
-
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName("kafka-task-core-" + getTaskId());
-        running = true;
-        try {
-            doRun();
-        } catch (Throwable e) {
-            LOGGER.error("do run error: ", e);
-        }
-        running = false;
-    }
-
-    private void doRun() {
-        while (!isFinished()) {
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
-                LOGGER.info("kafka task running! taskId {}", getTaskId());
-                lastPrintTime = AgentUtils.getCurrentTime();
-            }
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            if (!initOK) {
-                continue;
-            }
-
-            // Add instance profile to instance manager
-            addInstanceProfile();
-
-            String inlongGroupId = taskProfile.getInlongGroupId();
-            String inlongStreamId = taskProfile.getInlongStreamId();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-        }
-    }
-
-    private void addInstanceProfile() {
-        if (isAdded) {
-            return;
-        }
-        String dataTime = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
-        LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
-        InstanceAction action = new InstanceAction(ActionType.ADD, 
instanceProfile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-        }
-        this.isAdded = true;
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
index 6081cc388c..2a022ddb78 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
@@ -21,87 +21,28 @@ import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
 
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-
-public class MongoDBTask extends Task {
+public class MongoDBTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MongoDBTask.class);
     public static final String DEFAULT_MONGODB_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
-    public static final int CORE_THREAD_SLEEP_TIME = 5000;
-    public static final int CORE_THREAD_PRINT_TIME = 10000;
-
-    private TaskProfile taskProfile;
-    private Db basicDb;
-    private TaskManager taskManager;
-    private InstanceManager instanceManager;
-    private long lastPrintTime = 0;
-    private boolean initOK = false;
-    private volatile boolean running = false;
     private boolean isAdded = false;
-    private boolean isRestoreFromDB = false;
-
-    private String database;
     private String collection;
+    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
     @Override
-    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
-        taskManager = (TaskManager) srcManager;
-        commonInit(taskProfile, basicDb);
-        initOK = true;
-    }
-
-    private void commonInit(TaskProfile taskProfile, Db basicDb) {
+    protected void initTask() {
         LOGGER.info("mongoDB commonInit: {}", taskProfile.toJsonStr());
-        this.taskProfile = taskProfile;
-        this.basicDb = basicDb;
-        this.database = 
taskProfile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
         this.collection = 
taskProfile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
-        this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
-                basicDb, taskManager.getTaskDb());
-        try {
-            instanceManager.start();
-        } catch (Exception e) {
-            LOGGER.error("start instance manager error: ", e);
-        }
-    }
-
-    @Override
-    public void destroy() {
-        doChangeState(State.SUCCEEDED);
-        if (instanceManager != null) {
-            instanceManager.stop();
-        }
-    }
-
-    @Override
-    public TaskProfile getProfile() {
-        return taskProfile;
-    }
-
-    @Override
-    public String getTaskId() {
-        if (taskProfile == null) {
-            return "";
-        }
-        return taskProfile.getTaskId();
     }
 
     @Override
@@ -110,61 +51,21 @@ public class MongoDBTask extends Task {
             LOGGER.error("task profile needs all required key");
             return false;
         }
-
         return true;
     }
 
     @Override
-    public void addCallbacks() {
-
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName("mongoDB-task-core-" + getTaskId());
-        running = true;
-        try {
-            doRun();
-        } catch (Throwable e) {
-            LOGGER.error("do run error: ", e);
-        }
-        running = false;
-    }
-
-    private void doRun() {
-        while (!isFinished()) {
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
-                LOGGER.info("mongoDB task running! taskId {}", getTaskId());
-                lastPrintTime = AgentUtils.getCurrentTime();
-            }
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            if (!initOK) {
-                continue;
-            }
-
-            // Add instance profile to instance manager
-            addInstanceProfile();
-
-            String inlongGroupId = taskProfile.getInlongGroupId();
-            String inlongStreamId = taskProfile.getInlongStreamId();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-        }
-    }
-
-    private void addInstanceProfile() {
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> list = new ArrayList<>();
         if (isAdded) {
-            return;
+            return list;
         }
-        String dataTime = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+        String dataTime = LocalDateTime.now().format(dateTimeFormatter);
         InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection,
                 CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
-        InstanceAction action = new InstanceAction(ActionType.ADD, 
instanceProfile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-        }
+        list.add(instanceProfile);
         this.isAdded = true;
+        return list;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
index 1f5ce56c59..a105c45774 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
@@ -20,95 +20,38 @@ package org.apache.inlong.agent.plugin.task;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
 
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_NAMESPACE;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TENANT;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TOPIC;
 
-public class PulsarTask extends Task {
+public class PulsarTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarTask.class);
     public static final String DEFAULT_PULSAR_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
-    public static final int CORE_THREAD_SLEEP_TIME = 5000;
-    public static final int CORE_THREAD_PRINT_TIME = 10000;
-
-    private TaskProfile taskProfile;
-    private Db basicDb;
-    private TaskManager taskManager;
-    private InstanceManager instanceManager;
-    private long lastPrintTime = 0;
-    private boolean initOK = false;
-    private volatile boolean running = false;
     private boolean isAdded = false;
-    private boolean isRestoreFromDB = false;
-
     private String tenant;
     private String namespace;
     private String topic;
     private String instanceId;
+    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
     @Override
-    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
-        taskManager = (TaskManager) srcManager;
-        commonInit(taskProfile, basicDb);
-        initOK = true;
-    }
-
-    private void commonInit(TaskProfile taskProfile, Db basicDb) {
+    protected void initTask() {
         LOGGER.info("pulsar commonInit: {}", taskProfile.toJsonStr());
-        this.taskProfile = taskProfile;
-        this.basicDb = basicDb;
         this.tenant = taskProfile.get(TASK_PULSAR_TENANT);
         this.namespace = taskProfile.get(TASK_PULSAR_NAMESPACE);
         this.topic = taskProfile.get(TASK_PULSAR_TOPIC);
         this.instanceId = tenant + "/" + namespace + "/" + topic;
-
-        this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
-                basicDb, taskManager.getTaskDb());
-        try {
-            instanceManager.start();
-        } catch (Exception e) {
-            LOGGER.error("start instance manager error: ", e);
-        }
-    }
-
-    @Override
-    public void destroy() {
-        doChangeState(State.SUCCEEDED);
-        if (instanceManager != null) {
-            instanceManager.stop();
-        }
-    }
-
-    @Override
-    public TaskProfile getProfile() {
-        return taskProfile;
-    }
-
-    @Override
-    public String getTaskId() {
-        if (taskProfile == null) {
-            return "";
-        }
-        return taskProfile.getTaskId();
     }
 
     @Override
@@ -117,61 +60,21 @@ public class PulsarTask extends Task {
             LOGGER.error("task profile needs all required key");
             return false;
         }
-
         return true;
     }
 
     @Override
-    public void addCallbacks() {
-
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName("pulsar-task-core-" + getTaskId());
-        running = true;
-        try {
-            doRun();
-        } catch (Throwable e) {
-            LOGGER.error("do run error: ", e);
-        }
-        running = false;
-    }
-
-    private void doRun() {
-        while (!isFinished()) {
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
-                LOGGER.info("pulsar task running! taskId {}", getTaskId());
-                lastPrintTime = AgentUtils.getCurrentTime();
-            }
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            if (!initOK) {
-                continue;
-            }
-
-            // Add instance profile to instance manager
-            addInstanceProfile();
-
-            String inlongGroupId = taskProfile.getInlongGroupId();
-            String inlongStreamId = taskProfile.getInlongStreamId();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-        }
-    }
-
-    private void addInstanceProfile() {
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> list = new ArrayList<>();
         if (isAdded) {
-            return;
+            return list;
         }
-        String dataTime = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+        String dataTime = LocalDateTime.now().format(dateTimeFormatter);
         InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_PULSAR_INSTANCE, instanceId,
                 CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
         LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
-        InstanceAction action = new InstanceAction(ActionType.ADD, 
instanceProfile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-        }
+        list.add(instanceProfile);
         this.isAdded = true;
+        return list;
     }
-}
+}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index b49a72313c..f06ac21f0a 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -21,14 +21,8 @@ import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
 import org.apache.inlong.agent.core.task.TaskAction;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.plugin.task.AbstractTask;
 import org.apache.inlong.agent.plugin.task.file.FileScanner.BasicFileInfo;
 import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
 import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
@@ -60,7 +54,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -68,64 +64,60 @@ import java.util.stream.Stream;
 /**
  * Watch directory, if new valid files are created, create jobs 
correspondingly.
  */
-public class LogFileTask extends Task {
+public class LogFileTask extends AbstractTask {
 
-    public static final String DEFAULT_FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogFileTask.class);
+    public static final String DEFAULT_FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
     public static final String SCAN_CYCLE_RANCE = "-2";
-    private TaskProfile taskProfile;
-    private Db basicDb;
-    private TaskManager taskManager;
-    private InstanceManager instanceManager;
+    private static final int INSTANCE_QUEUE_CAPACITY = 10;
     private final Map<String, WatchEntity> watchers = new 
ConcurrentHashMap<>();
     private final Set<String> watchFailedDirs = new HashSet<>();
     private final Map<String/* dataTime */, Map<String/* fileName */, 
InstanceProfile>> eventMap =
             new ConcurrentHashMap<>();
     public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
-    public static final int CORE_THREAD_SLEEP_TIME = 1000;
     public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
-    public static final int CORE_THREAD_PRINT_TIME = 10000;
-    private long lastPrintTime = 0;
     private boolean retry;
     private long startTime;
     private long endTime;
     private boolean realTime = false;
-    private boolean initOK = false;
     private Set<String> originPatterns;
     private long lastScanTime = 0;
     public final long SCAN_INTERVAL = 1 * 60 * 1000;
     private volatile boolean runAtLeastOneTime = false;
     private volatile long coreThreadUpdateTime = 0;
-    private volatile boolean running = false;
+    private BlockingQueue<InstanceProfile> instanceQueue;
 
     @Override
-    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
-        taskManager = (TaskManager) srcManager;
-        commonInit(taskProfile, basicDb);
+    protected void initTask() {
+        instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
+        retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
+        originPatterns = 
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
+                .collect(Collectors.toSet());
+        if 
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+            realTime = true;
+        }
         if (retry) {
             retryInit();
         } else {
             watchInit();
         }
-        initOK = true;
     }
 
-    private void commonInit(TaskProfile taskProfile, Db basicDb) {
-        this.taskProfile = taskProfile;
-        this.basicDb = basicDb;
-        retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
-        originPatterns = 
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
-                .collect(Collectors.toSet());
-        if 
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
-            realTime = true;
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        if (retry) {
+            runForRetry();
+        } else {
+            runForNormal();
         }
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
-                basicDb, taskManager.getTaskDb());
-        try {
-            instanceManager.start();
-        } catch (Exception e) {
-            LOGGER.error("start instance manager error: ", e);
+        List<InstanceProfile> list = new ArrayList<>();
+        while (list.size() < INSTANCE_QUEUE_CAPACITY && 
!instanceQueue.isEmpty()) {
+            InstanceProfile profile = instanceQueue.poll();
+            if (profile != null) {
+                list.add(profile);
+            }
         }
+        return list;
     }
 
     @Override
@@ -151,9 +143,8 @@ public class LogFileTask extends Task {
             LOGGER.error("task profile needs time zone");
             return false;
         }
-        boolean ret =
-                profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
-                        && profile.hasKey(TaskConstants.FILE_MAX_NUM);
+        boolean ret = profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
+                && profile.hasKey(TaskConstants.FILE_MAX_NUM);
         if (!ret) {
             LOGGER.error("task profile needs file keys");
             return false;
@@ -218,11 +209,7 @@ public class LogFileTask extends Task {
     }
 
     @Override
-    public void destroy() {
-        doChangeState(State.SUCCEEDED);
-        if (instanceManager != null) {
-            instanceManager.stop();
-        }
+    protected void releaseTask() {
         releaseWatchers(watchers);
     }
 
@@ -243,66 +230,13 @@ public class LogFileTask extends Task {
         });
     }
 
-    @Override
-    public TaskProfile getProfile() {
-        return taskProfile;
-    }
-
-    @Override
-    public String getTaskId() {
-        if (taskProfile == null) {
-            return "";
-        }
-        return taskProfile.getTaskId();
-    }
-
-    @Override
-    public void addCallbacks() {
-
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName("directory-task-core-" + getTaskId());
-        running = true;
-        try {
-            doRun();
-        } catch (Throwable e) {
-            LOGGER.error("do run error: ", e);
-        }
-        running = false;
-    }
-
-    private void doRun() {
-        while (!isFinished()) {
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
-                LOGGER.info("log file task running! taskId {}", getTaskId());
-                lastPrintTime = AgentUtils.getCurrentTime();
-            }
-            coreThreadUpdateTime = AgentUtils.getCurrentTime();
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            if (!initOK) {
-                continue;
-            }
-            if (retry) {
-                runForRetry();
-            } else {
-                runForNormal();
-            }
-            String inlongGroupId = taskProfile.getInlongGroupId();
-            String inlongStreamId = taskProfile.getInlongStreamId();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-        }
-    }
-
     private void runForRetry() {
         if (!runAtLeastOneTime) {
             scanExistingFile();
             runAtLeastOneTime = true;
         }
         dealWithEventMap();
-        if (instanceManager.allInstanceFinished()) {
+        if (allInstanceFinished()) {
             LOGGER.info("retry task finished, send action to task manager, 
taskId {}", getTaskId());
             TaskAction action = new 
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
             taskManager.submitAction(action);
@@ -435,13 +369,11 @@ public class LogFileTask extends Task {
             for (InstanceProfile sortEvent : sortedEvents) {
                 String fileName = sortEvent.getInstanceId();
                 InstanceProfile profile = sameDataTimeEvents.get(fileName);
-                InstanceAction action = new InstanceAction(ActionType.ADD, 
profile);
-                if (!isCurrentDataTime && instanceManager.isFull()) {
+                if (!isCurrentDataTime && isFull()) {
                     return;
                 }
-                while (!isFinished() && !instanceManager.submitAction(action)) 
{
-                    LOGGER.error("instance manager action queue is full: 
taskId {}", instanceManager.getTaskId());
-                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                if (!instanceQueue.offer(profile)) {
+                    return;
                 }
                 sameDataTimeEvents.remove(fileName);
             }
@@ -564,7 +496,7 @@ public class LogFileTask extends Task {
             return;
         }
         Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
-        if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) {
+        if (!shouldAddAgain(fileName, fileUpdateTime)) {
             LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId 
{} dataTime {} fileName {}",
                     taskProfile.getTaskId(), dataTime, fileName);
             return;


Reply via email to