This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 55f639537d [INLONG-9784][Audit] Optimize sending memory management
when the audit-proxy config is null (#9786)
55f639537d is described below
commit 55f639537d30572a7cdbce7cec84e4fdf3c4d416
Author: doleyzi <[email protected]>
AuthorDate: Wed Mar 6 21:47:11 2024 +0800
[INLONG-9784][Audit] Optimize sending memory management when the
audit-proxy config is null (#9786)
---
.../main/java/org/apache/inlong/audit/send/SenderGroup.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
index 4443e58c2f..40e287bd77 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -81,12 +81,17 @@ public class SenderGroup {
* @return
*/
public SenderResult send(ByteBuf dataBuf) {
+ if (dataBuf == null) {
+ return new SenderResult("dataBuf is null", 0, false);
+ }
LinkedBlockingQueue<SenderChannel> channels =
channelGroups.get(mIndex);
SenderChannel channel = null;
+ boolean dataBufReleased = false;
try {
if (channels.size() <= 0) {
LOG.error("channels is empty");
dataBuf.release();
+ dataBufReleased = true;
return new SenderResult("channels is empty", 0, false);
}
boolean isOk = false;
@@ -133,6 +138,7 @@ public class SenderGroup {
if (channel == null) {
LOG.error("can not get a channel");
dataBuf.release();
+ dataBufReleased = true;
return new SenderResult("can not get a channel", 0, false);
}
@@ -145,8 +151,10 @@ public class SenderGroup {
}
t =
channel.getChannel().writeAndFlush(dataBuf).sync().await();
}
+ dataBufReleased = true;
} else {
dataBuf.release();
+ dataBufReleased = true;
}
return new SenderResult(channel.getAddr().getHostString(),
channel.getAddr().getPort(), t.isSuccess());
} catch (Throwable ex) {
@@ -155,8 +163,12 @@ public class SenderGroup {
return new SenderResult("127.0.0.1", 0, false);
} finally {
if (channel != null) {
+ channel.release();
channels.offer(channel);
}
+ if (!dataBufReleased) {
+ dataBuf.release();
+ }
}
}