This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8ade12d2dd [INLONG-11516][Agent] Accelerate the process exit speed
(#11517)
8ade12d2dd is described below
commit 8ade12d2dde155283675e78554eda2a53ca23b73
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 21 16:32:06 2024 +0800
[INLONG-11516][Agent] Accelerate the process exit speed (#11517)
* [INLONG-11516][Agent] Accelerate the process exit speed
* [INLONG-11516][Agent] Add modifications to InstanceManager
---
.../org/apache/inlong/agent/plugin/Instance.java | 5 +++++
.../agent/core/instance/InstanceManager.java | 21 ++++++++++++++++++---
.../agent/plugin/instance/CommonInstance.java | 22 ++++++++++++++++++----
.../plugin/sinks/filecollect/SenderManager.java | 6 +++++-
.../agent/plugin/sources/file/AbstractSource.java | 14 +++++++-------
.../inlong/agent/plugin/instance/MockInstance.java | 5 +++++
.../agent/plugin/sources/TestLogFileSource.java | 2 +-
7 files changed, 59 insertions(+), 16 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
index 0d43587f6e..e67d543882 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
@@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper {
*/
public abstract void destroy();
+ /**
+ * notify destroy instance.
+ */
+ public abstract void notifyDestroy();
+
/**
* get instance profile
*/
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 06dd20a99e..2a27e792e8 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -147,6 +147,9 @@ public class InstanceManager extends AbstractDaemon {
if (action == null) {
return false;
}
+ if (isFull()) {
+ return false;
+ }
return actionQueue.offer(action);
}
@@ -163,7 +166,7 @@ public class InstanceManager extends AbstractDaemon {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
- dealWithActionQueue(actionQueue);
+ dealWithActionQueue();
keepPaceWithStore();
String inlongGroupId = taskFromStore.getInlongGroupId();
String inlongStreamId = taskFromStore.getInlongStreamId();
@@ -251,10 +254,10 @@ public class InstanceManager extends AbstractDaemon {
});
}
- private void dealWithActionQueue(BlockingQueue<InstanceAction> queue) {
+ private void dealWithActionQueue() {
while (isRunnable()) {
try {
- InstanceAction action = queue.poll();
+ InstanceAction action = actionQueue.poll();
if (action == null) {
break;
}
@@ -375,6 +378,15 @@ public class InstanceManager extends AbstractDaemon {
instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
}
+ private void notifyDestroyInstance(String instanceId) {
+ Instance instance = instanceMap.get(instanceId);
+ if (instance == null) {
+ LOGGER.error("try to notify destroy instance but not found: taskId
{} instanceId {}", taskId, instanceId);
+ return;
+ }
+ instance.notifyDestroy();
+ }
+
private void addToStore(InstanceProfile profile, boolean addNew) {
LOGGER.info("add instance to instance store state {} instanceId {}",
profile.getState(),
profile.getInstanceId());
@@ -433,6 +445,9 @@ public class InstanceManager extends AbstractDaemon {
}
private void stopAllInstances() {
+ instanceMap.values().forEach((instance) -> {
+ notifyDestroyInstance(instance.getInstanceId());
+ });
instanceMap.values().forEach((instance) -> {
deleteFromMemory(instance.getInstanceId());
});
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index 415b05825a..7267066aee 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -95,15 +95,29 @@ public abstract class CommonInstance extends Instance {
@Override
public void destroy() {
- if (!inited) {
- return;
- }
- doChangeState(State.SUCCEEDED);
+ Long start = AgentUtils.getCurrentTime();
+ notifyDestroy();
while (running) {
AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
}
+ LOGGER.info("destroy instance wait run elapse {} ms instance {}",
AgentUtils.getCurrentTime() - start,
+ profile.getInstanceId());
+ start = AgentUtils.getCurrentTime();
this.source.destroy();
+ LOGGER.info("destroy instance wait source elapse {} ms instance {}",
AgentUtils.getCurrentTime() - start,
+ profile.getInstanceId());
+ start = AgentUtils.getCurrentTime();
this.sink.destroy();
+ LOGGER.info("destroy instance wait sink elapse {} ms instance {}",
AgentUtils.getCurrentTime() - start,
+ profile.getInstanceId());
+ }
+
+ @Override
+ public void notifyDestroy() {
+ if (!inited) {
+ return;
+ }
+ doChangeState(State.SUCCEEDED);
}
@Override
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index a37a171a37..ec4502a7fb 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -70,6 +70,7 @@ public class SenderManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID =
SequentialID.getInstance();
+ public static final int RESEND_QUEUE_WAIT_MS = 10;
// cache for group and sender list, share the map cross agent lifecycle.
private DefaultMessageSender sender;
private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
@@ -172,9 +173,12 @@ public class SenderManager {
}
private void closeMessageSender() {
+ Long start = AgentUtils.getCurrentTime();
if (sender != null) {
sender.close();
}
+ LOGGER.info("close sender elapse {} ms instance {}",
AgentUtils.getCurrentTime() - start,
+ profile.getInstanceId());
}
private AgentMetricItem getMetricItem(Map<String, String> otherDimensions)
{
@@ -286,7 +290,7 @@ public class SenderManager {
resendRunning = true;
while (!shutdown) {
try {
- AgentSenderCallback callback = resendQueue.poll(1,
TimeUnit.SECONDS);
+ AgentSenderCallback callback =
resendQueue.poll(RESEND_QUEUE_WAIT_MS, TimeUnit.MILLISECONDS);
if (callback != null) {
SenderMessage message = callback.message;
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND,
message.getGroupId(),
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 8929b33d01..803b9235d2 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -80,8 +80,8 @@ public abstract class AbstractSource implements Source {
protected final Integer BATCH_READ_LINE_COUNT = 10000;
protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
- protected final Integer READ_WAIT_TIMEOUT_MS = 10;
- private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
+ protected final Integer WAIT_TIMEOUT_MS = 10;
+ private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100;
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
protected BlockingQueue<SourceData> queue;
@@ -172,7 +172,7 @@ public abstract class AbstractSource implements Source {
emptyCount = 0;
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
- AgentUtils.silenceSleepInSeconds(1);
+ AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
continue;
}
emptyCount = 0;
@@ -231,7 +231,7 @@ public abstract class AbstractSource implements Source {
if (!isRunnable()) {
return false;
}
- AgentUtils.silenceSleepInSeconds(1);
+ AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
}
}
return true;
@@ -247,7 +247,7 @@ public abstract class AbstractSource implements Source {
try {
boolean offerSuc = false;
while (isRunnable() && !offerSuc) {
- offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+ offerSuc = queue.offer(sourceData, WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
}
if (!offerSuc) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.getData().length);
@@ -338,7 +338,7 @@ public abstract class AbstractSource implements Source {
private SourceData readFromQueue() {
SourceData sourceData = null;
try {
- sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId);
}
@@ -405,7 +405,7 @@ public abstract class AbstractSource implements Source {
while (queue != null && !queue.isEmpty()) {
SourceData sourceData = null;
try {
- sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ sourceData = queue.poll(WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId, e);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
index d3dc67df5c..278a9298f9 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
@@ -55,6 +55,11 @@ public class MockInstance extends Instance {
destroyTime = index.getAndAdd(1);
}
+ @Override
+ public void notifyDestroy() {
+
+ }
+
@Override
public InstanceProfile getProfile() {
return profile;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 6ee892c914..5d6871fecb 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -90,7 +90,7 @@ public class TestLogFileSource {
Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS",
0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE",
2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
- Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
+ Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
if (offset > 0) {
OffsetProfile offsetProfile = new
OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),