This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a87e7e1a3d [INLONG-9582][Agent] Add unit testing to instance manager
to test their ability to recover tasks from DB (#9583)
a87e7e1a3d is described below
commit a87e7e1a3da7d75c136bce2cbb5096d8d49e2262
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jan 18 10:09:26 2024 +0800
[INLONG-9582][Agent] Add unit testing to instance manager to test their
ability to recover tasks from DB (#9583)
---
.../inlong/agent/core/instance/InstanceManager.java | 6 +++++-
.../agent/core/instance/TestInstanceManager.java | 18 ++++++++++++++++--
2 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 3b74cf4e48..e980354fe2 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -132,6 +132,10 @@ public class InstanceManager extends AbstractDaemon {
return taskId;
}
+ public InstanceDb getInstanceDb() {
+ return instanceDb;
+ }
+
public Instance getInstance(String instanceId) {
return instanceMap.get(instanceId);
}
@@ -167,7 +171,7 @@ public class InstanceManager extends AbstractDaemon {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId,
inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
} catch (Throwable ex) {
- LOGGER.error("coreThread {}", ex);
+ LOGGER.error("coreThread error: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
}
runAtLeastOneTime = true;
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 34772636ad..8d267e71a1 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.file.TaskManager;
import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.InstanceDb;
import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
@@ -57,9 +58,9 @@ public class TestInstanceManager {
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+6:00");
Db taskBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
TaskProfileDb taskDb = new TaskProfileDb(taskBasicDb);
- manager = new InstanceManager("1", 2, basicDb, taskDb);
+ taskDb.storeTask(taskProfile);
+ manager = new InstanceManager("1", 20, basicDb, taskDb);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
- manager.start();
}
@AfterClass
@@ -70,6 +71,19 @@ public class TestInstanceManager {
@Test
public void testInstanceManager() {
+ InstanceDb instanceDb = manager.getInstanceDb();
+ for (int i = 1; i <= 10; i++) {
+ InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
+ String.valueOf(i), taskProfile.getCycleUnit(), "2023092710",
+ AgentUtils.getCurrentTime());
+ instanceDb.storeInstance(profile);
+ }
+ manager.start();
+ for (int i = 1; i <= 10; i++) {
+ String instanceId = String.valueOf(i);
+ await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstance(instanceId) != null);
+ Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == InstanceStateEnum.DEFAULT);
+ }
long timeBefore = AgentUtils.getCurrentTime();
InstanceProfile profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
helper.getTestRootDir() + "/2023092710_1.txt",
taskProfile.getCycleUnit(), "2023092710",