This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 92d7774915 [INLONG-11760][Agent] Increase the number of global
instances control (#11761)
92d7774915 is described below
commit 92d7774915864688f8e26b009069f5165f0733a1
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Feb 14 10:36:11 2025 +0800
[INLONG-11760][Agent] Increase the number of global instances control
(#11761)
---
.../inlong/agent/constant/AgentConstants.java | 2 ++
.../inlong/agent/constant/CommonConstants.java | 2 +-
.../org/apache/inlong/agent/plugin/file/Task.java | 2 ++
.../apache/inlong/agent/core/HeartbeatManager.java | 2 +-
.../inlong/agent/core/instance/InstanceManager.java | 21 +++++++++++++++++++--
.../apache/inlong/agent/core/task/TaskManager.java | 8 ++++++++
.../apache/inlong/agent/plugin/sinks/ProxySink.java | 4 ++--
.../apache/inlong/agent/plugin/sinks/Sender.java | 4 ++--
.../inlong/agent/plugin/task/AbstractTask.java | 7 ++++++-
.../apache/inlong/agent/plugin/task/CronTask.java | 5 +++++
.../agent/plugin/instance/TestInstanceManager.java | 2 +-
.../apache/inlong/agent/plugin/task/MockTask.java | 5 +++++
12 files changed, 54 insertions(+), 10 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 0c73c12852..45ba789250 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -81,6 +81,8 @@ public class AgentConstants {
public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
+ public static final String AGENT_INSTANCE_LIMIT = "agent.instance.limit";
+ public static final int DEFAULT_AGENT_INSTANCE_LIMIT = 100;
// pulsar sink config
public static final String PULSAR_CLIENT_IO_TREHAD_NUM =
"agent.sink.pulsar.client.io.thread.num";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 757db41afd..db0e509718 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -29,7 +29,7 @@ public class CommonConstants {
public static final String DEFAULT_PROXY_INLONG_STREAM_ID =
"default_inlong_stream_id";
public static final String PROXY_TOTAL_ASYNC_PROXY_SIZE =
"proxy.total.async.proxy.size";
- public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE = 200 * 1024
* 1024;
+ public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB = 200 *
1024;
public static final String PROXY_ALIVE_CONNECTION_NUM =
"proxy.alive.connection.num";
public static final int DEFAULT_PROXY_ALIVE_CONNECTION_NUM = 10;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
index 40ad855db7..ea1577917c 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
@@ -54,4 +54,6 @@ public abstract class Task extends AbstractStateWrapper {
* is profile valid
*/
public abstract boolean isProfileValid(TaskProfile profile);
+
+ public abstract int getInstanceNum();
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 82cd5cb648..d1d759cd0a 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -202,7 +202,7 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
proxyClientConfig.setMaxInFlightSizeInKb(
- CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE /
1024);
+ CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB);
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setRequestTimeoutMs(30000L);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 23c16cabe8..65a584fa8a 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -22,6 +22,8 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.store.InstanceStore;
@@ -81,6 +83,8 @@ public class InstanceManager extends AbstractDaemon {
private long auditVersion;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
+ protected TaskManager taskManager;
+ private final int globalInstanceLimit;
private class InstancePrintStat {
@@ -118,7 +122,9 @@ public class InstanceManager extends AbstractDaemon {
/**
* Init task manager.
*/
- public InstanceManager(String taskId, int instanceLimit, Store basicStore,
TaskStore taskStore) {
+ public InstanceManager(TaskManager taskManager, String taskId, int
instanceLimit, Store basicStore,
+ TaskStore taskStore) {
+ this.taskManager = taskManager;
this.taskId = taskId;
instanceStore = new InstanceStore(basicStore);
this.taskStore = taskStore;
@@ -127,6 +133,8 @@ public class InstanceManager extends AbstractDaemon {
this.instanceLimit = instanceLimit;
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+ globalInstanceLimit =
agentConf.getInt(AgentConstants.AGENT_INSTANCE_LIMIT,
+ AgentConstants.DEFAULT_AGENT_INSTANCE_LIMIT);
}
public String getTaskId() {
@@ -292,8 +300,12 @@ public class InstanceManager extends AbstractDaemon {
private void dealWithAddActionQueue() {
while (isRunnable()) {
+ if (taskManager != null && taskManager.getInstanceNum() >
globalInstanceLimit) {
+ LOGGER.error("global instance num {} over limit {}",
taskManager.getInstanceNum(), globalInstanceLimit);
+ return;
+ }
if (instanceMap.size() > instanceLimit) {
- LOGGER.error("instanceMap size {} over limit {}",
instanceMap.size(), instanceLimit);
+ LOGGER.error("task {} instanceMap size {} over limit {}",
taskId, instanceMap.size(), instanceLimit);
return;
}
InstanceAction action = addActionQueue.poll();
@@ -507,4 +519,9 @@ public class InstanceManager extends AbstractDaemon {
}
return count;
}
+
+ public int getInstanceNum() {
+
+ return instanceMap.size();
+ }
}
\ No newline at end of file
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 6db94a546b..201318a30f 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -527,6 +527,14 @@ public class TaskManager extends AbstractDaemon {
return taskMap.get(taskId);
}
+ public int getInstanceNum() {
+ int num = 0;
+ for (Task task : taskMap.values()) {
+ num += task.getInstanceNum();
+ }
+ return num;
+ }
+
public TaskProfile getTaskProfile(String taskId) {
return taskStore.getTask(taskId);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 92bfaa427f..288878a43b 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -275,8 +275,8 @@ public class ProxySink extends AbstractSink {
}
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
lenToRelease);
if (info != null) {
- LOGGER.info("save offset {} taskId {} instanceId {}",
info.getOffset(), profile.getTaskId(),
- profile.getInstanceId());
+ LOGGER.info("save offset {} taskId {} instanceId {} ackInfoList
{}", info.getOffset(), profile.getTaskId(),
+ profile.getInstanceId(), ackInfoList.size());
OffsetProfile offsetProfile = new
OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
info.getOffset(), profile.get(INODE_INFO));
offsetManager.setOffset(offsetProfile);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
index 3195ee45c2..a310bfcdba 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
@@ -120,7 +120,7 @@ public class Sender {
totalAsyncBufSize = profile
.getInt(
CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
- CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+
CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB);
aliveConnectionNum = profile
.getInt(
CommonConstants.PROXY_ALIVE_CONNECTION_NUM,
CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
@@ -203,7 +203,7 @@ public class Sender {
private void createMessageSender() throws Exception {
TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
managerAddr, inlongGroupId, authSecretId, authSecretKey);
- proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize / 1024);
+ proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index ef8107c68e..780a99b844 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractTask extends Task {
this.taskProfile = taskProfile;
this.basicStore = basicStore;
auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
- instanceManager = new InstanceManager(taskProfile.getTaskId(),
getInstanceLimit(),
+ instanceManager = new InstanceManager(taskManager,
taskProfile.getTaskId(), getInstanceLimit(),
basicStore, taskManager.getTaskStore());
try {
instanceManager.start();
@@ -163,4 +163,9 @@ public abstract class AbstractTask extends Task {
protected boolean isFull() {
return instanceManager.isFull();
}
+
+ @Override
+ public int getInstanceNum() {
+ return instanceManager.getInstanceNum();
+ }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
index 8f333b8d46..deecc120f3 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
@@ -56,6 +56,11 @@ public class CronTask extends Task {
return true;
}
+ @Override
+ public int getInstanceNum() {
+ return 0;
+ }
+
@Override
public void addCallbacks() {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index 4340bd6176..172387bce2 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -83,7 +83,7 @@ public class TestInstanceManager {
Store taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
taskStore.storeTask(taskProfile);
- manager = new InstanceManager("1", 20, basicInstanceStore, taskStore);
+ manager = new InstanceManager(null, "1", 20, basicInstanceStore,
taskStore);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
index acfce5bb9a..32faf568d2 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockTask.java
@@ -62,6 +62,11 @@ public class MockTask extends Task {
return true;
}
+ @Override
+ public int getInstanceNum() {
+ return 0;
+ }
+
@Override
public void addCallbacks() {