This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 52505bc739 [INLONG-11522][Agent]  Strictly process new instances in the order of submission (#11523)
52505bc739 is described below

commit 52505bc739120fd8748f67da41b8befa0e99b5f0
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 21 21:04:06 2024 +0800

    [INLONG-11522][Agent]  Strictly process new instances in the order of submission (#11523)
---
 .../agent/core/instance/InstanceManager.java       | 29 ++++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 2a27e792e8..1b9dba8191 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -65,6 +65,7 @@ public class InstanceManager extends AbstractDaemon {
     private final ConcurrentHashMap<String, Instance> instanceMap;
     // instance profile queue.
     private final BlockingQueue<InstanceAction> actionQueue;
+    private final BlockingQueue<InstanceAction> addActionQueue;
     // task thread pool;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
@@ -125,6 +126,7 @@ public class InstanceManager extends AbstractDaemon {
         instanceMap = new ConcurrentHashMap<>();
         this.instanceLimit = instanceLimit;
         actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+        addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
     }
 
     public String getTaskId() {
@@ -167,6 +169,7 @@ public class InstanceManager extends AbstractDaemon {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
                     printInstanceState();
                     dealWithActionQueue();
+                    dealWithAddActionQueue();
                     keepPaceWithStore();
                     String inlongGroupId = taskFromStore.getInlongGroupId();
                     String inlongStreamId = taskFromStore.getInlongStreamId();
@@ -263,7 +266,9 @@ public class InstanceManager extends AbstractDaemon {
                 }
                 switch (action.getActionType()) {
                     case ADD:
-                        addInstance(action.getProfile());
+                        if (!addActionQueue.offer(action)) {
+                            LOGGER.error("it should never happen: addQueue is full");
+                        }
                         break;
                     case FINISH:
                         finishInstance(action.getProfile());
@@ -282,6 +287,20 @@ public class InstanceManager extends AbstractDaemon {
         }
     }
 
+    private void dealWithAddActionQueue() {
+        while (isRunnable()) {
+            if (instanceMap.size() > instanceLimit) {
+                LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit);
+                return;
+            }
+            InstanceAction action = addActionQueue.poll();
+            if (action == null) {
+                break;
+            }
+            addInstance(action.getProfile());
+        }
+    }
+
     @Override
     public void start() {
         restoreFromStore();
@@ -320,12 +339,6 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void addInstance(InstanceProfile profile) {
-        if (instanceMap.size() > instanceLimit) {
-            LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit);
-            actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
-            return;
-        }
         LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId());
         if (!shouldAddAgain(profile.getInstanceId(), profile.getFileUpdateTime())) {
             LOGGER.info("addInstance shouldAddAgain returns false skip taskId {} instanceId {}", taskId,
@@ -474,7 +487,7 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     public boolean isFull() {
-        return (instanceMap.size() + actionQueue.size()) >= instanceLimit * reserveCoefficient;
+        return (actionQueue.size() + addActionQueue.size()) >= ACTION_QUEUE_CAPACITY * reserveCoefficient;
     }
 
     public long getFinishedInstanceCount() {

Reply via email to