This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 71dd41a035 [INLONG-9700][Agent] Optimize the message ack logic to reduce semaphore competition (#9701)
71dd41a035 is described below

commit 71dd41a035bec56035f3f60a58b091f820914197
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Feb 20 16:05:40 2024 +0800

    [INLONG-9700][Agent] Optimize the message ack logic to reduce semaphore competition (#9701)
---
 .../main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 596c3bb411..a622c85598 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -137,7 +137,7 @@ public class ProxySink extends AbstractSink {
         return () -> {
             AgentThreadFactory.nameThread(
                     "flushCache-" + profile.getTaskId() + "-" + 
profile.getInstanceId());
-            LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
+            LOGGER.info("start flush cache {}:{} flush interval {}", 
inlongGroupId, sourceName, batchFlushInterval);
             running = true;
             while (!shutdown) {
                 sendMessageFromCache();
@@ -249,14 +249,16 @@ public class ProxySink extends AbstractSink {
     private void doFlushOffset() {
         packageAckInfoLock.writeLock().lock();
         OffsetAckInfo info = null;
+        int lenToRelease = 0;
         for (int i = 0; i < ackInfoList.size();) {
             if (ackInfoList.get(i).getHasAck()) {
                 info = ackInfoList.remove(i);
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
+                lenToRelease += info.getLen();
             } else {
                 break;
             }
         }
+        MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
lenToRelease);
         if (info != null) {
             LOGGER.info("save offset {} taskId {} instanceId {}", 
info.getOffset(), profile.getTaskId(),
                     profile.getInstanceId());

Reply via email to