This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 71dd41a035 [INLONG-9700][Agent] Optimize the message ack logic to
reduce semaphore competition (#9701)
71dd41a035 is described below
commit 71dd41a035bec56035f3f60a58b091f820914197
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Feb 20 16:05:40 2024 +0800
[INLONG-9700][Agent] Optimize the message ack logic to reduce semaphore
competition (#9701)
---
.../main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 596c3bb411..a622c85598 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -137,7 +137,7 @@ public class ProxySink extends AbstractSink {
return () -> {
AgentThreadFactory.nameThread(
"flushCache-" + profile.getTaskId() + "-" +
profile.getInstanceId());
- LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
+ LOGGER.info("start flush cache {}:{} flush interval {}", inlongGroupId, sourceName, batchFlushInterval);
running = true;
while (!shutdown) {
sendMessageFromCache();
@@ -249,14 +249,16 @@ public class ProxySink extends AbstractSink {
private void doFlushOffset() {
packageAckInfoLock.writeLock().lock();
OffsetAckInfo info = null;
+ int lenToRelease = 0;
for (int i = 0; i < ackInfoList.size();) {
if (ackInfoList.get(i).getHasAck()) {
info = ackInfoList.remove(i);
- MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
+ lenToRelease += info.getLen();
} else {
break;
}
}
+ MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, lenToRelease);
if (info != null) {
LOGGER.info("save offset {} taskId {} instanceId {}",
info.getOffset(), profile.getTaskId(),
profile.getInstanceId());