This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 55f639537d [INLONG-9784][Audit] Optimize sending memory management 
when the audit-proxy config is null (#9786)
55f639537d is described below

commit 55f639537d30572a7cdbce7cec84e4fdf3c4d416
Author: doleyzi <[email protected]>
AuthorDate: Wed Mar 6 21:47:11 2024 +0800

    [INLONG-9784][Audit] Optimize sending memory management when the 
audit-proxy config is null (#9786)
---
 .../main/java/org/apache/inlong/audit/send/SenderGroup.java  | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
index 4443e58c2f..40e287bd77 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -81,12 +81,17 @@ public class SenderGroup {
      * @return
      */
     public SenderResult send(ByteBuf dataBuf) {
+        if (dataBuf == null) {
+            return new SenderResult("dataBuf is null", 0, false);
+        }
         LinkedBlockingQueue<SenderChannel> channels = 
channelGroups.get(mIndex);
         SenderChannel channel = null;
+        boolean dataBufReleased = false;
         try {
             if (channels.size() <= 0) {
                 LOG.error("channels is empty");
                 dataBuf.release();
+                dataBufReleased = true;
                 return new SenderResult("channels is empty", 0, false);
             }
             boolean isOk = false;
@@ -133,6 +138,7 @@ public class SenderGroup {
             if (channel == null) {
                 LOG.error("can not get a channel");
                 dataBuf.release();
+                dataBufReleased = true;
                 return new SenderResult("can not get a channel", 0, false);
             }
 
@@ -145,8 +151,10 @@ public class SenderGroup {
                     }
                     t = 
channel.getChannel().writeAndFlush(dataBuf).sync().await();
                 }
+                dataBufReleased = true;
             } else {
                 dataBuf.release();
+                dataBufReleased = true;
             }
             return new SenderResult(channel.getAddr().getHostString(), 
channel.getAddr().getPort(), t.isSuccess());
         } catch (Throwable ex) {
@@ -155,8 +163,12 @@ public class SenderGroup {
             return new SenderResult("127.0.0.1", 0, false);
         } finally {
             if (channel != null) {
+                channel.release();
                 channels.offer(channel);
             }
+            if (!dataBufReleased) {
+                dataBuf.release();
+            }
         }
     }
 

Reply via email to