This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4552466dfe [INLONG-9241][Agent] Print task and instance detail every
ten seconds (#9243)
4552466dfe is described below
commit 4552466dfefed046a2188571a3213570244c7d9e
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 9 09:57:24 2023 +0800
[INLONG-9241][Agent] Print task and instance detail every ten seconds
(#9243)
---
.../agent/core/instance/InstanceManager.java | 28 ++++++++++++++++++----
.../inlong/agent/core/task/file/TaskManager.java | 17 +++++++++++++
.../agent/core/instance/TestInstanceManager.java | 1 +
3 files changed, 42 insertions(+), 4 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index de96b93bc1..d4a278a974 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -47,7 +47,9 @@ public class InstanceManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceManager.class);
private static final int ACTION_QUEUE_CAPACITY = 100;
- public static final int CORE_THREAD_SLEEP_TIME = 100;
+ public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
+ public static final int CORE_THREAD_PRINT_TIME = 10000;
+ private long lastPrintTime = 0;
// task in db
private final InstanceDb instanceDb;
// task in memory
@@ -109,7 +111,8 @@ public class InstanceManager extends AbstractDaemon {
running = true;
while (isRunnable()) {
try {
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
+ printInstanceDetail();
dealWithActionQueue(actionQueue);
keepPaceWithDb();
} catch (Throwable ex) {
@@ -122,6 +125,22 @@ public class InstanceManager extends AbstractDaemon {
};
}
+ private void printInstanceDetail() {
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
+ LOGGER.info("instanceManager coreThread running! taskId {} action
count {}", taskId,
+ actionQueue.size());
+ List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+ for (int i = 0; i < instances.size(); i++) {
+ InstanceProfile instance = instances.get(i);
+ LOGGER.info(
+ "instanceManager coreThread instance taskId {} index
{} total {} instanceId {} state {}",
+ taskId, i,
+ instances.size(), instance.getInstanceId(),
instance.getState());
+ }
+ lastPrintTime = AgentUtils.getCurrentTime();
+ }
+ }
+
private void keepPaceWithDb() {
traverseDbTasksToMemory();
traverseMemoryTasksToDb();
@@ -215,7 +234,7 @@ public class InstanceManager extends AbstractDaemon {
public void waitForTerminate() {
super.waitForTerminate();
while (running) {
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
}
}
@@ -241,7 +260,8 @@ public class InstanceManager extends AbstractDaemon {
}
LOGGER.info("add instance taskId {} instanceId {}", taskId,
profile.getInstanceId());
if (!shouldAddAgain(profile.getInstanceId(),
profile.getFileUpdateTime())) {
- LOGGER.info("shouldAddAgain returns false skip taskId {}
instanceId {}", taskId, profile.getInstanceId());
+ LOGGER.info("addInstance shouldAddAgain returns false skip taskId
{} instanceId {}", taskId,
+ profile.getInstanceId());
return;
}
addToDb(profile);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 9aa71a96ab..b4cb79a48c 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -56,7 +56,9 @@ public class TaskManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(TaskManager.class);
public static final int CONFIG_QUEUE_CAPACITY = 1;
public static final int CORE_THREAD_SLEEP_TIME = 1000;
+ public static final int CORE_THREAD_PRINT_TIME = 10000;
private static final int ACTION_QUEUE_CAPACITY = 100000;
+ private long lastPrintTime = 0;
// task basic db
private final Db taskBasicDb;
// instance basic db
@@ -143,6 +145,7 @@ public class TaskManager extends AbstractDaemon {
while (isRunnable()) {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ printTaskDetail();
dealWithConfigQueue(configQueue);
dealWithActionQueue(actionQueue);
} catch (Throwable ex) {
@@ -153,6 +156,20 @@ public class TaskManager extends AbstractDaemon {
};
}
+ private void printTaskDetail() {
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
+ LOGGER.info("taskManager coreThread running!");
+ List<TaskProfile> tasks = taskDb.getTasks();
+ for (int i = 0; i < tasks.size(); i++) {
+ TaskProfile task = tasks.get(i);
+ LOGGER.info("taskManager coreThread task index {} total {}
taskId {} state {}",
+ i, tasks.size(), task.getTaskId(), task.getState());
+ }
+ lastPrintTime = AgentUtils.getCurrentTime();
+ }
+
+ }
+
private void dealWithConfigQueue(BlockingQueue<List<TaskProfile>> queue) {
List<TaskProfile> dataConfigs = queue.poll();
if (dataConfigs == null) {
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index b1107bb2ab..cff9a7c243 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -51,6 +51,7 @@ public class TestInstanceManager {
Db basicDb = TaskManager.initDb("/localdb");
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING);
manager = new InstanceManager("1", 2, basicDb);
+ manager.CORE_THREAD_SLEEP_TIME_MS = 100;
manager.start();
}