This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c86def6bf1 [INLONG-9580][Agent] Add unit tests to TaskManager to
test its ability to recover tasks from DB (#9581)
c86def6bf1 is described below
commit c86def6bf187801b87dd8e2cdd8641034f3cb332
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Jan 17 18:34:08 2024 +0800
[INLONG-9580][Agent] Add unit tests to TaskManager to test its ability
to recover tasks from DB (#9581)
---
.../inlong/agent/core/task/file/TaskManager.java | 4 +-
.../inlong/agent/core/task/TestTaskManager.java | 44 +++++++++++++++++-----
2 files changed, 37 insertions(+), 11 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 684600dbb2..d4853085de 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -418,13 +418,15 @@ public class TaskManager extends AbstractDaemon {
}
private void restoreFromDb() {
+ LOGGER.info("restore from db start");
List<TaskProfile> taskProfileList = taskDb.getTasks();
taskProfileList.forEach((profile) -> {
if (profile.getState() == TaskStateEnum.RUNNING) {
- LOGGER.info("restoreFromDb taskId {}", profile.getTaskId());
+ LOGGER.info("restore from db taskId {}", profile.getTaskId());
addToMemory(profile);
}
});
+ LOGGER.info("restore from db end");
}
private void stopAllTasks() {
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
index 4fff284a80..08e87086e2 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.core.task;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.junit.AfterClass;
@@ -44,24 +45,37 @@ public class TestTaskManager {
@BeforeClass
public static void setup() {
helper = new
AgentBaseTestsHelper(TestTaskManager.class.getName()).setupAgentHome();
- try {
- manager = new TaskManager();
- manager.start();
- } catch (Exception e) {
- Assert.assertTrue("manager start error", false);
- }
}
@AfterClass
public static void teardown() throws Exception {
- manager.stop();
- helper.teardownAgentHome();
}
@Test
public void testTaskManager() {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false,
0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
+ try {
+ manager = new TaskManager();
+ TaskProfileDb taskProfileDb = manager.getTaskDb();
+ for (int i = 1; i <= 10; i++) {
+ TaskProfile taskProfile = helper.getTaskProfile(i, pattern,
false, 0L, 0L, TaskStateEnum.RUNNING,
+ "GMT+8:00");
+ taskProfile.setTaskClass(MockTask.class.getCanonicalName());
+ taskProfileDb.storeTask(taskProfile);
+ }
+ manager.start();
+ for (int i = 1; i <= 10; i++) {
+ String taskId = String.valueOf(i);
+ await().atMost(3, TimeUnit.SECONDS).until(() ->
manager.getTask(taskId) != null);
+ Assert.assertTrue(manager.getTask(taskId) != null);
+ }
+ } catch (Exception e) {
+ LOGGER.error("manager start error: ", e);
+ Assert.assertTrue("manager start error", false);
+ }
+
+ TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false,
0L, 0L, TaskStateEnum.RUNNING,
+ "GMT+8:00");
String taskId1 = taskProfile1.getTaskId();
taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles1 = new ArrayList<>();
@@ -77,13 +91,16 @@ public class TestTaskManager {
manager.submitTaskProfiles(taskProfiles1);
await().atMost(3, TimeUnit.SECONDS).until(() ->
manager.getTask(taskId1) == null);
Assert.assertTrue(manager.getTaskProfile(taskId1).getState() ==
TaskStateEnum.FROZEN);
+
+ // test restore from froze
taskProfile1.setState(TaskStateEnum.RUNNING);
manager.submitTaskProfiles(taskProfiles1);
await().atMost(3, TimeUnit.SECONDS).until(() ->
manager.getTask(taskId1) != null);
Assert.assertTrue(manager.getTaskProfile(taskId1).getState() ==
TaskStateEnum.RUNNING);
// test delete
- TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false,
0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
+ TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false,
0L, 0L, TaskStateEnum.RUNNING,
+ "GMT+8:00");
taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles2 = new ArrayList<>();
taskProfiles2.add(taskProfile2);
@@ -93,5 +110,12 @@ public class TestTaskManager {
String taskId2 = taskProfile2.getTaskId();
await().atMost(3, TimeUnit.SECONDS).until(() ->
manager.getTask(taskId2) != null);
Assert.assertTrue(manager.getTaskProfile(taskId2).getState() ==
TaskStateEnum.RUNNING);
+
+ try {
+ manager.stop();
+ helper.teardownAgentHome();
+ } catch (Exception e) {
+ Assert.assertTrue("manager stop error", false);
+ }
}
}