This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 92d7774915 [INLONG-11760][Agent] Increase the number of global 
instances control (#11761)
92d7774915 is described below

commit 92d7774915864688f8e26b009069f5165f0733a1
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Feb 14 10:36:11 2025 +0800

    [INLONG-11760][Agent] Increase the number of global instances control 
(#11761)
---
 .../inlong/agent/constant/AgentConstants.java       |  2 ++
 .../inlong/agent/constant/CommonConstants.java      |  2 +-
 .../org/apache/inlong/agent/plugin/file/Task.java   |  2 ++
 .../apache/inlong/agent/core/HeartbeatManager.java  |  2 +-
 .../inlong/agent/core/instance/InstanceManager.java | 21 +++++++++++++++++++--
 .../apache/inlong/agent/core/task/TaskManager.java  |  8 ++++++++
 .../apache/inlong/agent/plugin/sinks/ProxySink.java |  4 ++--
 .../apache/inlong/agent/plugin/sinks/Sender.java    |  4 ++--
 .../inlong/agent/plugin/task/AbstractTask.java      |  7 ++++++-
 .../apache/inlong/agent/plugin/task/CronTask.java   |  5 +++++
 .../agent/plugin/instance/TestInstanceManager.java  |  2 +-
 .../apache/inlong/agent/plugin/task/MockTask.java   |  5 +++++
 12 files changed, 54 insertions(+), 10 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 0c73c12852..45ba789250 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -81,6 +81,8 @@ public class AgentConstants {
     public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
     public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
     public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
+    public static final String AGENT_INSTANCE_LIMIT = "agent.instance.limit";
+    public static final int DEFAULT_AGENT_INSTANCE_LIMIT = 100;
 
     // pulsar sink config
     public static final String PULSAR_CLIENT_IO_TREHAD_NUM = 
"agent.sink.pulsar.client.io.thread.num";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 757db41afd..db0e509718 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -29,7 +29,7 @@ public class CommonConstants {
     public static final String DEFAULT_PROXY_INLONG_STREAM_ID = 
"default_inlong_stream_id";
 
     public static final String PROXY_TOTAL_ASYNC_PROXY_SIZE = 
"proxy.total.async.proxy.size";
-    public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE = 200 * 1024 
* 1024;
+    public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB = 200 * 
1024;
 
     public static final String PROXY_ALIVE_CONNECTION_NUM = 
"proxy.alive.connection.num";
     public static final int DEFAULT_PROXY_ALIVE_CONNECTION_NUM = 10;
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
index 40ad855db7..ea1577917c 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
@@ -54,4 +54,6 @@ public abstract class Task extends AbstractStateWrapper {
      * is profile valid
      */
     public abstract boolean isProfileValid(TaskProfile profile);
+
+    public abstract int getInstanceNum();
 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 82cd5cb648..d1d759cd0a 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -202,7 +202,7 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
             proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
                     INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
             proxyClientConfig.setMaxInFlightSizeInKb(
-                    CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE / 
1024);
+                    CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB);
             
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
             
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
             proxyClientConfig.setRequestTimeoutMs(30000L);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 23c16cabe8..65a584fa8a 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -22,6 +22,8 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Instance;
 import org.apache.inlong.agent.store.InstanceStore;
@@ -81,6 +83,8 @@ public class InstanceManager extends AbstractDaemon {
     private long auditVersion;
     private volatile boolean running = false;
     private final double reserveCoefficient = 0.8;
+    protected TaskManager taskManager;
+    private final int globalInstanceLimit;
 
     private class InstancePrintStat {
 
@@ -118,7 +122,9 @@ public class InstanceManager extends AbstractDaemon {
     /**
      * Init task manager.
      */
-    public InstanceManager(String taskId, int instanceLimit, Store basicStore, 
TaskStore taskStore) {
+    public InstanceManager(TaskManager taskManager, String taskId, int 
instanceLimit, Store basicStore,
+            TaskStore taskStore) {
+        this.taskManager = taskManager;
         this.taskId = taskId;
         instanceStore = new InstanceStore(basicStore);
         this.taskStore = taskStore;
@@ -127,6 +133,8 @@ public class InstanceManager extends AbstractDaemon {
         this.instanceLimit = instanceLimit;
         actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
         addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+        globalInstanceLimit = 
agentConf.getInt(AgentConstants.AGENT_INSTANCE_LIMIT,
+                AgentConstants.DEFAULT_AGENT_INSTANCE_LIMIT);
     }
 
     public String getTaskId() {
@@ -292,8 +300,12 @@ public class InstanceManager extends AbstractDaemon {
 
     private void dealWithAddActionQueue() {
         while (isRunnable()) {
+            if (taskManager != null && taskManager.getInstanceNum() > 
globalInstanceLimit) {
+                LOGGER.error("global instance num {} over limit {}", 
taskManager.getInstanceNum(), globalInstanceLimit);
+                return;
+            }
             if (instanceMap.size() > instanceLimit) {
-                LOGGER.error("instanceMap size {} over limit {}", 
instanceMap.size(), instanceLimit);
+                LOGGER.error("task {} instanceMap size {} over limit {}", 
taskId, instanceMap.size(), instanceLimit);
                 return;
             }
             InstanceAction action = addActionQueue.poll();
@@ -507,4 +519,9 @@ public class InstanceManager extends AbstractDaemon {
         }
         return count;
     }
+
+    public int getInstanceNum() {
+
+        return instanceMap.size();
+    }
 }
\ No newline at end of file
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 6db94a546b..201318a30f 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -527,6 +527,14 @@ public class TaskManager extends AbstractDaemon {
         return taskMap.get(taskId);
     }
 
+    public int getInstanceNum() {
+        int num = 0;
+        for (Task task : taskMap.values()) {
+            num += task.getInstanceNum();
+        }
+        return num;
+    }
+
     public TaskProfile getTaskProfile(String taskId) {
         return taskStore.getTask(taskId);
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 92bfaa427f..288878a43b 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -275,8 +275,8 @@ public class ProxySink extends AbstractSink {
         }
         MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
lenToRelease);
         if (info != null) {
-            LOGGER.info("save offset {} taskId {} instanceId {}", 
info.getOffset(), profile.getTaskId(),
-                    profile.getInstanceId());
+            LOGGER.info("save offset {} taskId {} instanceId {} ackInfoList 
{}", info.getOffset(), profile.getTaskId(),
+                    profile.getInstanceId(), ackInfoList.size());
             OffsetProfile offsetProfile = new 
OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
                     info.getOffset(), profile.get(INODE_INFO));
             offsetManager.setOffset(offsetProfile);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
index 3195ee45c2..a310bfcdba 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
@@ -120,7 +120,7 @@ public class Sender {
         totalAsyncBufSize = profile
                 .getInt(
                         CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
-                        CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+                        
CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB);
         aliveConnectionNum = profile
                 .getInt(
                         CommonConstants.PROXY_ALIVE_CONNECTION_NUM, 
CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
@@ -203,7 +203,7 @@ public class Sender {
     private void createMessageSender() throws Exception {
         TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
                 managerAddr, inlongGroupId, authSecretId, authSecretKey);
-        proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize / 1024);
+        proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize);
         proxyClientConfig.setAliveConnections(aliveConnectionNum);
         proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
         proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index ef8107c68e..780a99b844 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractTask extends Task {
         this.taskProfile = taskProfile;
         this.basicStore = basicStore;
         auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 
getInstanceLimit(),
+        instanceManager = new InstanceManager(taskManager, 
taskProfile.getTaskId(), getInstanceLimit(),
                 basicStore, taskManager.getTaskStore());
         try {
             instanceManager.start();
@@ -163,4 +163,9 @@ public abstract class AbstractTask extends Task {
     protected boolean isFull() {
         return instanceManager.isFull();
     }
+
+    @Override
+    public int getInstanceNum() {
+        return instanceManager.getInstanceNum();
+    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
index 8f333b8d46..deecc120f3 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
@@ -56,6 +56,11 @@ public class CronTask extends Task {
         return true;
     }
 
+    @Override
+    public int getInstanceNum() {
+        return 0;
+    }
+
     @Override
     public void addCallbacks() {
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index 4340bd6176..172387bce2 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -83,7 +83,7 @@ public class TestInstanceManager {
         Store taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
         TaskStore taskStore = new TaskStore(taskBasicStore);
         taskStore.storeTask(taskProfile);
-        manager = new InstanceManager("1", 20, basicInstanceStore, taskStore);
+        manager = new InstanceManager(null, "1", 20, basicInstanceStore, 
taskStore);
         manager.CORE_THREAD_SLEEP_TIME_MS = 100;
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
index acfce5bb9a..32faf568d2 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
@@ -62,6 +62,11 @@ public class MockTask extends Task {
         return true;
     }
 
+    @Override
+    public int getInstanceNum() {
+        return 0;
+    }
+
     @Override
     public void addCallbacks() {
 

Reply via email to