This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ec4c5789c7 [INLONG-11562][Agent] Modify the MemoryManager class to
support adding semaphore (#11563)
ec4c5789c7 is described below
commit ec4c5789c746c3c3b86c55c2389a43c7a9bb66fe
Author: justinwwhuang <[email protected]>
AuthorDate: Sun Dec 1 16:09:17 2024 +0800
[INLONG-11562][Agent] Modify the MemoryManager class to support adding
semaphore (#11563)
---
.../inlong/agent/core/task/MemoryManager.java | 25 ++++++++++++++++++++++
1 file changed, 25 insertions(+)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
index d785effdac..b262e171e9 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
@@ -23,6 +23,8 @@ import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
@@ -44,6 +46,7 @@ public class MemoryManager {
private ConcurrentHashMap<String, Semaphore> semaphoreMap = new
ConcurrentHashMap<>();
private ConcurrentHashMap<String, Long> lastPrintTime = new
ConcurrentHashMap<>();
private static final int PRINT_INTERVAL_MS = 1000;
+ private Set<String> defaultSemaphoreTypes = new HashSet<>();
private MemoryManager() {
this.conf = AgentConfiguration.getAgentConf();
@@ -52,16 +55,19 @@ public class MemoryManager {
conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
lastPrintTime.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, 0L);
+ defaultSemaphoreTypes.add(AGENT_GLOBAL_READER_SOURCE_PERMIT);
semaphore = new Semaphore(
conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
lastPrintTime.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, 0L);
+ defaultSemaphoreTypes.add(AGENT_GLOBAL_READER_QUEUE_PERMIT);
semaphore = new Semaphore(
conf.getInt(AGENT_GLOBAL_WRITER_PERMIT,
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
lastPrintTime.put(AGENT_GLOBAL_WRITER_PERMIT, 0L);
+ defaultSemaphoreTypes.add(AGENT_GLOBAL_WRITER_PERMIT);
}
/**
@@ -78,6 +84,20 @@ public class MemoryManager {
return memoryManager;
}
+ public void addSemaphore(String semaphoreType, int permit) {
+ if (semaphoreMap.containsKey(semaphoreType)) {
+ return;
+ }
+ synchronized (MemoryManager.class) {
+ if (semaphoreMap.containsKey(semaphoreType)) {
+ return;
+ }
+ Semaphore semaphore = new Semaphore(permit);
+ semaphoreMap.put(semaphoreType, semaphore);
+ lastPrintTime.put(semaphoreType, 0L);
+ }
+ }
+
public boolean tryAcquire(String semaphoreName, int permit) {
Semaphore semaphore = semaphoreMap.get(semaphoreName);
if (semaphore == null) {
@@ -123,5 +143,10 @@ public class MemoryManager {
printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT, "printAll");
printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT, "printAll");
printDetail(AGENT_GLOBAL_WRITER_PERMIT, "printAll");
+ semaphoreMap.entrySet().forEach(entry -> {
+ if (!defaultSemaphoreTypes.contains(entry.getKey())) {
+ printDetail(entry.getKey(), "printAll");
+ }
+ });
}
}