This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b206b830a1 [INLONG-10012][Agent] Adjust task encapsulation to place
common logic in the base class (#10013)
b206b830a1 is described below
commit b206b830a1eee70c966628352e31b87c9a20a231
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Apr 19 09:42:03 2024 +0800
[INLONG-10012][Agent] Adjust task encapsulation to place common logic in the
base class (#10013)
---
.../task/{KafkaTask.java => AbstractTask.java} | 121 ++++++++----------
.../apache/inlong/agent/plugin/task/KafkaTask.java | 128 +++----------------
.../inlong/agent/plugin/task/MongoDBTask.java | 121 ++----------------
.../inlong/agent/plugin/task/PulsarTask.java | 121 ++----------------
.../inlong/agent/plugin/task/file/LogFileTask.java | 140 ++++++---------------
5 files changed, 130 insertions(+), 501 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
similarity index 51%
copy from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
copy to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 78ad200db0..56a786c6e5 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.plugin.task;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
@@ -34,51 +34,40 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
+import java.util.List;
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
+public abstract class AbstractTask extends Task {
-public class KafkaTask extends Task {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaTask.class);
- public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
- public static final int CORE_THREAD_SLEEP_TIME = 5000;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractTask.class);
+ public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_PRINT_TIME = 10000;
-
- private TaskProfile taskProfile;
- private Db basicDb;
- private TaskManager taskManager;
+ protected TaskProfile taskProfile;
+ protected Db basicDb;
+ protected TaskManager taskManager;
private InstanceManager instanceManager;
- private long lastPrintTime = 0;
- private boolean initOK = false;
- private volatile boolean running = false;
- private boolean isAdded = false;
- private boolean isRestoreFromDB = false;
-
- private String topic;
+ protected volatile boolean running = false;
+ protected boolean initOK = false;
+ protected long lastPrintTime = 0;
@Override
public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
taskManager = (TaskManager) srcManager;
- commonInit(taskProfile, basicDb);
- initOK = true;
- }
-
- private void commonInit(TaskProfile taskProfile, Db basicDb) {
- LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
this.taskProfile = taskProfile;
this.basicDb = basicDb;
- this.topic = taskProfile.get(TASK_KAFKA_TOPIC);
- this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
- instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+ instanceManager = new InstanceManager(taskProfile.getTaskId(),
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
basicDb, taskManager.getTaskDb());
try {
instanceManager.start();
} catch (Exception e) {
LOGGER.error("start instance manager error: ", e);
}
+ initTask();
+ initOK = true;
+ }
+
+ protected abstract void initTask();
+
+ protected void releaseTask() {
}
@Override
@@ -87,6 +76,7 @@ public class KafkaTask extends Task {
if (instanceManager != null) {
instanceManager.stop();
}
+ releaseTask();
}
@Override
@@ -102,16 +92,6 @@ public class KafkaTask extends Task {
return taskProfile.getTaskId();
}
- @Override
- public boolean isProfileValid(TaskProfile profile) {
- if (!profile.allRequiredKeyExist()) {
- LOGGER.error("task profile needs all required key");
- return false;
- }
-
- return true;
- }
-
@Override
public void addCallbacks() {
@@ -119,7 +99,7 @@ public class KafkaTask extends Task {
@Override
public void run() {
- Thread.currentThread().setName("kafka-task-core-" + getTaskId());
+ Thread.currentThread().setName("task-core-" + getTaskId());
running = true;
try {
doRun();
@@ -129,40 +109,49 @@ public class KafkaTask extends Task {
running = false;
}
- private void doRun() {
+ protected void doRun() {
while (!isFinished()) {
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
- LOGGER.info("kafka task running! taskId {}", getTaskId());
- lastPrintTime = AgentUtils.getCurrentTime();
- }
+ taskPrint();
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
if (!initOK) {
continue;
}
+ List<InstanceProfile> profileList = getNewInstanceList();
+ for (InstanceProfile profile : profileList) {
+ InstanceAction action = new InstanceAction(ActionType.ADD,
profile);
+ while (!isFinished() && !instanceManager.submitAction(action))
{
+ LOGGER.error("instance manager action queue is full:
taskId {}", getTaskId());
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ }
+ }
+ taskHeartbeat();
+ }
+ }
- // Add instance profile to instance manager
- addInstanceProfile();
+ protected abstract List<InstanceProfile> getNewInstanceList();
+
+ protected void taskHeartbeat() {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
taskProfile.getInlongGroupId(),
+ taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(),
1, 1);
- String inlongGroupId = taskProfile.getInlongGroupId();
- String inlongStreamId = taskProfile.getInlongStreamId();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
- AgentUtils.getCurrentTime(), 1, 1);
- }
}
- private void addInstanceProfile() {
- if (isAdded) {
- return;
- }
- String dataTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
- LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
- InstanceAction action = new InstanceAction(ActionType.ADD,
instanceProfile);
- while (!isFinished() && !instanceManager.submitAction(action)) {
- LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ protected void taskPrint() {
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
+ LOGGER.info("task running! taskId {}", getTaskId());
+ lastPrintTime = AgentUtils.getCurrentTime();
}
- this.isAdded = true;
+ }
+
+ protected boolean allInstanceFinished() {
+ return instanceManager.allInstanceFinished();
+ }
+
+ protected boolean shouldAddAgain(String fileName, long lastModifyTime) {
+ return instanceManager.shouldAddAgain(fileName, lastModifyTime);
+ }
+
+ protected boolean isFull() {
+ return instanceManager.isFull();
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
index 78ad200db0..f83104e8a7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
@@ -20,86 +20,45 @@ package org.apache.inlong.agent.plugin.task;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
-public class KafkaTask extends Task {
+public class KafkaTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaTask.class);
public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
- public static final int CORE_THREAD_SLEEP_TIME = 5000;
- public static final int CORE_THREAD_PRINT_TIME = 10000;
-
- private TaskProfile taskProfile;
- private Db basicDb;
- private TaskManager taskManager;
- private InstanceManager instanceManager;
- private long lastPrintTime = 0;
- private boolean initOK = false;
- private volatile boolean running = false;
private boolean isAdded = false;
- private boolean isRestoreFromDB = false;
-
private String topic;
+ private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
@Override
- public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
- taskManager = (TaskManager) srcManager;
- commonInit(taskProfile, basicDb);
- initOK = true;
- }
-
- private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ protected void initTask() {
LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
- this.taskProfile = taskProfile;
- this.basicDb = basicDb;
this.topic = taskProfile.get(TASK_KAFKA_TOPIC);
- this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
- instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
- basicDb, taskManager.getTaskDb());
- try {
- instanceManager.start();
- } catch (Exception e) {
- LOGGER.error("start instance manager error: ", e);
- }
- }
-
- @Override
- public void destroy() {
- doChangeState(State.SUCCEEDED);
- if (instanceManager != null) {
- instanceManager.stop();
- }
- }
-
- @Override
- public TaskProfile getProfile() {
- return taskProfile;
}
@Override
- public String getTaskId() {
- if (taskProfile == null) {
- return "";
+ protected List<InstanceProfile> getNewInstanceList() {
+ List<InstanceProfile> list = new ArrayList<>();
+ if (isAdded) {
+ return list;
}
- return taskProfile.getTaskId();
+ String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
+ CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
+ list.add(instanceProfile);
+ this.isAdded = true;
+ return list;
}
@Override
@@ -108,61 +67,6 @@ public class KafkaTask extends Task {
LOGGER.error("task profile needs all required key");
return false;
}
-
return true;
}
-
- @Override
- public void addCallbacks() {
-
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("kafka-task-core-" + getTaskId());
- running = true;
- try {
- doRun();
- } catch (Throwable e) {
- LOGGER.error("do run error: ", e);
- }
- running = false;
- }
-
- private void doRun() {
- while (!isFinished()) {
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
- LOGGER.info("kafka task running! taskId {}", getTaskId());
- lastPrintTime = AgentUtils.getCurrentTime();
- }
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- if (!initOK) {
- continue;
- }
-
- // Add instance profile to instance manager
- addInstanceProfile();
-
- String inlongGroupId = taskProfile.getInlongGroupId();
- String inlongStreamId = taskProfile.getInlongStreamId();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
- AgentUtils.getCurrentTime(), 1, 1);
- }
- }
-
- private void addInstanceProfile() {
- if (isAdded) {
- return;
- }
- String dataTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
- LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
- InstanceAction action = new InstanceAction(ActionType.ADD,
instanceProfile);
- while (!isFinished() && !instanceManager.submitAction(action)) {
- LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- }
- this.isAdded = true;
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
index 6081cc388c..2a022ddb78 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
@@ -21,87 +21,28 @@ import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-
-public class MongoDBTask extends Task {
+public class MongoDBTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(MongoDBTask.class);
public static final String DEFAULT_MONGODB_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
- public static final int CORE_THREAD_SLEEP_TIME = 5000;
- public static final int CORE_THREAD_PRINT_TIME = 10000;
-
- private TaskProfile taskProfile;
- private Db basicDb;
- private TaskManager taskManager;
- private InstanceManager instanceManager;
- private long lastPrintTime = 0;
- private boolean initOK = false;
- private volatile boolean running = false;
private boolean isAdded = false;
- private boolean isRestoreFromDB = false;
-
- private String database;
private String collection;
+ private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
@Override
- public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
- taskManager = (TaskManager) srcManager;
- commonInit(taskProfile, basicDb);
- initOK = true;
- }
-
- private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ protected void initTask() {
LOGGER.info("mongoDB commonInit: {}", taskProfile.toJsonStr());
- this.taskProfile = taskProfile;
- this.basicDb = basicDb;
- this.database =
taskProfile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
this.collection =
taskProfile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
- this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
- instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
- basicDb, taskManager.getTaskDb());
- try {
- instanceManager.start();
- } catch (Exception e) {
- LOGGER.error("start instance manager error: ", e);
- }
- }
-
- @Override
- public void destroy() {
- doChangeState(State.SUCCEEDED);
- if (instanceManager != null) {
- instanceManager.stop();
- }
- }
-
- @Override
- public TaskProfile getProfile() {
- return taskProfile;
- }
-
- @Override
- public String getTaskId() {
- if (taskProfile == null) {
- return "";
- }
- return taskProfile.getTaskId();
}
@Override
@@ -110,61 +51,21 @@ public class MongoDBTask extends Task {
LOGGER.error("task profile needs all required key");
return false;
}
-
return true;
}
@Override
- public void addCallbacks() {
-
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("mongoDB-task-core-" + getTaskId());
- running = true;
- try {
- doRun();
- } catch (Throwable e) {
- LOGGER.error("do run error: ", e);
- }
- running = false;
- }
-
- private void doRun() {
- while (!isFinished()) {
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
- LOGGER.info("mongoDB task running! taskId {}", getTaskId());
- lastPrintTime = AgentUtils.getCurrentTime();
- }
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- if (!initOK) {
- continue;
- }
-
- // Add instance profile to instance manager
- addInstanceProfile();
-
- String inlongGroupId = taskProfile.getInlongGroupId();
- String inlongStreamId = taskProfile.getInlongStreamId();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
- AgentUtils.getCurrentTime(), 1, 1);
- }
- }
-
- private void addInstanceProfile() {
+ protected List<InstanceProfile> getNewInstanceList() {
+ List<InstanceProfile> list = new ArrayList<>();
if (isAdded) {
- return;
+ return list;
}
- String dataTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+ String dataTime = LocalDateTime.now().format(dateTimeFormatter);
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection,
CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
- InstanceAction action = new InstanceAction(ActionType.ADD,
instanceProfile);
- while (!isFinished() && !instanceManager.submitAction(action)) {
- LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- }
+ list.add(instanceProfile);
this.isAdded = true;
+ return list;
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
index 1f5ce56c59..a105c45774 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
@@ -20,95 +20,38 @@ package org.apache.inlong.agent.plugin.task;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_NAMESPACE;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TENANT;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TOPIC;
-public class PulsarTask extends Task {
+public class PulsarTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarTask.class);
public static final String DEFAULT_PULSAR_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
- public static final int CORE_THREAD_SLEEP_TIME = 5000;
- public static final int CORE_THREAD_PRINT_TIME = 10000;
-
- private TaskProfile taskProfile;
- private Db basicDb;
- private TaskManager taskManager;
- private InstanceManager instanceManager;
- private long lastPrintTime = 0;
- private boolean initOK = false;
- private volatile boolean running = false;
private boolean isAdded = false;
- private boolean isRestoreFromDB = false;
-
private String tenant;
private String namespace;
private String topic;
private String instanceId;
+ private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
@Override
- public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
- taskManager = (TaskManager) srcManager;
- commonInit(taskProfile, basicDb);
- initOK = true;
- }
-
- private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ protected void initTask() {
LOGGER.info("pulsar commonInit: {}", taskProfile.toJsonStr());
- this.taskProfile = taskProfile;
- this.basicDb = basicDb;
this.tenant = taskProfile.get(TASK_PULSAR_TENANT);
this.namespace = taskProfile.get(TASK_PULSAR_NAMESPACE);
this.topic = taskProfile.get(TASK_PULSAR_TOPIC);
this.instanceId = tenant + "/" + namespace + "/" + topic;
-
- this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
- instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
- basicDb, taskManager.getTaskDb());
- try {
- instanceManager.start();
- } catch (Exception e) {
- LOGGER.error("start instance manager error: ", e);
- }
- }
-
- @Override
- public void destroy() {
- doChangeState(State.SUCCEEDED);
- if (instanceManager != null) {
- instanceManager.stop();
- }
- }
-
- @Override
- public TaskProfile getProfile() {
- return taskProfile;
- }
-
- @Override
- public String getTaskId() {
- if (taskProfile == null) {
- return "";
- }
- return taskProfile.getTaskId();
}
@Override
@@ -117,61 +60,21 @@ public class PulsarTask extends Task {
LOGGER.error("task profile needs all required key");
return false;
}
-
return true;
}
@Override
- public void addCallbacks() {
-
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("pulsar-task-core-" + getTaskId());
- running = true;
- try {
- doRun();
- } catch (Throwable e) {
- LOGGER.error("do run error: ", e);
- }
- running = false;
- }
-
- private void doRun() {
- while (!isFinished()) {
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
- LOGGER.info("pulsar task running! taskId {}", getTaskId());
- lastPrintTime = AgentUtils.getCurrentTime();
- }
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- if (!initOK) {
- continue;
- }
-
- // Add instance profile to instance manager
- addInstanceProfile();
-
- String inlongGroupId = taskProfile.getInlongGroupId();
- String inlongStreamId = taskProfile.getInlongStreamId();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
- AgentUtils.getCurrentTime(), 1, 1);
- }
- }
-
- private void addInstanceProfile() {
+ protected List<InstanceProfile> getNewInstanceList() {
+ List<InstanceProfile> list = new ArrayList<>();
if (isAdded) {
- return;
+ return list;
}
- String dataTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+ String dataTime = LocalDateTime.now().format(dateTimeFormatter);
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_PULSAR_INSTANCE, instanceId,
CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
- InstanceAction action = new InstanceAction(ActionType.ADD,
instanceProfile);
- while (!isFinished() && !instanceManager.submitAction(action)) {
- LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- }
+ list.add(instanceProfile);
this.isAdded = true;
+ return list;
}
-}
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index b49a72313c..f06ac21f0a 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -21,14 +21,8 @@ import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.TaskAction;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.plugin.task.AbstractTask;
import org.apache.inlong.agent.plugin.task.file.FileScanner.BasicFileInfo;
import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
@@ -60,7 +54,9 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -68,64 +64,60 @@ import java.util.stream.Stream;
/**
* Watch directory, if new valid files are created, create jobs
correspondingly.
*/
-public class LogFileTask extends Task {
+public class LogFileTask extends AbstractTask {
- public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileTask.class);
+ public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
public static final String SCAN_CYCLE_RANCE = "-2";
- private TaskProfile taskProfile;
- private Db basicDb;
- private TaskManager taskManager;
- private InstanceManager instanceManager;
+ private static final int INSTANCE_QUEUE_CAPACITY = 10;
private final Map<String, WatchEntity> watchers = new
ConcurrentHashMap<>();
private final Set<String> watchFailedDirs = new HashSet<>();
private final Map<String/* dataTime */, Map<String/* fileName */,
InstanceProfile>> eventMap =
new ConcurrentHashMap<>();
public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
- public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
- public static final int CORE_THREAD_PRINT_TIME = 10000;
- private long lastPrintTime = 0;
private boolean retry;
private long startTime;
private long endTime;
private boolean realTime = false;
- private boolean initOK = false;
private Set<String> originPatterns;
private long lastScanTime = 0;
public final long SCAN_INTERVAL = 1 * 60 * 1000;
private volatile boolean runAtLeastOneTime = false;
private volatile long coreThreadUpdateTime = 0;
- private volatile boolean running = false;
+ private BlockingQueue<InstanceProfile> instanceQueue;
@Override
- public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
- taskManager = (TaskManager) srcManager;
- commonInit(taskProfile, basicDb);
+ protected void initTask() {
+ instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
+ retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
+ originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
+ .collect(Collectors.toSet());
+ if
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ realTime = true;
+ }
if (retry) {
retryInit();
} else {
watchInit();
}
- initOK = true;
}
- private void commonInit(TaskProfile taskProfile, Db basicDb) {
- this.taskProfile = taskProfile;
- this.basicDb = basicDb;
- retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
- originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
- .collect(Collectors.toSet());
- if
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
- realTime = true;
+ @Override
+ protected List<InstanceProfile> getNewInstanceList() {
+ if (retry) {
+ runForRetry();
+ } else {
+ runForNormal();
}
- instanceManager = new InstanceManager(taskProfile.getTaskId(),
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
- basicDb, taskManager.getTaskDb());
- try {
- instanceManager.start();
- } catch (Exception e) {
- LOGGER.error("start instance manager error: ", e);
+ List<InstanceProfile> list = new ArrayList<>();
+ while (list.size() < INSTANCE_QUEUE_CAPACITY &&
!instanceQueue.isEmpty()) {
+ InstanceProfile profile = instanceQueue.poll();
+ if (profile != null) {
+ list.add(profile);
+ }
}
+ return list;
}
@Override
@@ -151,9 +143,8 @@ public class LogFileTask extends Task {
LOGGER.error("task profile needs time zone");
return false;
}
- boolean ret =
- profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
- && profile.hasKey(TaskConstants.FILE_MAX_NUM);
+ boolean ret = profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
+ && profile.hasKey(TaskConstants.FILE_MAX_NUM);
if (!ret) {
LOGGER.error("task profile needs file keys");
return false;
@@ -218,11 +209,7 @@ public class LogFileTask extends Task {
}
@Override
- public void destroy() {
- doChangeState(State.SUCCEEDED);
- if (instanceManager != null) {
- instanceManager.stop();
- }
+ protected void releaseTask() {
releaseWatchers(watchers);
}
@@ -243,66 +230,13 @@ public class LogFileTask extends Task {
});
}
- @Override
- public TaskProfile getProfile() {
- return taskProfile;
- }
-
- @Override
- public String getTaskId() {
- if (taskProfile == null) {
- return "";
- }
- return taskProfile.getTaskId();
- }
-
- @Override
- public void addCallbacks() {
-
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("directory-task-core-" + getTaskId());
- running = true;
- try {
- doRun();
- } catch (Throwable e) {
- LOGGER.error("do run error: ", e);
- }
- running = false;
- }
-
- private void doRun() {
- while (!isFinished()) {
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
- LOGGER.info("log file task running! taskId {}", getTaskId());
- lastPrintTime = AgentUtils.getCurrentTime();
- }
- coreThreadUpdateTime = AgentUtils.getCurrentTime();
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- if (!initOK) {
- continue;
- }
- if (retry) {
- runForRetry();
- } else {
- runForNormal();
- }
- String inlongGroupId = taskProfile.getInlongGroupId();
- String inlongStreamId = taskProfile.getInlongStreamId();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
- AgentUtils.getCurrentTime(), 1, 1);
- }
- }
-
private void runForRetry() {
if (!runAtLeastOneTime) {
scanExistingFile();
runAtLeastOneTime = true;
}
dealWithEventMap();
- if (instanceManager.allInstanceFinished()) {
+ if (allInstanceFinished()) {
LOGGER.info("retry task finished, send action to task manager,
taskId {}", getTaskId());
TaskAction action = new
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
taskManager.submitAction(action);
@@ -435,13 +369,11 @@ public class LogFileTask extends Task {
for (InstanceProfile sortEvent : sortedEvents) {
String fileName = sortEvent.getInstanceId();
InstanceProfile profile = sameDataTimeEvents.get(fileName);
- InstanceAction action = new InstanceAction(ActionType.ADD,
profile);
- if (!isCurrentDataTime && instanceManager.isFull()) {
+ if (!isCurrentDataTime && isFull()) {
return;
}
- while (!isFinished() && !instanceManager.submitAction(action))
{
- LOGGER.error("instance manager action queue is full:
taskId {}", instanceManager.getTaskId());
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ if (!instanceQueue.offer(profile)) {
+ return;
}
sameDataTimeEvents.remove(fileName);
}
@@ -564,7 +496,7 @@ public class LogFileTask extends Task {
return;
}
Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
- if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) {
+ if (!shouldAddAgain(fileName, fileUpdateTime)) {
LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId
{} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
return;