This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 52505bc739 [INLONG-11522][Agent] Strictly process new instances in
the order of submission (#11523)
52505bc739 is described below
commit 52505bc739120fd8748f67da41b8befa0e99b5f0
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 21 21:04:06 2024 +0800
[INLONG-11522][Agent] Strictly process new instances in the order of
submission (#11523)
---
.../agent/core/instance/InstanceManager.java | 29 ++++++++++++++++------
1 file changed, 21 insertions(+), 8 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 2a27e792e8..1b9dba8191 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -65,6 +65,7 @@ public class InstanceManager extends AbstractDaemon {
private final ConcurrentHashMap<String, Instance> instanceMap;
// instance profile queue.
private final BlockingQueue<InstanceAction> actionQueue;
+ private final BlockingQueue<InstanceAction> addActionQueue;
// task thread pool;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
@@ -125,6 +126,7 @@ public class InstanceManager extends AbstractDaemon {
instanceMap = new ConcurrentHashMap<>();
this.instanceLimit = instanceLimit;
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+ addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
}
public String getTaskId() {
@@ -167,6 +169,7 @@ public class InstanceManager extends AbstractDaemon {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
dealWithActionQueue();
+ dealWithAddActionQueue();
keepPaceWithStore();
String inlongGroupId = taskFromStore.getInlongGroupId();
String inlongStreamId = taskFromStore.getInlongStreamId();
@@ -263,7 +266,9 @@ public class InstanceManager extends AbstractDaemon {
}
switch (action.getActionType()) {
case ADD:
- addInstance(action.getProfile());
+ if (!addActionQueue.offer(action)) {
+ LOGGER.error("it should never happen: addQueue is
full");
+ }
break;
case FINISH:
finishInstance(action.getProfile());
@@ -282,6 +287,20 @@ public class InstanceManager extends AbstractDaemon {
}
}
+ private void dealWithAddActionQueue() { // Drains addActionQueue in queue order, passing each action's profile to addInstance.
+ while (isRunnable()) { // stop draining as soon as isRunnable() returns false
+ if (instanceMap.size() > instanceLimit) { // checked before polling, so nothing is dequeued while over the limit
+ LOGGER.error("instanceMap size {} over limit {}",
instanceMap.size(), instanceLimit);
+ return; // remaining actions stay queued; NOTE(review): this path logs at error level even when addActionQueue is empty - confirm that is intended
+ }
+ InstanceAction action = addActionQueue.poll(); // non-blocking take from the head; null means the queue is empty
+ if (action == null) {
+ break; // nothing left to add in this pass
+ }
+ addInstance(action.getProfile());
+ }
+ }
+
@Override
public void start() {
restoreFromStore();
@@ -320,12 +339,6 @@ public class InstanceManager extends AbstractDaemon {
}
private void addInstance(InstanceProfile profile) {
- if (instanceMap.size() > instanceLimit) {
- LOGGER.error("instanceMap size {} over limit {}",
instanceMap.size(), instanceLimit);
- actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
- return;
- }
LOGGER.info("add instance taskId {} instanceId {}", taskId,
profile.getInstanceId());
if (!shouldAddAgain(profile.getInstanceId(),
profile.getFileUpdateTime())) {
LOGGER.info("addInstance shouldAddAgain returns false skip taskId
{} instanceId {}", taskId,
@@ -474,7 +487,7 @@ public class InstanceManager extends AbstractDaemon {
}
public boolean isFull() {
- return (instanceMap.size() + actionQueue.size()) >= instanceLimit *
reserveCoefficient;
+ return (actionQueue.size() + addActionQueue.size()) >=
ACTION_QUEUE_CAPACITY * reserveCoefficient; // full once the actions pending in both queues reach ACTION_QUEUE_CAPACITY scaled by reserveCoefficient
}
public long getFinishedInstanceCount() {