This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 293ab1ab3e [INLONG-9766][Audit] Support user-defined SocketAddress loader getting AuditProxy (#9767)
293ab1ab3e is described below

commit 293ab1ab3e539e9c46f1740cbebbd0b6f623d05a
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Mar 6 12:00:51 2024 +0800

    [INLONG-9766][Audit] Support user-defined SocketAddress loader getting AuditProxy (#9767)
---
 .../org/apache/inlong/audit/AuditOperator.java     |  95 ++++++++++---
 .../audit/DefaultISocketAddressListLoader.java     |  63 +++++++++
 .../inlong/audit/DnsSocketAddressListLoader.java   |  77 +++++++++++
 ...derResult.java => SocketAddressListLoader.java} |  36 ++---
 .../apache/inlong/audit/send/SenderChannel.java    |  76 ++++++++---
 .../org/apache/inlong/audit/send/SenderGroup.java  | 134 +++++++++++--------
 .../apache/inlong/audit/send/SenderHandler.java    |  26 ++++
 .../apache/inlong/audit/send/SenderManager.java    |  89 +++++++++----
 .../org/apache/inlong/audit/util/AuditData.java    |  51 ++++++-
 .../java/org/apache/inlong/audit/util/IpPort.java  | 148 ---------------------
 .../org/apache/inlong/audit/util/SenderResult.java |  12 +-
 .../apache/inlong/audit/util/AuditDataTest.java    |   4 +-
 .../org/apache/inlong/audit/util/IpPortTest.java   |  55 --------
 13 files changed, 512 insertions(+), 354 deletions(-)

diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index e720a75cb4..4ff2436a32 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -23,6 +23,8 @@ import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.audit.util.Config;
 import org.apache.inlong.audit.util.StatInfo;
 
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,9 +35,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
@@ -58,24 +61,14 @@ public class AuditOperator implements Serializable {
     private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new 
ConcurrentHashMap<>();
     private final List<String> deleteKeyList = new ArrayList<>();
     private final Config config = new Config();
-    private final Timer timer = new Timer();
     private int packageId = 1;
     private int dataId = 0;
     private boolean initialized = false;
     private SenderManager manager;
 
-    private final TimerTask timerTask = new TimerTask() {
-
-        @Override
-        public void run() {
-            try {
-                send();
-            } catch (Exception e) {
-                LOGGER.error(e.getMessage());
-            }
-        }
-    };
+    private final ScheduledExecutorService timeoutExecutor = 
Executors.newSingleThreadScheduledExecutor();
     private AuditConfig auditConfig = null;
+    private SocketAddressListLoader loader = null;
 
     /**
      * Not support create from outer.
@@ -99,13 +92,71 @@ public class AuditOperator implements Serializable {
             return;
         }
         config.init();
-        timer.schedule(timerTask, PERIOD, PERIOD);
+        timeoutExecutor.scheduleWithFixedDelay(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    loadIpPortList();
+                    send();
+                } catch (Exception e) {
+                    LOGGER.error(e.getMessage());
+                }
+            }
+
+        }, PERIOD, PERIOD, TimeUnit.MILLISECONDS);
         if (auditConfig == null) {
             auditConfig = new AuditConfig();
         }
         this.manager = new SenderManager(auditConfig);
     }
 
+    private void loadIpPortList() {
+        if (loader == null) {
+            return;
+        }
+        try {
+            List<String> ipPortList = loader.loadSocketAddressList();
+            if (ipPortList != null && ipPortList.size() > 0) {
+                HashSet<String> ipPortSet = new HashSet<>();
+                ipPortSet.addAll(ipPortList);
+                this.setAuditProxy(ipPortSet);
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
+        }
+    }
+
+    /**
+     * set loader
+     * @param loader the loader to set
+     */
+    public void setLoader(SocketAddressListLoader loader) {
+        this.loader = loader;
+    }
+
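+    /**
+     * Instantiate the named class and use it as the loader if it implements SocketAddressListLoader
+     * @param loaderClassName name of the loader class; a null or empty name is ignored
+     */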
+    public void setLoaderClass(String loaderClassName) {
+        if (StringUtils.isEmpty(loaderClassName)) {
+            return;
+        }
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
+            Object loaderObject = 
loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof SocketAddressListLoader) {
+                SocketAddressListLoader loader = (SocketAddressListLoader) 
loaderObject;
+                this.loader = loader;
+                LOGGER.info("audit IpPortListLoader loaderClass:{}", 
loaderClassName);
+            }
+        } catch (Throwable t) {
+            LOGGER.error("Fail to init 
IpPortListLoader,loaderClass:{},error:{}",
+                    loaderClassName, t.getMessage(), t);
+        }
+    }
+
     /**
      * Set AuditProxy from the ip
      */
@@ -142,6 +193,16 @@ public class AuditOperator implements Serializable {
     public void add(int auditID, String auditTag, String inlongGroupID, String 
inlongStreamID, Long logTime,
             long count, long size) {
         long delayTime = System.currentTimeMillis() - logTime;
+        add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime, count, 
size, delayTime * count);
+    }
+
+    public void add(int auditID, String inlongGroupID, String inlongStreamID, 
Long logTime, long count, long size,
+            long delayTime) {
+        add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, 
logTime, count, size, delayTime);
+    }
+
+    public void add(int auditID, String auditTag, String inlongGroupID, String 
inlongStreamID, Long logTime,
+            long count, long size, long delayTime) {
         String key = (logTime / PERIOD) + FIELD_SEPARATORS + inlongGroupID + 
FIELD_SEPARATORS
                 + inlongStreamID + FIELD_SEPARATORS + auditID + 
FIELD_SEPARATORS + auditTag;
         addByKey(key, count, size, delayTime);
@@ -156,7 +217,7 @@ public class AuditOperator implements Serializable {
         }
         countMap.get(key).count.addAndGet(count);
         countMap.get(key).size.addAndGet(size);
-        countMap.get(key).delay.addAndGet(delayTime * count);
+        countMap.get(key).delay.addAndGet(delayTime);
     }
 
     /**
@@ -240,7 +301,7 @@ public class AuditOperator implements Serializable {
     private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
         AuditApi.BaseCommand.Builder baseCommand = 
AuditApi.BaseCommand.newBuilder();
         
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
-        manager.send(baseCommand.build());
+        manager.send(baseCommand.build(), auditRequest);
     }
 
     /**
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
new file mode 100644
index 0000000000..c6629e6de2
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DefaultISocketAddressListLoader
+ */
+public class DefaultISocketAddressListLoader implements 
SocketAddressListLoader {
+
+    public static final String KEY_AUDIT_PROXYS = "audit.proxys";
+
+    private Map<String, String> commonProperties;
+
+    @Override
+    public void setCommonProperties(Map<String, String> commonProperties) {
+        this.commonProperties = commonProperties;
+    }
+
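+    /**
+     * Load the proxy addresses configured under the "audit.proxys" property
+     * @return the sorted whitespace-separated entries, or an empty list if the property is not set
+     */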
+    @Override
+    public List<String> loadSocketAddressList() {
+        List<String> ipPortList = new ArrayList<>();
+        String strAuditProxys = commonProperties.get(KEY_AUDIT_PROXYS);
+        if (strAuditProxys == null) {
+            return ipPortList;
+        }
+        String[] ipPorts = strAuditProxys.split("\\s+");
+        for (String tmpIPPort : ipPorts) {
+            if (StringUtils.isBlank(tmpIPPort)) {
+                continue;
+            }
+            ipPortList.add(tmpIPPort.trim());
+        }
+        Collections.sort(ipPortList);
+        return ipPortList;
+    }
+
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
new file mode 100644
index 0000000000..95c9d969d3
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DnsSocketAddressListLoader
+ */
+public class DnsSocketAddressListLoader implements SocketAddressListLoader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DnsSocketAddressListLoader.class);
+    public static final String KEY_DNS_ADDRESS = "audit.dns.address";
+    public static final String KEY_DNS_PORT = "audit.dns.port";
+
+    private Map<String, String> commonProperties;
+
+    @Override
+    public void setCommonProperties(Map<String, String> commonProperties) {
+        this.commonProperties = commonProperties;
+    }
+
+    @Override
+    public List<String> loadSocketAddressList() {
+        if (commonProperties == null) {
+            return null;
+        }
+        List<String> ipPortList = new ArrayList<>();
+        String dns = commonProperties.get(KEY_DNS_ADDRESS);
+        String dnsPort = commonProperties.get(KEY_DNS_PORT);
+        if (!StringUtils.isEmpty(dns) && !StringUtils.isEmpty(dnsPort)) {
+            try {
+                InetAddress[] addrs = InetAddress.getAllByName(dns);
+                for (InetAddress addr : addrs) {
+                    ipPortList.add(addr.getHostAddress() + ":" + dnsPort);
+                }
+            } catch (Throwable t) {
+                LOG.error(t.getMessage(), t);
+            }
+        }
+        Collections.sort(ipPortList);
+        return ipPortList;
+    }
+
+    public static void main(String[] args) {
+
+        try {
+            InetAddress[] addrs = InetAddress.getAllByName("inlong.woa.com");
+            System.out.println(addrs);
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
+        }
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
similarity index 57%
copy from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
copy to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
index 69dcaf8f40..0902964a7d 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
@@ -15,33 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.util;
+package org.apache.inlong.audit;
 
-public class SenderResult {
+import java.util.List;
+import java.util.Map;
 
-    public final IpPort ipPort;
-    public boolean result;
+/**
+ * SocketAddressListLoader
+ */
+public interface SocketAddressListLoader {
+
+    String KEY_ADDRESS_LOADER = "audit.address.loader";
 
-    /**
-     * Constructor
-     *
-     * @param ipPort
-     * @param result
-     */
-    public SenderResult(IpPort ipPort, boolean result) {
-        this.ipPort = ipPort;
-        this.result = result;
-    }
+    void setCommonProperties(Map<String, String> commonProperties);
 
-    /**
-     * Constructor
-     *
-     * @param sendIp
-     * @param sendPort
-     * @param result
-     */
-    public SenderResult(String sendIp, int sendPort, boolean result) {
-        this.ipPort = new IpPort(sendIp, sendPort);
-        this.result = result;
-    }
+    List<String> loadSocketAddressList();
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 3f45294889..8977221ec7 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -18,17 +18,18 @@
 package org.apache.inlong.audit.send;
 
 import org.apache.inlong.audit.util.EventLoopUtil;
-import org.apache.inlong.audit.util.IpPort;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.util.Attribute;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 
@@ -40,8 +41,9 @@ public class SenderChannel {
     public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
     public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
 
-    private IpPort ipPort;
+    private InetSocketAddress addr;
     private Channel channel;
+    private String channelKey;
     private Semaphore packToken;
     private Bootstrap client;
     private SenderManager senderManager;
@@ -49,10 +51,10 @@ public class SenderChannel {
     /**
      * Constructor
      *
-     * @param ipPort
+     * @param addr
      */
-    public SenderChannel(IpPort ipPort, int maxSynchRequest, SenderManager 
senderManager) {
-        this.ipPort = ipPort;
+    public SenderChannel(InetSocketAddress addr, int maxSynchRequest, 
SenderManager senderManager) {
+        this.addr = addr;
         this.packToken = new Semaphore(maxSynchRequest);
         this.senderManager = senderManager;
     }
@@ -66,6 +68,20 @@ public class SenderChannel {
         return packToken.tryAcquire();
     }
 
+    /**
+     * Acquire channel, blocking until a send permit is available
+     *
+     * @return always true, even if the wait was interrupted
+     */
+    public boolean acquire() {
+        try {
+            packToken.acquire();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return true;
+    }
+
     /**
      * release channel
      */
@@ -78,25 +94,23 @@ public class SenderChannel {
      */
     @Override
     public String toString() {
-        return ipPort.key;
+        return addr.toString();
     }
 
     /**
-     * get ipPort
-     *
-     * @return the ipPort
+     * get addr
+     * @return the addr
      */
-    public IpPort getIpPort() {
-        return ipPort;
+    public InetSocketAddress getAddr() {
+        return addr;
     }
 
     /**
-     * set ipPort
-     *
-     * @param ipPort the ipPort to set
+     * set addr
+     * @param addr the addr to set
      */
-    public void setIpPort(IpPort ipPort) {
-        this.ipPort = ipPort;
+    public void setAddr(InetSocketAddress addr) {
+        this.addr = addr;
     }
 
     /**
@@ -115,10 +129,28 @@ public class SenderChannel {
      */
     public void setChannel(Channel channel) {
         this.channel = channel;
+        Attribute<String> attr = this.channel.attr(SenderGroup.CHANNEL_KEY);
+        attr.set(channelKey);
+    }
+
+    /**
+     * get channelKey
+     * @return the channelKey
+     */
+    public String getChannelKey() {
+        return channelKey;
+    }
+
+    /**
+     * set channelKey
+     * @param channelKey the channelKey to set
+     */
+    public void setChannelKey(String channelKey) {
+        this.channelKey = channelKey;
     }
 
     private void init() {
-        ThreadFactory selfDefineFactory = new 
DefaultThreadFactory("audit-client-io",
+        ThreadFactory selfDefineFactory = new 
DefaultThreadFactory("audit-client-io" + Thread.currentThread().getId(),
                 Thread.currentThread().isDaemon());
 
         EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(DEFAULT_SEND_THREADNUM,
@@ -127,8 +159,8 @@ public class SenderChannel {
         client.group(eventLoopGroup);
         
client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
         client.option(ChannelOption.SO_KEEPALIVE, true);
-        client.option(ChannelOption.TCP_NODELAY, true);
-        client.option(ChannelOption.SO_REUSEADDR, true);
+        client.option(ChannelOption.TCP_NODELAY, false);
+        client.option(ChannelOption.SO_REUSEADDR, false);
         client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
         client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
         client.handler(new ClientPipelineFactory(senderManager));
@@ -149,11 +181,13 @@ public class SenderChannel {
             }
 
             synchronized (client) {
-                ChannelFuture future = client.connect(this.ipPort.addr).sync();
+                ChannelFuture future = client.connect(this.addr).sync();
                 this.channel = future.channel();
+                Attribute<String> attr = 
this.channel.attr(SenderGroup.CHANNEL_KEY);
+                attr.set(channelKey);
             }
         } catch (Throwable e) {
-            LOG.error("connect {} failed. {}", this.getIpPort(), 
e.getMessage());
+            LOG.error("connect {} failed. {}", this.getAddr(), e.getMessage());
             return false;
         }
         return true;
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
index c28e0531ff..4443e58c2f 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -17,25 +17,28 @@
 
 package org.apache.inlong.audit.send;
 
-import org.apache.inlong.audit.util.IpPort;
 import org.apache.inlong.audit.util.SenderResult;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class SenderGroup {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(SenderGroup.class);
+    public static final Logger LOG = 
LoggerFactory.getLogger(SenderGroup.class);
+    public static final AttributeKey<String> CHANNEL_KEY = 
AttributeKey.newInstance("channelKey");
     // maximum number of sending
     public static final int MAX_SEND_TIMES = 3;
     public static final int DEFAULT_WAIT_TIMES = 10000;
@@ -55,6 +58,8 @@ public class SenderGroup {
 
     private SenderManager senderManager;
 
+    private AtomicLong channelId = new AtomicLong(0);
+
     /**
      * constructor
      *
@@ -80,18 +85,17 @@ public class SenderGroup {
         SenderChannel channel = null;
         try {
             if (channels.size() <= 0) {
-                logger.error("channels is empty");
+                LOG.error("channels is empty");
                 dataBuf.release();
                 return new SenderResult("channels is empty", 0, false);
             }
             boolean isOk = false;
+            // tryAcquire
             for (int tryIndex = 0; tryIndex < MAX_SEND_TIMES; tryIndex++) {
-                int random = RANDOM_MIN + (int) (Math.random() * 
(channels.size() - RANDOM_MIN));
-                channels = channelGroups.get(mIndex);
                 for (int i = 0; i < channels.size(); i++) {
                     channel = channels.poll();
                     if (channel.tryAcquire()) {
-                        if (random == i && channel.connect()) {
+                        if (channel.connect()) {
                             isOk = true;
                             break;
                         }
@@ -108,11 +112,26 @@ public class SenderGroup {
                 try {
                     Thread.sleep(waitChannelIntervalMs);
                 } catch (Throwable e) {
-                    logger.error(e.getMessage());
+                    LOG.error(e.getMessage());
                 }
             }
+            // acquire
             if (channel == null) {
-                logger.error("can not get a channel");
+                for (int i = 0; i < channels.size(); i++) {
+                    channel = channels.poll();
+                    if (!channel.connect()) {
+                        channels.offer(channel);
+                        channel = null;
+                        continue;
+                    }
+                    if (channel.acquire()) {
+                        break;
+                    }
+                }
+            }
+            // error
+            if (channel == null) {
+                LOG.error("can not get a channel");
                 dataBuf.release();
                 return new SenderResult("can not get a channel", 0, false);
             }
@@ -129,39 +148,27 @@ public class SenderGroup {
             } else {
                 dataBuf.release();
             }
-            return new SenderResult(channel.getIpPort().ip, 
channel.getIpPort().port, t.isSuccess());
+            return new SenderResult(channel.getAddr().getHostString(), 
channel.getAddr().getPort(), t.isSuccess());
         } catch (Throwable ex) {
-            logger.error(ex.getMessage());
+            LOG.error(ex.getMessage(), ex);
             this.setHasSendError(true);
-            return new SenderResult(ex.getMessage(), 0, false);
+            return new SenderResult("127.0.0.1", 0, false);
         } finally {
             if (channel != null) {
-                channel.release();
                 channels.offer(channel);
             }
         }
     }
 
-    /**
-     * release channel
-     */
-    public void release(String ipPort) {
-        SenderChannel channel = this.totalChannels.get(ipPort);
-        if (channel != null) {
-            channel.release();
+    public void release(Channel channel) {
+        Attribute<String> attr = channel.attr(CHANNEL_KEY);
+        String key = attr.get();
+        if (key == null) {
+            return;
         }
-    }
-
-    /**
-     * release channel
-     */
-    public void release(InetSocketAddress addr) {
-        String destIp = addr.getHostName();
-        int destPort = addr.getPort();
-        String ipPort = IpPort.getIpPortKey(destIp, destPort);
-        SenderChannel channel = this.totalChannels.get(ipPort);
-        if (channel != null) {
-            channel.release();
+        SenderChannel senderChannel = this.totalChannels.get(key);
+        if (senderChannel != null) {
+            senderChannel.release();
         }
     }
 
@@ -170,47 +177,64 @@ public class SenderGroup {
      *
      * @param ipLists
      */
-    public void updateConfig(Set<String> ipLists) {
+    public void updateConfig(List<String> ipLists) {
         try {
             for (SenderChannel dc : deleteChannels) {
-                dc.getChannel().disconnect();
-                dc.getChannel().close();
+                if (dc.getChannel() != null) {
+                    try {
+                        dc.getChannel().disconnect();
+                        dc.getChannel().close();
+                    } catch (Exception e) {
+                        LOG.error(e.getMessage(), e);
+                    }
+                }
             }
             deleteChannels.clear();
             int newIndex = mIndex ^ 0x01;
             LinkedBlockingQueue<SenderChannel> newChannels = 
this.channelGroups.get(newIndex);
             newChannels.clear();
+            List<String> waitingDeleteChannelKey = new 
ArrayList<>(totalChannels.size());
+            waitingDeleteChannelKey.addAll(totalChannels.keySet());
             for (String ipPort : ipLists) {
-                SenderChannel channel = totalChannels.get(ipPort);
-                if (channel != null) {
-                    newChannels.add(channel);
-                    continue;
-                }
                 try {
-                    IpPort ipPortObj = IpPort.parseIpPort(ipPort);
-                    if (ipPortObj == null) {
+                    InetSocketAddress addr = parseAddress(ipPort);
+                    if (addr == null) {
                         continue;
                     }
-                    channel = new SenderChannel(ipPortObj, maxSynchRequest, 
senderManager);
+                    String key = String.valueOf(channelId.getAndIncrement());
+                    SenderChannel channel = new SenderChannel(addr, 
maxSynchRequest, senderManager);
+                    channel.setChannelKey(key);
                     newChannels.add(channel);
-                    totalChannels.put(ipPort, channel);
+                    totalChannels.put(key, channel);
                 } catch (Exception e) {
-                    logger.error(e.getMessage());
+                    LOG.error(e.getMessage(), e);
                 }
             }
 
-            for (Entry<String, SenderChannel> entry : 
totalChannels.entrySet()) {
-                if (!ipLists.contains(entry.getKey())) {
-                    deleteChannels.add(entry.getValue());
-                }
-            }
-            for (SenderChannel dc : deleteChannels) {
-                totalChannels.remove(dc.getIpPort().key);
-            }
+            waitingDeleteChannelKey.forEach(v -> 
deleteChannels.add(totalChannels.remove(v)));
             this.mIndex = newIndex;
         } catch (Throwable e) {
-            logger.error("Update Sender Ip Failed." + e.getMessage());
+            LOG.error("Update Sender Ip Failed." + e.getMessage(), e);
+        }
+    }
+
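+    /**
+     * parseAddress
+     * 
+     * @param  ipPort address text in "ip:port" form
+     * @return the parsed address, or null if the text is not "ip:port" with a positive numeric port
+     */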
+    private static InetSocketAddress parseAddress(String ipPort) {
+        String[] splits = ipPort.split(":");
+        if (splits.length == 2) {
+            String strIp = splits[0];
+            String strPort = splits[1];
+            int port = NumberUtils.toInt(strPort, 0);
+            if (port > 0) {
+                return new InetSocketAddress(strIp, port);
+            }
         }
+        return null;
     }
 
     /**
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
index e2f8fc5b90..c5f5c481e9 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
@@ -17,11 +17,14 @@
 
 package org.apache.inlong.audit.send;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+
 public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderHandler.class);
@@ -40,6 +43,7 @@ public class SenderHandler extends 
SimpleChannelInboundHandler<byte[]> {
     @Override
     public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, 
byte[] e) {
         try {
+            manager.release(ctx.channel());
             manager.onMessageReceived(ctx, e);
         } catch (Throwable ex) {
             LOGGER.error("channelRead0 error: ", ex);
@@ -52,6 +56,7 @@ public class SenderHandler extends 
SimpleChannelInboundHandler<byte[]> {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
         try {
+            manager.release(ctx.channel());
             manager.onExceptionCaught(ctx, e);
         } catch (Throwable ex) {
             LOGGER.error("caught exception: ", ex);
@@ -64,6 +69,7 @@ public class SenderHandler extends 
SimpleChannelInboundHandler<byte[]> {
     @Override
     public void channelInactive(ChannelHandlerContext ctx) {
         try {
+            manager.release(ctx.channel());
             super.channelInactive(ctx);
         } catch (Throwable ex) {
             LOGGER.error("channelInactive error: ", ex);
@@ -76,9 +82,29 @@ public class SenderHandler extends 
SimpleChannelInboundHandler<byte[]> {
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
         try {
+            manager.release(ctx.channel());
             super.channelUnregistered(ctx);
         } catch (Throwable ex) {
             LOGGER.error("channelUnregistered error: ", ex);
         }
     }
+
+    /**
+     * Return the remote address of a channel as an InetSocketAddress.
+     * 
+     * @param  channel channel whose remoteAddress() is inspected
+     * @return that address if it is an InetSocketAddress; else its toString() with port 0; 127.0.0.1:0 if null
+     */
+    public static InetSocketAddress parseInetSocketAddress(Channel channel) {
+        InetSocketAddress destAddr = null;
+        if (channel.remoteAddress() instanceof InetSocketAddress) {
+            destAddr = (InetSocketAddress) channel.remoteAddress();
+        } else if (channel.remoteAddress() != null) {
+            String sendIp = channel.remoteAddress().toString();
+            destAddr = new InetSocketAddress(sendIp, 0);
+        } else {
+            destAddr = new InetSocketAddress("127.0.0.1", 0);
+        }
+        return destAddr;
+    }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index 357711edeb..4b85c54ec3 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -24,6 +24,7 @@ import org.apache.inlong.audit.util.SenderResult;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -50,17 +52,19 @@ public class SenderManager {
     public static final Long MAX_REQUEST_ID = 1000000000L;
     public static final int ALL_CONNECT_CHANNEL = -1;
     public static final int DEFAULT_CONNECT_CHANNEL = 2;
-    private static final Logger logger = 
LoggerFactory.getLogger(SenderManager.class);
+    public static final Logger LOG = 
LoggerFactory.getLogger(SenderManager.class);
     private static final int SEND_INTERVAL_MS = 20;
     private final SecureRandom sRandom = new 
SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
     private final AtomicLong requestIdSeq = new AtomicLong(0L);
     private final ConcurrentHashMap<Long, AuditData> dataMap = new 
ConcurrentHashMap<>();
+    private final LinkedBlockingQueue<Long> requestIdQueue = new 
LinkedBlockingQueue<>();
 
     private SenderGroup sender;
     private int maxConnectChannels = ALL_CONNECT_CHANNEL;
     // IPList
-    private HashSet<String> currentIpPorts = new HashSet<>();
+    private List<String> currentIpPorts = new ArrayList<>();
     private AuditConfig auditConfig;
+    private long lastCheckTime = System.currentTimeMillis();
 
     /**
      * Constructor
@@ -78,7 +82,7 @@ public class SenderManager {
             this.maxConnectChannels = maxConnectChannels;
             this.sender = new SenderGroup(this);
         } catch (Exception ex) {
-            logger.error(ex.getMessage());
+            LOG.error(ex.getMessage(), ex);
         }
     }
 
@@ -90,7 +94,9 @@ public class SenderManager {
             return;
         }
         this.sender.setHasSendError(false);
-        this.currentIpPorts = ipPortList;
+        List<String> newIpPorts = new ArrayList<>();
+        newIpPorts.addAll(ipPortList);
+        this.currentIpPorts = newIpPorts;
         int ipSize = ipPortList.size();
         int needNewSize;
         if (this.maxConnectChannels == ALL_CONNECT_CHANNEL || 
this.maxConnectChannels >= ipSize) {
@@ -99,7 +105,7 @@ public class SenderManager {
             needNewSize = maxConnectChannels;
         }
 
-        HashSet<String> updateConfigIpLists = new HashSet<>();
+        List<String> updateConfigIpLists = new ArrayList<>();
         List<String> availableIpLists = new ArrayList<>(ipPortList);
         for (int i = 0; i < needNewSize; i++) {
             int availableIpSize = availableIpLists.size();
@@ -107,6 +113,7 @@ public class SenderManager {
             String ipPort = availableIpLists.remove(newIpPortIndex);
             updateConfigIpLists.add(ipPort);
         }
+        LOG.info("needNewSize:{},updateConfigIpLists:{}", needNewSize, 
updateConfigIpLists);
         if (updateConfigIpLists.size() > 0) {
             this.sender.updateConfig(updateConfigIpLists);
         }
@@ -127,11 +134,29 @@ public class SenderManager {
     /**
      * Send data with command
      */
-    public void send(AuditApi.BaseCommand baseCommand) {
-        AuditData data = new AuditData(baseCommand);
+    public void send(AuditApi.BaseCommand baseCommand, AuditApi.AuditRequest 
auditRequest) {
+        AuditData data = new AuditData(baseCommand, auditRequest);
         // cache first
-        this.dataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), 
data);
+        Long requestId = baseCommand.getAuditRequest().getRequestId();
+        this.dataMap.putIfAbsent(requestId, data);
+        requestIdQueue.offer(requestId);
         this.sendData(data.getDataByte());
+        // resend: at most once per 10s, replay cached requests whose sendTime is over 10s old
+        long newTime = System.currentTimeMillis() - 10000;
+        if (newTime > lastCheckTime) {
+            lastCheckTime = System.currentTimeMillis();
+            // fixed pass count: size() moves while polling; skip null polls (CHM.get(null) throws)
+            for (int i = requestIdQueue.size(); i > 0; i--) {
+                Long current = requestIdQueue.poll();
+                AuditData auditData = current == null ? null : this.dataMap.get(current);
+                if (auditData != null) {
+                    requestIdQueue.offer(current);
+                    if (newTime > auditData.getSendTime()) {
+                        this.sendData(auditData.getDataByte());
+                    }
+                }
+            }
+        }
     }
 
     /**
@@ -139,7 +164,7 @@ public class SenderManager {
      */
     private void sendData(byte[] data) {
         if (data == null || data.length <= 0) {
-            logger.warn("send data is empty!");
+            LOG.warn("send data is empty!");
             return;
         }
         ByteBuf dataBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
@@ -154,7 +179,7 @@ public class SenderManager {
      * Clean up the backlog of unsent message packets
      */
     public void clearBuffer() {
-        logger.info("audit failed cache size: {}", this.dataMap.size());
+        LOG.info("audit failed cache size: {}", this.dataMap.size());
         for (AuditData data : this.dataMap.values()) {
             this.sendData(data.getDataByte());
             this.sleep();
@@ -163,7 +188,7 @@ public class SenderManager {
             checkAuditFile();
         }
         if (this.dataMap.size() > auditConfig.getMaxCacheRow()) {
-            logger.info("failed cache size: {}>{}", this.dataMap.size(), 
auditConfig.getMaxCacheRow());
+            LOG.info("failed cache size: {}>{}", this.dataMap.size(), 
auditConfig.getMaxCacheRow());
             writeLocalFile();
             this.dataMap.clear();
         }
@@ -180,10 +205,10 @@ public class SenderManager {
             File file = new File(auditConfig.getDisasterFile());
             if (!file.exists()) {
                 if (!file.createNewFile()) {
-                    logger.error("create file {} failed", 
auditConfig.getDisasterFile());
+                    LOG.error("create file {} failed", 
auditConfig.getDisasterFile());
                     return;
                 }
-                logger.info("create file {} success", 
auditConfig.getDisasterFile());
+                LOG.info("create file {} success", 
auditConfig.getDisasterFile());
             }
             if (file.length() > auditConfig.getMaxFileSize()) {
                 file.delete();
@@ -195,7 +220,7 @@ public class SenderManager {
             objectOutputStream.close();
             fos.close();
         } catch (IOException e) {
-            logger.error("write local file error: ", e);
+            LOG.error("write local file error:{}", e.getMessage(), e);
         }
     }
 
@@ -208,7 +233,7 @@ public class SenderManager {
             if (!file.mkdirs()) {
                 return false;
             }
-            logger.info("create file {} success", auditConfig.getFilePath());
+            LOG.info("create file {} success", auditConfig.getFilePath());
         }
         return true;
     }
@@ -224,8 +249,8 @@ public class SenderManager {
             }
             FileInputStream inputStream = new 
FileInputStream(auditConfig.getDisasterFile());
             ObjectInputStream objectStream = new 
ObjectInputStream(inputStream);
-            ConcurrentHashMap<Long, AuditData> fileData =
-                    (ConcurrentHashMap<Long, AuditData>) 
objectStream.readObject();
+            ConcurrentHashMap<Long, AuditData> fileData = 
(ConcurrentHashMap<Long, AuditData>) objectStream
+                    .readObject();
             for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
                 if (this.dataMap.size() < (auditConfig.getMaxCacheRow() / 2)) {
                     this.dataMap.putIfAbsent(entry.getKey(), entry.getValue());
@@ -237,7 +262,7 @@ public class SenderManager {
             inputStream.close();
             file.delete();
         } catch (IOException | ClassNotFoundException e) {
-            logger.error("check audit file error: ", e);
+            LOG.error("check audit file error:{}", e.getMessage(), e);
         }
     }
 
@@ -259,22 +284,23 @@ public class SenderManager {
             Long requestId = baseCommand.getAuditReply().getRequestId();
             AuditData data = this.dataMap.get(requestId);
             if (data == null) {
-                logger.error("can not find the request id onMessageReceived: " 
+ requestId);
+                LOG.error("can not find the request id onMessageReceived: " + 
requestId);
                 return;
             }
-
-            logger.info("audit-proxy response code: {}", 
baseCommand.getAuditReply().getRspCode());
+            // check resp
+            LOG.debug("audit-proxy response code: {}", 
baseCommand.getAuditReply().getRspCode());
             if 
(AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode()))
 {
                 this.dataMap.remove(requestId);
                 return;
             }
+            LOG.error("audit-proxy response code: {}", 
baseCommand.getAuditReply().getRspCode());
 
             int resendTimes = data.increaseResendTimes();
             if (resendTimes < SenderGroup.MAX_SEND_TIMES) {
                 this.sendData(data.getDataByte());
             }
         } catch (Throwable ex) {
-            logger.error("onMessageReceived exception: ", ex);
+            LOG.error("onMessageReceived exception:{}", ex.getMessage(), ex);
             this.sender.setHasSendError(true);
         }
     }
@@ -283,11 +309,11 @@ public class SenderManager {
      * Handle the packet return exception
      */
     public void onExceptionCaught(ChannelHandlerContext ctx, Throwable e) {
-        logger.error("channel context " + ctx + " occurred exception: ", e);
+        LOG.error("channel context " + ctx + " occurred exception: ", e);
         try {
             this.sender.setHasSendError(true);
         } catch (Throwable ex) {
-            logger.error("setHasSendError error: ", ex);
+            LOG.error("setHasSendError error:{}", ex.getMessage(), ex);
         }
     }
 
@@ -298,7 +324,7 @@ public class SenderManager {
         try {
             Thread.sleep(SEND_INTERVAL_MS);
         } catch (Throwable ex) {
-            logger.error("sleep error: ", ex);
+            LOG.error("sleep error:{}", ex.getMessage(), ex);
         }
     }
 
@@ -308,4 +334,17 @@ public class SenderManager {
     public void setAuditConfig(AuditConfig config) {
         auditConfig = config;
     }
+
+    public void release(Channel channel) { // used by the SenderHandler callbacks: manager.release(ctx.channel())
+        this.sender.release(channel); // forwarded unchanged to the SenderGroup
+    }
+
+    /**
+     * Expose the live requestId -> AuditData cache itself, not a copy.
+     * @return the dataMap
+     */
+    public ConcurrentHashMap<Long, AuditData> getDataMap() {
+        return dataMap;
+    }
+
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
index a7064b9968..dabf01040e 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
@@ -25,15 +25,23 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class AuditData implements Serializable {
 
+    /**
+     * Explicit serial version id; AuditData implements Serializable.
+     */
+    private static final long serialVersionUID = 769061966948427460L;
     public static int HEAD_LENGTH = 4;
     private final AuditApi.BaseCommand content;
     private final AtomicInteger resendTimes = new AtomicInteger(0);
+    private String channelKey;
+    private long sendTime = System.currentTimeMillis();
+    private AuditApi.AuditRequest auditRequest;
 
     /**
      * Constructor
      */
-    public AuditData(AuditApi.BaseCommand content) {
+    public AuditData(AuditApi.BaseCommand content, AuditApi.AuditRequest 
auditRequest) {
         this.content = content;
+        this.auditRequest = auditRequest;
     }
 
     /**
@@ -62,4 +70,45 @@ public class AuditData implements Serializable {
         System.arraycopy(data2, 0, data3, data1.length, data2.length);
         return data3;
     }
+
+    /**
+     * get channelKey; a newly constructed AuditData has none until setChannelKey is called
+     * @return the channelKey, possibly null
+     */
+    public String getChannelKey() {
+        return channelKey;
+    }
+
+    /**
+     * set channelKey; the constructor leaves it unset, so this is the only assignment
+     * @param channelKey the channelKey to set
+     */
+    public void setChannelKey(String channelKey) {
+        this.channelKey = channelKey;
+    }
+
+    /**
+     * get sendTime; initialised to System.currentTimeMillis() when the object is created
+     * @return the sendTime in epoch milliseconds
+     */
+    public long getSendTime() {
+        return sendTime;
+    }
+
+    /**
+     * set sendTime, replacing the creation-time default; SenderManager.send reads it to decide on resending
+     * @param sendTime the sendTime to set, in epoch milliseconds
+     */
+    public void setSendTime(long sendTime) {
+        this.sendTime = sendTime;
+    }
+
+    /**
+     * get auditRequest
+     * @return the auditRequest that was passed to the constructor
+     */
+    public AuditApi.AuditRequest getAuditRequest() {
+        return auditRequest;
+    }
+
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
deleted file mode 100644
index 2b6208df32..0000000000
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import io.netty.channel.Channel;
-import org.apache.commons.lang3.math.NumberUtils;
-
-import java.net.InetSocketAddress;
-
-public class IpPort {
-
-    public static final String SEPARATOR = ":";
-    public final String ip;
-    public final int port;
-    public final String key;
-    public final InetSocketAddress addr;
-
-    /**
-     * Constructor
-     *
-     * @param ip
-     * @param port
-     */
-    public IpPort(String ip, int port) {
-        this.ip = ip;
-        this.port = port;
-        this.key = getIpPortKey(ip, port);
-        this.addr = new InetSocketAddress(ip, port);
-    }
-
-    /**
-     * Constructor
-     *
-     * @param addr
-     */
-    public IpPort(InetSocketAddress addr) {
-        this.ip = addr.getHostName();
-        this.port = addr.getPort();
-        this.key = getIpPortKey(ip, port);
-        this.addr = addr;
-    }
-
-    /**
-     * get IpPort by key
-     *
-     * @param ip
-     * @param port
-     * @return
-     */
-    public static String getIpPortKey(String ip, int port) {
-        return ip + ":" + port;
-    }
-
-    /**
-     * parse sIpPort
-     *
-     * @param ipPort
-     * @return
-     */
-    public static IpPort parseIpPort(String ipPort) {
-        String[] splits = ipPort.split(SEPARATOR);
-        if (splits.length == 2) {
-            String strIp = splits[0];
-            String strPort = splits[1];
-            int port = NumberUtils.toInt(strPort, 0);
-            if (port > 0) {
-                return new IpPort(strIp, port);
-            }
-        }
-        return null;
-    }
-
-    /**
-     * parse InetSocketAddress
-     *
-     * @param channel
-     * @return
-     */
-    public static InetSocketAddress parseInetSocketAddress(Channel channel) {
-        InetSocketAddress destAddr = null;
-        if (channel.remoteAddress() instanceof InetSocketAddress) {
-            destAddr = (InetSocketAddress) channel.remoteAddress();
-        } else {
-            String sendIp = channel.remoteAddress().toString();
-            destAddr = new InetSocketAddress(sendIp, 0);
-        }
-        return destAddr;
-    }
-
-    /**
-     * hashCode
-     */
-    @Override
-    public int hashCode() {
-        int result = ip.hashCode();
-        result = 31 * result + port;
-        return result;
-    }
-
-    /**
-     * equals
-     */
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        if (!(o instanceof IpPort)) {
-            return false;
-        }
-
-        try {
-            IpPort ctp = (IpPort) o;
-            if (ip != null && ip.equals(ctp.port) && port == ctp.port) {
-                return true;
-            }
-        } catch (Exception e) {
-            return false;
-        }
-        return false;
-    }
-
-    /**
-     * toString
-     */
-    public String toString() {
-        return key;
-    }
-}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
index 69dcaf8f40..99632002ea 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
@@ -17,19 +17,21 @@
 
 package org.apache.inlong.audit.util;
 
+import java.net.InetSocketAddress;
+
 public class SenderResult {
 
-    public final IpPort ipPort;
+    public final InetSocketAddress addr;
     public boolean result;
 
     /**
      * Constructor
      *
-     * @param ipPort
+     * @param addr
      * @param result
      */
-    public SenderResult(IpPort ipPort, boolean result) {
-        this.ipPort = ipPort;
+    public SenderResult(InetSocketAddress addr, boolean result) {
+        this.addr = addr;
         this.result = result;
     }
 
@@ -41,7 +43,7 @@ public class SenderResult {
      * @param result
      */
     public SenderResult(String sendIp, int sendPort, boolean result) {
-        this.ipPort = new IpPort(sendIp, sendPort);
+        this.addr = new InetSocketAddress(sendIp, sendPort);
         this.result = result;
     }
 }
diff --git 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
index e10846e57b..dcc98fda33 100644
--- 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
+++ 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
@@ -28,7 +28,7 @@ public class AuditDataTest {
 
     @Test
     public void increaseResendTimes() {
-        AuditData test = new AuditData(null);
+        AuditData test = new AuditData(null, null);
         int resendTimes = test.increaseResendTimes();
         assertEquals(1, resendTimes);
         resendTimes = test.increaseResendTimes();
@@ -56,7 +56,7 @@ public class AuditDataTest {
         AuditApi.AuditRequest request = 
AuditApi.AuditRequest.newBuilder().setMsgHeader(headerBuilder.build())
                 .addMsgBody(bodyBuilder.build()).build();
         AuditApi.BaseCommand baseCommand = 
AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
-        AuditData test = new AuditData(baseCommand);
+        AuditData test = new AuditData(baseCommand, request);
         byte[] data = test.getDataByte();
         assertTrue(data.length > 0);
     }
diff --git 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
deleted file mode 100644
index 65d2bcb3f6..0000000000
--- 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class IpPortTest {
-
-    private IpPort test = new IpPort("127.0.0.1", 80);
-
-    @Test
-    public void getIpPortKey() {
-        String ipPortKey = test.getIpPortKey("127.0.0.1", 80);
-        assertTrue(ipPortKey.equals("127.0.0.1:80"));
-    }
-
-    @Test
-    public void testHashCode() {
-        int hashCode = test.hashCode();
-        assertTrue(hashCode != 0);
-    }
-
-    @Test
-    public void testEquals() {
-        IpPort test1 = new IpPort("127.0.0.1", 81);
-        boolean ret = test.equals(test1);
-        assertFalse(ret);
-
-        IpPort test2 = new IpPort("127.0.0.1", 80);
-        ret = test.toString().equals(test2.toString());
-        assertTrue(ret);
-
-        IpPort test3 = test;
-        ret = test.equals(test3);
-        assertTrue(ret);
-    }
-}
\ No newline at end of file

Reply via email to