This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 293ab1ab3e [INLONG-9766][Audit] Support user-defined SocketAddress
loader getting AuditProxy (#9767)
293ab1ab3e is described below
commit 293ab1ab3e539e9c46f1740cbebbd0b6f623d05a
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Mar 6 12:00:51 2024 +0800
[INLONG-9766][Audit] Support user-defined SocketAddress loader getting
AuditProxy (#9767)
---
.../org/apache/inlong/audit/AuditOperator.java | 95 ++++++++++---
.../audit/DefaultISocketAddressListLoader.java | 63 +++++++++
.../inlong/audit/DnsSocketAddressListLoader.java | 77 +++++++++++
...derResult.java => SocketAddressListLoader.java} | 36 ++---
.../apache/inlong/audit/send/SenderChannel.java | 76 ++++++++---
.../org/apache/inlong/audit/send/SenderGroup.java | 134 +++++++++++--------
.../apache/inlong/audit/send/SenderHandler.java | 26 ++++
.../apache/inlong/audit/send/SenderManager.java | 89 +++++++++----
.../org/apache/inlong/audit/util/AuditData.java | 51 ++++++-
.../java/org/apache/inlong/audit/util/IpPort.java | 148 ---------------------
.../org/apache/inlong/audit/util/SenderResult.java | 12 +-
.../apache/inlong/audit/util/AuditDataTest.java | 4 +-
.../org/apache/inlong/audit/util/IpPortTest.java | 55 --------
13 files changed, 512 insertions(+), 354 deletions(-)
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index e720a75cb4..4ff2436a32 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -23,6 +23,8 @@ import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.audit.util.Config;
import org.apache.inlong.audit.util.StatInfo;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,9 +35,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
@@ -58,24 +61,14 @@ public class AuditOperator implements Serializable {
private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new
ConcurrentHashMap<>();
private final List<String> deleteKeyList = new ArrayList<>();
private final Config config = new Config();
- private final Timer timer = new Timer();
private int packageId = 1;
private int dataId = 0;
private boolean initialized = false;
private SenderManager manager;
- private final TimerTask timerTask = new TimerTask() {
-
- @Override
- public void run() {
- try {
- send();
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
- }
- };
+ private final ScheduledExecutorService timeoutExecutor =
Executors.newSingleThreadScheduledExecutor();
private AuditConfig auditConfig = null;
+ private SocketAddressListLoader loader = null;
/**
* Not support create from outer.
@@ -99,13 +92,71 @@ public class AuditOperator implements Serializable {
return;
}
config.init();
- timer.schedule(timerTask, PERIOD, PERIOD);
+ timeoutExecutor.scheduleWithFixedDelay(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ loadIpPortList();
+ send();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+
+ }, PERIOD, PERIOD, TimeUnit.MILLISECONDS);
if (auditConfig == null) {
auditConfig = new AuditConfig();
}
this.manager = new SenderManager(auditConfig);
}
+    /**
+     * Refresh the AuditProxy address set from the configured loader.
+     * <p>Does nothing when no loader is set or when the loader returns null or an empty list,
+     * so the previously applied proxies stay in effect. Any exception raised by the loader is
+     * logged and swallowed so that the periodic task calling this method keeps running.</p>
+     */
+    private void loadIpPortList() {
+        // Read the field once: setLoader()/setLoaderClass() may replace it while this task runs.
+        final SocketAddressListLoader currentLoader = loader;
+        if (currentLoader == null) {
+            return;
+        }
+        try {
+            List<String> ipPortList = currentLoader.loadSocketAddressList();
+            if (ipPortList != null && !ipPortList.isEmpty()) {
+                HashSet<String> ipPortSet = new HashSet<>(ipPortList);
+                this.setAuditProxy(ipPortSet);
+            }
+        } catch (Exception e) {
+            // Pass the exception itself so the stack trace is not lost.
+            LOGGER.error("Fail to load audit proxy address list,error:{}", e.getMessage(), e);
+        }
+    }
+
+ /**
+ * Set the loader used to refresh the AuditProxy address list.
+ * The periodic task scheduled on timeoutExecutor calls loadIpPortList(), which returns
+ * immediately while the loader is null.
+ * @param loader the loader to set
+ */
+ public void setLoader(SocketAddressListLoader loader) {
+ this.loader = loader;
+ }
+
+    /**
+     * Create the address loader from a class name and install it.
+     * <p>Best effort: an empty name is ignored. A class that cannot be loaded or instantiated
+     * through its no-arg constructor, or that does not implement
+     * {@link SocketAddressListLoader}, is logged and the current loader is left unchanged.</p>
+     *
+     * @param loaderClassName fully-qualified name of a {@link SocketAddressListLoader} implementation
+     */
+    public void setLoaderClass(String loaderClassName) {
+        if (StringUtils.isEmpty(loaderClassName)) {
+            return;
+        }
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
+            // Check the type before instantiating, so an unrelated class is neither constructed
+            // nor silently ignored.
+            if (!SocketAddressListLoader.class.isAssignableFrom(loaderClass)) {
+                LOGGER.error("Fail to init IpPortListLoader,loaderClass:{} does not implement {}",
+                        loaderClassName, SocketAddressListLoader.class.getName());
+                return;
+            }
+            this.loader = (SocketAddressListLoader) loaderClass.getDeclaredConstructor().newInstance();
+            LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName);
+        } catch (Throwable t) {
+            LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}",
+                    loaderClassName, t.getMessage(), t);
+        }
+    }
+
/**
* Set AuditProxy from the ip
*/
@@ -142,6 +193,16 @@ public class AuditOperator implements Serializable {
public void add(int auditID, String auditTag, String inlongGroupID, String
inlongStreamID, Long logTime,
long count, long size) {
long delayTime = System.currentTimeMillis() - logTime;
+ add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime, count,
size, delayTime * count);
+ }
+
+ /**
+ * Record audit data under DEFAULT_AUDIT_TAG with a caller-supplied delay.
+ * The delay is forwarded unchanged; addByKey adds it to the accumulated delay as given,
+ * while the overload that derives the delay from logTime passes delayTime * count.
+ * NOTE(review): delayTime here is therefore presumably the total delay of all count
+ * records, not a per-record value - confirm with callers.
+ */
+ public void add(int auditID, String inlongGroupID, String inlongStreamID,
Long logTime, long count, long size,
+ long delayTime) {
+ add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID,
logTime, count, size, delayTime);
+ }
+
+ public void add(int auditID, String auditTag, String inlongGroupID, String
inlongStreamID, Long logTime,
+ long count, long size, long delayTime) {
String key = (logTime / PERIOD) + FIELD_SEPARATORS + inlongGroupID +
FIELD_SEPARATORS
+ inlongStreamID + FIELD_SEPARATORS + auditID +
FIELD_SEPARATORS + auditTag;
addByKey(key, count, size, delayTime);
@@ -156,7 +217,7 @@ public class AuditOperator implements Serializable {
}
countMap.get(key).count.addAndGet(count);
countMap.get(key).size.addAndGet(size);
- countMap.get(key).delay.addAndGet(delayTime * count);
+ countMap.get(key).delay.addAndGet(delayTime);
}
/**
@@ -240,7 +301,7 @@ public class AuditOperator implements Serializable {
private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
AuditApi.BaseCommand.Builder baseCommand =
AuditApi.BaseCommand.newBuilder();
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
- manager.send(baseCommand.build());
+ manager.send(baseCommand.build(), auditRequest);
}
/**
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
new file mode 100644
index 0000000000..c6629e6de2
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default {@link SocketAddressListLoader}: reads a whitespace-separated list of AuditProxy
+ * addresses from the {@code audit.proxys} common property.
+ */
+public class DefaultISocketAddressListLoader implements SocketAddressListLoader {
+
+    public static final String KEY_AUDIT_PROXYS = "audit.proxys";
+
+    private Map<String, String> commonProperties;
+
+    @Override
+    public void setCommonProperties(Map<String, String> commonProperties) {
+        this.commonProperties = commonProperties;
+    }
+
+    /**
+     * Split the {@code audit.proxys} property on whitespace and return the entries sorted.
+     * The entries are not validated here.
+     *
+     * @return the sorted entries; an empty list when the properties were never set or the
+     *         key is absent
+     */
+    @Override
+    public List<String> loadSocketAddressList() {
+        List<String> ipPortList = new ArrayList<>();
+        // setCommonProperties() may not have been called yet; treat that as "nothing configured"
+        // instead of failing with a NullPointerException.
+        if (commonProperties == null) {
+            return ipPortList;
+        }
+        String strAuditProxys = commonProperties.get(KEY_AUDIT_PROXYS);
+        if (strAuditProxys == null) {
+            return ipPortList;
+        }
+        for (String tmpIPPort : strAuditProxys.split("\\s+")) {
+            // A leading separator makes split() yield an empty first token.
+            if (StringUtils.isBlank(tmpIPPort)) {
+                continue;
+            }
+            ipPortList.add(tmpIPPort.trim());
+        }
+        Collections.sort(ipPortList);
+        return ipPortList;
+    }
+
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
new file mode 100644
index 0000000000..95c9d969d3
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link SocketAddressListLoader} that resolves a DNS name and returns one
+ * {@code hostAddress:port} entry per resolved address.
+ * <p>The host name is read from {@code audit.dns.address} and the port from
+ * {@code audit.dns.port} in the common properties.</p>
+ */
+public class DnsSocketAddressListLoader implements SocketAddressListLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DnsSocketAddressListLoader.class);
+    public static final String KEY_DNS_ADDRESS = "audit.dns.address";
+    public static final String KEY_DNS_PORT = "audit.dns.port";
+
+    private Map<String, String> commonProperties;
+
+    @Override
+    public void setCommonProperties(Map<String, String> commonProperties) {
+        this.commonProperties = commonProperties;
+    }
+
+    /**
+     * Resolve the configured DNS name into a sorted list of {@code hostAddress:port} entries.
+     * <p>NOTE(review): an IPv6 result contains several ':' characters, and
+     * SenderGroup.parseAddress only accepts entries with exactly one - confirm whether IPv6
+     * needs to be supported.</p>
+     *
+     * @return the sorted entries; an empty list when the properties were never set, the host
+     *         or port is not configured, or resolution fails (the failure is logged)
+     */
+    @Override
+    public List<String> loadSocketAddressList() {
+        List<String> ipPortList = new ArrayList<>();
+        // Return an empty list rather than null, consistent with the other exit paths.
+        if (commonProperties == null) {
+            return ipPortList;
+        }
+        String dns = commonProperties.get(KEY_DNS_ADDRESS);
+        String dnsPort = commonProperties.get(KEY_DNS_PORT);
+        if (StringUtils.isEmpty(dns) || StringUtils.isEmpty(dnsPort)) {
+            return ipPortList;
+        }
+        try {
+            for (InetAddress addr : InetAddress.getAllByName(dns)) {
+                ipPortList.add(addr.getHostAddress() + ":" + dnsPort);
+            }
+        } catch (Throwable t) {
+            // Best effort: a failed lookup leaves the list empty, so callers keep their
+            // previous addresses.
+            LOG.error("Fail to resolve audit dns:{},error:{}", dns, t.getMessage(), t);
+        }
+        Collections.sort(ipPortList);
+        return ipPortList;
+    }
+
+    /**
+     * Manual smoke test: resolves a fixed host name and prints the result.
+     * NOTE(review): debugging entry point with a hard-coded host; consider moving it to a test.
+     */
+    public static void main(String[] args) {
+
+        try {
+            InetAddress[] addrs = InetAddress.getAllByName("inlong.woa.com");
+            // Printing the array directly would only show its type and identity hash.
+            System.out.println(java.util.Arrays.toString(addrs));
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
+        }
+    }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
similarity index 57%
copy from
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
copy to
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
index 69dcaf8f40..0902964a7d 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
@@ -15,33 +15,19 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.util;
+package org.apache.inlong.audit;
-public class SenderResult {
+import java.util.List;
+import java.util.Map;
- public final IpPort ipPort;
- public boolean result;
+/**
+ * Strategy that supplies the AuditProxy socket address list to the audit SDK.
+ * AuditOperator calls loadSocketAddressList() from its periodic task and applies a
+ * non-empty result as the current AuditProxy set.
+ */
+public interface SocketAddressListLoader {
+
+ // Property key. NOTE(review): presumably names the loader implementation class - confirm
+ // against the code that reads it.
+ String KEY_ADDRESS_LOADER = "audit.address.loader";
- /**
- * Constructor
- *
- * @param ipPort
- * @param result
- */
- public SenderResult(IpPort ipPort, boolean result) {
- this.ipPort = ipPort;
- this.result = result;
- }
+ /**
+ * Supply the configuration map from which the loader reads its settings.
+ * @param commonProperties configuration key/value pairs
+ */
+ void setCommonProperties(Map<String, String> commonProperties);
- /**
- * Constructor
- *
- * @param sendIp
- * @param sendPort
- * @param result
- */
- public SenderResult(String sendIp, int sendPort, boolean result) {
- this.ipPort = new IpPort(sendIp, sendPort);
- this.result = result;
- }
+ /**
+ * Load the current address list.
+ * @return address entries such as "ip:port"; callers should tolerate null or an empty
+ * list (AuditOperator.loadIpPortList ignores both)
+ */
+ List<String> loadSocketAddressList();
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 3f45294889..8977221ec7 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -18,17 +18,18 @@
package org.apache.inlong.audit.send;
import org.apache.inlong.audit.util.EventLoopUtil;
-import org.apache.inlong.audit.util.IpPort;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.util.Attribute;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
@@ -40,8 +41,9 @@ public class SenderChannel {
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
- private IpPort ipPort;
+ private InetSocketAddress addr;
private Channel channel;
+ private String channelKey;
private Semaphore packToken;
private Bootstrap client;
private SenderManager senderManager;
@@ -49,10 +51,10 @@ public class SenderChannel {
/**
* Constructor
*
- * @param ipPort
+ * @param addr
*/
- public SenderChannel(IpPort ipPort, int maxSynchRequest, SenderManager
senderManager) {
- this.ipPort = ipPort;
+ public SenderChannel(InetSocketAddress addr, int maxSynchRequest,
SenderManager senderManager) {
+ this.addr = addr;
this.packToken = new Semaphore(maxSynchRequest);
this.senderManager = senderManager;
}
@@ -66,6 +68,20 @@ public class SenderChannel {
return packToken.tryAcquire();
}
+    /**
+     * Acquire a send permit, blocking until one is available.
+     * <p>If the thread is interrupted while waiting, its interrupt status is restored and the
+     * method still returns {@code true} without holding a permit, as before.
+     * NOTE(review): the return value cannot signal the interrupted case; returning
+     * {@code false} would need SenderGroup to put the polled channel back into its queue.</p>
+     *
+     * @return always {@code true}
+     */
+    public boolean acquire() {
+        try {
+            packToken.acquire();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the caller can still observe it, and log it
+            // instead of printing the stack trace.
+            Thread.currentThread().interrupt();
+            LOG.warn("interrupted while waiting for a send permit of {}", addr);
+        }
+        return true;
+    }
+
/**
* release channel
*/
@@ -78,25 +94,23 @@ public class SenderChannel {
*/
@Override
public String toString() {
- return ipPort.key;
+ return addr.toString();
}
/**
- * get ipPort
- *
- * @return the ipPort
+ * get addr
+ * @return the addr
*/
- public IpPort getIpPort() {
- return ipPort;
+ public InetSocketAddress getAddr() {
+ return addr;
}
/**
- * set ipPort
- *
- * @param ipPort the ipPort to set
+ * set addr
+ * @param addr the addr to set
*/
- public void setIpPort(IpPort ipPort) {
- this.ipPort = ipPort;
+ public void setAddr(InetSocketAddress addr) {
+ this.addr = addr;
}
/**
@@ -115,10 +129,28 @@ public class SenderChannel {
*/
public void setChannel(Channel channel) {
this.channel = channel;
+ Attribute<String> attr = this.channel.attr(SenderGroup.CHANNEL_KEY);
+ attr.set(channelKey);
+ }
+
+ /**
+ * Get the key that identifies this SenderChannel.
+ * The same value is stored on the Netty channel as the SenderGroup.CHANNEL_KEY attribute
+ * when the channel is connected or set through setChannel().
+ * @return the channelKey
+ */
+ public String getChannelKey() {
+ return channelKey;
+ }
+
+ /**
+ * Set the key that identifies this SenderChannel.
+ * The key is copied onto the Netty channel attribute at the moment the channel is connected
+ * or set through setChannel(), so it has to be set before that for the attribute to carry it.
+ * @param channelKey the channelKey to set
+ */
+ public void setChannelKey(String channelKey) {
+ this.channelKey = channelKey;
+ }
private void init() {
- ThreadFactory selfDefineFactory = new
DefaultThreadFactory("audit-client-io",
+ ThreadFactory selfDefineFactory = new
DefaultThreadFactory("audit-client-io" + Thread.currentThread().getId(),
Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(DEFAULT_SEND_THREADNUM,
@@ -127,8 +159,8 @@ public class SenderChannel {
client.group(eventLoopGroup);
client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
client.option(ChannelOption.SO_KEEPALIVE, true);
- client.option(ChannelOption.TCP_NODELAY, true);
- client.option(ChannelOption.SO_REUSEADDR, true);
+ client.option(ChannelOption.TCP_NODELAY, false);
+ client.option(ChannelOption.SO_REUSEADDR, false);
client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
client.handler(new ClientPipelineFactory(senderManager));
@@ -149,11 +181,13 @@ public class SenderChannel {
}
synchronized (client) {
- ChannelFuture future = client.connect(this.ipPort.addr).sync();
+ ChannelFuture future = client.connect(this.addr).sync();
this.channel = future.channel();
+ Attribute<String> attr =
this.channel.attr(SenderGroup.CHANNEL_KEY);
+ attr.set(channelKey);
}
} catch (Throwable e) {
- LOG.error("connect {} failed. {}", this.getIpPort(),
e.getMessage());
+ LOG.error("connect {} failed. {}", this.getAddr(), e.getMessage());
return false;
}
return true;
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
index c28e0531ff..4443e58c2f 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -17,25 +17,28 @@
package org.apache.inlong.audit.send;
-import org.apache.inlong.audit.util.IpPort;
import org.apache.inlong.audit.util.SenderResult;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
public class SenderGroup {
- private static final Logger logger =
LoggerFactory.getLogger(SenderGroup.class);
+ public static final Logger LOG =
LoggerFactory.getLogger(SenderGroup.class);
+ public static final AttributeKey<String> CHANNEL_KEY =
AttributeKey.newInstance("channelKey");
// maximum number of sending
public static final int MAX_SEND_TIMES = 3;
public static final int DEFAULT_WAIT_TIMES = 10000;
@@ -55,6 +58,8 @@ public class SenderGroup {
private SenderManager senderManager;
+ private AtomicLong channelId = new AtomicLong(0);
+
/**
* constructor
*
@@ -80,18 +85,17 @@ public class SenderGroup {
SenderChannel channel = null;
try {
if (channels.size() <= 0) {
- logger.error("channels is empty");
+ LOG.error("channels is empty");
dataBuf.release();
return new SenderResult("channels is empty", 0, false);
}
boolean isOk = false;
+ // tryAcquire
for (int tryIndex = 0; tryIndex < MAX_SEND_TIMES; tryIndex++) {
- int random = RANDOM_MIN + (int) (Math.random() *
(channels.size() - RANDOM_MIN));
- channels = channelGroups.get(mIndex);
for (int i = 0; i < channels.size(); i++) {
channel = channels.poll();
if (channel.tryAcquire()) {
- if (random == i && channel.connect()) {
+ if (channel.connect()) {
isOk = true;
break;
}
@@ -108,11 +112,26 @@ public class SenderGroup {
try {
Thread.sleep(waitChannelIntervalMs);
} catch (Throwable e) {
- logger.error(e.getMessage());
+ LOG.error(e.getMessage());
}
}
+ // acquire
if (channel == null) {
- logger.error("can not get a channel");
+ for (int i = 0; i < channels.size(); i++) {
+ channel = channels.poll();
+ if (!channel.connect()) {
+ channels.offer(channel);
+ channel = null;
+ continue;
+ }
+ if (channel.acquire()) {
+ break;
+ }
+ }
+ }
+ // error
+ if (channel == null) {
+ LOG.error("can not get a channel");
dataBuf.release();
return new SenderResult("can not get a channel", 0, false);
}
@@ -129,39 +148,27 @@ public class SenderGroup {
} else {
dataBuf.release();
}
- return new SenderResult(channel.getIpPort().ip,
channel.getIpPort().port, t.isSuccess());
+ return new SenderResult(channel.getAddr().getHostString(),
channel.getAddr().getPort(), t.isSuccess());
} catch (Throwable ex) {
- logger.error(ex.getMessage());
+ LOG.error(ex.getMessage(), ex);
this.setHasSendError(true);
- return new SenderResult(ex.getMessage(), 0, false);
+ return new SenderResult("127.0.0.1", 0, false);
} finally {
if (channel != null) {
- channel.release();
channels.offer(channel);
}
}
}
- /**
- * release channel
- */
- public void release(String ipPort) {
- SenderChannel channel = this.totalChannels.get(ipPort);
- if (channel != null) {
- channel.release();
+ /**
+ * Release the SenderChannel that owns the given Netty channel.
+ * The owner is looked up in totalChannels by the CHANNEL_KEY attribute stored on the
+ * channel; a channel without that attribute, or whose key is no longer registered, is
+ * ignored.
+ * NOTE(review): SenderHandler calls this from channelRead0, exceptionCaught,
+ * channelInactive and channelUnregistered, so one connection can trigger several releases -
+ * confirm that SenderChannel.release() cannot hand back more permits than were acquired.
+ *
+ * @param channel the Netty channel the event was received on
+ */
+ public void release(Channel channel) {
+ Attribute<String> attr = channel.attr(CHANNEL_KEY);
+ String key = attr.get();
+ if (key == null) {
+ return;
}
- }
-
- /**
- * release channel
- */
- public void release(InetSocketAddress addr) {
- String destIp = addr.getHostName();
- int destPort = addr.getPort();
- String ipPort = IpPort.getIpPortKey(destIp, destPort);
- SenderChannel channel = this.totalChannels.get(ipPort);
- if (channel != null) {
- channel.release();
+ SenderChannel senderChannel = this.totalChannels.get(key);
+ if (senderChannel != null) {
+ senderChannel.release();
}
}
@@ -170,47 +177,64 @@ public class SenderGroup {
*
* @param ipLists
*/
- public void updateConfig(Set<String> ipLists) {
+ public void updateConfig(List<String> ipLists) {
try {
for (SenderChannel dc : deleteChannels) {
- dc.getChannel().disconnect();
- dc.getChannel().close();
+ if (dc.getChannel() != null) {
+ try {
+ dc.getChannel().disconnect();
+ dc.getChannel().close();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
}
deleteChannels.clear();
int newIndex = mIndex ^ 0x01;
LinkedBlockingQueue<SenderChannel> newChannels =
this.channelGroups.get(newIndex);
newChannels.clear();
+ List<String> waitingDeleteChannelKey = new
ArrayList<>(totalChannels.size());
+ waitingDeleteChannelKey.addAll(totalChannels.keySet());
for (String ipPort : ipLists) {
- SenderChannel channel = totalChannels.get(ipPort);
- if (channel != null) {
- newChannels.add(channel);
- continue;
- }
try {
- IpPort ipPortObj = IpPort.parseIpPort(ipPort);
- if (ipPortObj == null) {
+ InetSocketAddress addr = parseAddress(ipPort);
+ if (addr == null) {
continue;
}
- channel = new SenderChannel(ipPortObj, maxSynchRequest,
senderManager);
+ String key = String.valueOf(channelId.getAndIncrement());
+ SenderChannel channel = new SenderChannel(addr,
maxSynchRequest, senderManager);
+ channel.setChannelKey(key);
newChannels.add(channel);
- totalChannels.put(ipPort, channel);
+ totalChannels.put(key, channel);
} catch (Exception e) {
- logger.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
}
}
- for (Entry<String, SenderChannel> entry :
totalChannels.entrySet()) {
- if (!ipLists.contains(entry.getKey())) {
- deleteChannels.add(entry.getValue());
- }
- }
- for (SenderChannel dc : deleteChannels) {
- totalChannels.remove(dc.getIpPort().key);
- }
+ waitingDeleteChannelKey.forEach(v ->
deleteChannels.add(totalChannels.remove(v)));
this.mIndex = newIndex;
} catch (Throwable e) {
- logger.error("Update Sender Ip Failed." + e.getMessage());
+ LOG.error("Update Sender Ip Failed." + e.getMessage(), e);
+ }
+ }
+
+    /**
+     * Parse a "host:port" string into a socket address.
+     *
+     * @param ipPort the address text, for example "127.0.0.1:10081"
+     * @return the parsed address, or null when the text is null, does not consist of exactly
+     *         two ':'-separated parts, has an empty host, or has a port outside 1-65535
+     */
+    private static InetSocketAddress parseAddress(String ipPort) {
+        if (ipPort == null) {
+            return null;
+        }
+        String[] splits = ipPort.trim().split(":");
+        if (splits.length == 2) {
+            String strIp = splits[0].trim();
+            // toInt yields 0 for an unparsable port, which the range check below rejects.
+            int port = NumberUtils.toInt(splits[1].trim(), 0);
+            // Ports above 65535 would make the InetSocketAddress constructor throw.
+            if (!strIp.isEmpty() && port > 0 && port <= 65535) {
+                return new InetSocketAddress(strIp, port);
+            }
         }
+        return null;
     }
/**
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
index e2f8fc5b90..c5f5c481e9 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
@@ -17,11 +17,14 @@
package org.apache.inlong.audit.send;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+
public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SenderHandler.class);
@@ -40,6 +43,7 @@ public class SenderHandler extends
SimpleChannelInboundHandler<byte[]> {
@Override
public void channelRead0(io.netty.channel.ChannelHandlerContext ctx,
byte[] e) {
try {
+ manager.release(ctx.channel());
manager.onMessageReceived(ctx, e);
} catch (Throwable ex) {
LOGGER.error("channelRead0 error: ", ex);
@@ -52,6 +56,7 @@ public class SenderHandler extends
SimpleChannelInboundHandler<byte[]> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
try {
+ manager.release(ctx.channel());
manager.onExceptionCaught(ctx, e);
} catch (Throwable ex) {
LOGGER.error("caught exception: ", ex);
@@ -64,6 +69,7 @@ public class SenderHandler extends
SimpleChannelInboundHandler<byte[]> {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
try {
+ manager.release(ctx.channel());
super.channelInactive(ctx);
} catch (Throwable ex) {
LOGGER.error("channelInactive error: ", ex);
@@ -76,9 +82,29 @@ public class SenderHandler extends
SimpleChannelInboundHandler<byte[]> {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws
Exception {
try {
+ manager.release(ctx.channel());
super.channelUnregistered(ctx);
} catch (Throwable ex) {
LOGGER.error("channelUnregistered error: ", ex);
}
}
+
+    /**
+     * Get the remote address of a channel as an {@link InetSocketAddress}.
+     *
+     * @param channel the channel to inspect
+     * @return the remote address itself when it is an InetSocketAddress; otherwise an address
+     *         built from the remote address text with port 0, or 127.0.0.1 with port 0 when
+     *         the channel reports no remote address
+     */
+    public static InetSocketAddress parseInetSocketAddress(Channel channel) {
+        // Read the remote address once, so the type check and the use see the same value even
+        // if the channel's state changes in between.
+        final java.net.SocketAddress remote = channel.remoteAddress();
+        if (remote instanceof InetSocketAddress) {
+            return (InetSocketAddress) remote;
+        }
+        if (remote != null) {
+            return new InetSocketAddress(remote.toString(), 0);
+        }
+        return new InetSocketAddress("127.0.0.1", 0);
+    }
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index 357711edeb..4b85c54ec3 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -24,6 +24,7 @@ import org.apache.inlong.audit.util.SenderResult;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -50,17 +52,19 @@ public class SenderManager {
public static final Long MAX_REQUEST_ID = 1000000000L;
public static final int ALL_CONNECT_CHANNEL = -1;
public static final int DEFAULT_CONNECT_CHANNEL = 2;
- private static final Logger logger =
LoggerFactory.getLogger(SenderManager.class);
+ public static final Logger LOG =
LoggerFactory.getLogger(SenderManager.class);
private static final int SEND_INTERVAL_MS = 20;
private final SecureRandom sRandom = new
SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
private final AtomicLong requestIdSeq = new AtomicLong(0L);
private final ConcurrentHashMap<Long, AuditData> dataMap = new
ConcurrentHashMap<>();
+ private final LinkedBlockingQueue<Long> requestIdQueue = new
LinkedBlockingQueue<>();
private SenderGroup sender;
private int maxConnectChannels = ALL_CONNECT_CHANNEL;
// IPList
- private HashSet<String> currentIpPorts = new HashSet<>();
+ private List<String> currentIpPorts = new ArrayList<>();
private AuditConfig auditConfig;
+ private long lastCheckTime = System.currentTimeMillis();
/**
* Constructor
@@ -78,7 +82,7 @@ public class SenderManager {
this.maxConnectChannels = maxConnectChannels;
this.sender = new SenderGroup(this);
} catch (Exception ex) {
- logger.error(ex.getMessage());
+ LOG.error(ex.getMessage(), ex);
}
}
@@ -90,7 +94,9 @@ public class SenderManager {
return;
}
this.sender.setHasSendError(false);
- this.currentIpPorts = ipPortList;
+ List<String> newIpPorts = new ArrayList<>();
+ newIpPorts.addAll(ipPortList);
+ this.currentIpPorts = newIpPorts;
int ipSize = ipPortList.size();
int needNewSize;
if (this.maxConnectChannels == ALL_CONNECT_CHANNEL ||
this.maxConnectChannels >= ipSize) {
@@ -99,7 +105,7 @@ public class SenderManager {
needNewSize = maxConnectChannels;
}
- HashSet<String> updateConfigIpLists = new HashSet<>();
+ List<String> updateConfigIpLists = new ArrayList<>();
List<String> availableIpLists = new ArrayList<>(ipPortList);
for (int i = 0; i < needNewSize; i++) {
int availableIpSize = availableIpLists.size();
@@ -107,6 +113,7 @@ public class SenderManager {
String ipPort = availableIpLists.remove(newIpPortIndex);
updateConfigIpLists.add(ipPort);
}
+ LOG.info("needNewSize:{},updateConfigIpLists:{}", needNewSize,
updateConfigIpLists);
if (updateConfigIpLists.size() > 0) {
this.sender.updateConfig(updateConfigIpLists);
}
@@ -127,11 +134,29 @@ public class SenderManager {
/**
* Send data with command
*/
- public void send(AuditApi.BaseCommand baseCommand) {
- AuditData data = new AuditData(baseCommand);
+    public void send(AuditApi.BaseCommand baseCommand, AuditApi.AuditRequest auditRequest) {
+        AuditData data = new AuditData(baseCommand, auditRequest);
         // cache first
-        this.dataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data);
+        Long requestId = baseCommand.getAuditRequest().getRequestId();
+        this.dataMap.putIfAbsent(requestId, data);
+        requestIdQueue.offer(requestId);
+        this.sendData(data.getDataByte());
+        // resend: at most once per 10 seconds, re-send cached requests older than 10 seconds
+        long now = System.currentTimeMillis();
+        long newTime = now - 10000;
+        if (newTime <= lastCheckTime) {
+            return;
+        }
+        // Advance the check time; otherwise the whole queue is scanned on every send once the
+        // first 10 seconds have elapsed.
+        lastCheckTime = now;
+        // Snapshot the size: the loop drops stale ids and re-offers live ones, so comparing
+        // against the live size would end the scan early and skip entries.
+        int pending = requestIdQueue.size();
+        for (int i = 0; i < pending; i++) {
+            Long current = requestIdQueue.poll();
+            if (current == null) {
+                // Queue drained by someone else; ConcurrentHashMap.get(null) would throw.
+                break;
+            }
+            AuditData auditData = this.dataMap.get(current);
+            if (auditData == null) {
+                // No longer cached: drop the id from the queue.
+                continue;
+            }
+            requestIdQueue.offer(current);
+            if (newTime > auditData.getSendTime()) {
+                this.sendData(auditData.getDataByte());
+            }
+        }
     }
/**
@@ -139,7 +164,7 @@ public class SenderManager {
*/
private void sendData(byte[] data) {
if (data == null || data.length <= 0) {
- logger.warn("send data is empty!");
+ LOG.warn("send data is empty!");
return;
}
ByteBuf dataBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
@@ -154,7 +179,7 @@ public class SenderManager {
* Clean up the backlog of unsent message packets
*/
public void clearBuffer() {
- logger.info("audit failed cache size: {}", this.dataMap.size());
+ LOG.info("audit failed cache size: {}", this.dataMap.size());
for (AuditData data : this.dataMap.values()) {
this.sendData(data.getDataByte());
this.sleep();
@@ -163,7 +188,7 @@ public class SenderManager {
checkAuditFile();
}
if (this.dataMap.size() > auditConfig.getMaxCacheRow()) {
- logger.info("failed cache size: {}>{}", this.dataMap.size(),
auditConfig.getMaxCacheRow());
+ LOG.info("failed cache size: {}>{}", this.dataMap.size(),
auditConfig.getMaxCacheRow());
writeLocalFile();
this.dataMap.clear();
}
@@ -180,10 +205,10 @@ public class SenderManager {
File file = new File(auditConfig.getDisasterFile());
if (!file.exists()) {
if (!file.createNewFile()) {
- logger.error("create file {} failed",
auditConfig.getDisasterFile());
+ LOG.error("create file {} failed",
auditConfig.getDisasterFile());
return;
}
- logger.info("create file {} success",
auditConfig.getDisasterFile());
+ LOG.info("create file {} success",
auditConfig.getDisasterFile());
}
if (file.length() > auditConfig.getMaxFileSize()) {
file.delete();
@@ -195,7 +220,7 @@ public class SenderManager {
objectOutputStream.close();
fos.close();
} catch (IOException e) {
- logger.error("write local file error: ", e);
+ LOG.error("write local file error:{}", e.getMessage(), e);
}
}
@@ -208,7 +233,7 @@ public class SenderManager {
if (!file.mkdirs()) {
return false;
}
- logger.info("create file {} success", auditConfig.getFilePath());
+ LOG.info("create file {} success", auditConfig.getFilePath());
}
return true;
}
@@ -224,8 +249,8 @@ public class SenderManager {
}
FileInputStream inputStream = new
FileInputStream(auditConfig.getDisasterFile());
ObjectInputStream objectStream = new
ObjectInputStream(inputStream);
- ConcurrentHashMap<Long, AuditData> fileData =
- (ConcurrentHashMap<Long, AuditData>)
objectStream.readObject();
+ ConcurrentHashMap<Long, AuditData> fileData =
(ConcurrentHashMap<Long, AuditData>) objectStream
+ .readObject();
for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
if (this.dataMap.size() < (auditConfig.getMaxCacheRow() / 2)) {
this.dataMap.putIfAbsent(entry.getKey(), entry.getValue());
@@ -237,7 +262,7 @@ public class SenderManager {
inputStream.close();
file.delete();
} catch (IOException | ClassNotFoundException e) {
- logger.error("check audit file error: ", e);
+ LOG.error("check audit file error:{}", e.getMessage(), e);
}
}
@@ -259,22 +284,23 @@ public class SenderManager {
Long requestId = baseCommand.getAuditReply().getRequestId();
AuditData data = this.dataMap.get(requestId);
if (data == null) {
- logger.error("can not find the request id onMessageReceived: "
+ requestId);
+ LOG.error("can not find the request id onMessageReceived: " +
requestId);
return;
}
-
- logger.info("audit-proxy response code: {}",
baseCommand.getAuditReply().getRspCode());
+ // check resp
+ LOG.debug("audit-proxy response code: {}",
baseCommand.getAuditReply().getRspCode());
if
(AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode()))
{
this.dataMap.remove(requestId);
return;
}
+ LOG.error("audit-proxy response code: {}",
baseCommand.getAuditReply().getRspCode());
int resendTimes = data.increaseResendTimes();
if (resendTimes < SenderGroup.MAX_SEND_TIMES) {
this.sendData(data.getDataByte());
}
} catch (Throwable ex) {
- logger.error("onMessageReceived exception: ", ex);
+ LOG.error("onMessageReceived exception:{}", ex.getMessage(), ex);
this.sender.setHasSendError(true);
}
}
@@ -283,11 +309,11 @@ public class SenderManager {
* Handle the packet return exception
*/
public void onExceptionCaught(ChannelHandlerContext ctx, Throwable e) {
- logger.error("channel context " + ctx + " occurred exception: ", e);
+ LOG.error("channel context " + ctx + " occurred exception: ", e);
try {
this.sender.setHasSendError(true);
} catch (Throwable ex) {
- logger.error("setHasSendError error: ", ex);
+ LOG.error("setHasSendError error:{}", ex.getMessage(), ex);
}
}
@@ -298,7 +324,7 @@ public class SenderManager {
try {
Thread.sleep(SEND_INTERVAL_MS);
} catch (Throwable ex) {
- logger.error("sleep error: ", ex);
+ LOG.error("sleep error:{}", ex.getMessage(), ex);
}
}
@@ -308,4 +334,17 @@ public class SenderManager {
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
}
+
+ public void release(Channel channel) {
+ this.sender.release(channel);
+ }
+
+ /**
+ * get dataMap
+ * @return the dataMap
+ */
+ public ConcurrentHashMap<Long, AuditData> getDataMap() {
+ return dataMap;
+ }
+
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
index a7064b9968..dabf01040e 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
@@ -25,15 +25,23 @@ import java.util.concurrent.atomic.AtomicInteger;
public class AuditData implements Serializable {
+ /**
+ * serialVersionUID long
+ */
+ private static final long serialVersionUID = 769061966948427460L;
public static int HEAD_LENGTH = 4;
private final AuditApi.BaseCommand content;
private final AtomicInteger resendTimes = new AtomicInteger(0);
+ private String channelKey;
+ private long sendTime = System.currentTimeMillis();
+ private AuditApi.AuditRequest auditRequest;
/**
* Constructor
*/
- public AuditData(AuditApi.BaseCommand content) {
+ public AuditData(AuditApi.BaseCommand content, AuditApi.AuditRequest
auditRequest) {
this.content = content;
+ this.auditRequest = auditRequest;
}
/**
@@ -62,4 +70,45 @@ public class AuditData implements Serializable {
System.arraycopy(data2, 0, data3, data1.length, data2.length);
return data3;
}
+
+ /**
+ * get channelKey
+ * @return the channelKey
+ */
+ public String getChannelKey() {
+ return channelKey;
+ }
+
+ /**
+ * set channelKey
+ * @param channelKey the channelKey to set
+ */
+ public void setChannelKey(String channelKey) {
+ this.channelKey = channelKey;
+ }
+
+ /**
+ * get sendTime
+ * @return the sendTime
+ */
+ public long getSendTime() {
+ return sendTime;
+ }
+
+ /**
+ * set sendTime
+ * @param sendTime the sendTime to set
+ */
+ public void setSendTime(long sendTime) {
+ this.sendTime = sendTime;
+ }
+
+ /**
+ * get auditRequest
+ * @return the auditRequest
+ */
+ public AuditApi.AuditRequest getAuditRequest() {
+ return auditRequest;
+ }
+
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
deleted file mode 100644
index 2b6208df32..0000000000
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import io.netty.channel.Channel;
-import org.apache.commons.lang3.math.NumberUtils;
-
-import java.net.InetSocketAddress;
-
-public class IpPort {
-
- public static final String SEPARATOR = ":";
- public final String ip;
- public final int port;
- public final String key;
- public final InetSocketAddress addr;
-
- /**
- * Constructor
- *
- * @param ip
- * @param port
- */
- public IpPort(String ip, int port) {
- this.ip = ip;
- this.port = port;
- this.key = getIpPortKey(ip, port);
- this.addr = new InetSocketAddress(ip, port);
- }
-
- /**
- * Constructor
- *
- * @param addr
- */
- public IpPort(InetSocketAddress addr) {
- this.ip = addr.getHostName();
- this.port = addr.getPort();
- this.key = getIpPortKey(ip, port);
- this.addr = addr;
- }
-
- /**
- * get IpPort by key
- *
- * @param ip
- * @param port
- * @return
- */
- public static String getIpPortKey(String ip, int port) {
- return ip + ":" + port;
- }
-
- /**
- * parse sIpPort
- *
- * @param ipPort
- * @return
- */
- public static IpPort parseIpPort(String ipPort) {
- String[] splits = ipPort.split(SEPARATOR);
- if (splits.length == 2) {
- String strIp = splits[0];
- String strPort = splits[1];
- int port = NumberUtils.toInt(strPort, 0);
- if (port > 0) {
- return new IpPort(strIp, port);
- }
- }
- return null;
- }
-
- /**
- * parse InetSocketAddress
- *
- * @param channel
- * @return
- */
- public static InetSocketAddress parseInetSocketAddress(Channel channel) {
- InetSocketAddress destAddr = null;
- if (channel.remoteAddress() instanceof InetSocketAddress) {
- destAddr = (InetSocketAddress) channel.remoteAddress();
- } else {
- String sendIp = channel.remoteAddress().toString();
- destAddr = new InetSocketAddress(sendIp, 0);
- }
- return destAddr;
- }
-
- /**
- * hashCode
- */
- @Override
- public int hashCode() {
- int result = ip.hashCode();
- result = 31 * result + port;
- return result;
- }
-
- /**
- * equals
- */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- if (!(o instanceof IpPort)) {
- return false;
- }
-
- try {
- IpPort ctp = (IpPort) o;
- if (ip != null && ip.equals(ctp.port) && port == ctp.port) {
- return true;
- }
- } catch (Exception e) {
- return false;
- }
- return false;
- }
-
- /**
- * toString
- */
- public String toString() {
- return key;
- }
-}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
index 69dcaf8f40..99632002ea 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
@@ -17,19 +17,21 @@
package org.apache.inlong.audit.util;
+import java.net.InetSocketAddress;
+
public class SenderResult {
- public final IpPort ipPort;
+ public final InetSocketAddress addr;
public boolean result;
/**
* Constructor
*
- * @param ipPort
+ * @param addr
* @param result
*/
- public SenderResult(IpPort ipPort, boolean result) {
- this.ipPort = ipPort;
+ public SenderResult(InetSocketAddress addr, boolean result) {
+ this.addr = addr;
this.result = result;
}
@@ -41,7 +43,7 @@ public class SenderResult {
* @param result
*/
public SenderResult(String sendIp, int sendPort, boolean result) {
- this.ipPort = new IpPort(sendIp, sendPort);
+ this.addr = new InetSocketAddress(sendIp, sendPort);
this.result = result;
}
}
diff --git
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
index e10846e57b..dcc98fda33 100644
---
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
+++
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
@@ -28,7 +28,7 @@ public class AuditDataTest {
@Test
public void increaseResendTimes() {
- AuditData test = new AuditData(null);
+ AuditData test = new AuditData(null, null);
int resendTimes = test.increaseResendTimes();
assertEquals(1, resendTimes);
resendTimes = test.increaseResendTimes();
@@ -56,7 +56,7 @@ public class AuditDataTest {
AuditApi.AuditRequest request =
AuditApi.AuditRequest.newBuilder().setMsgHeader(headerBuilder.build())
.addMsgBody(bodyBuilder.build()).build();
AuditApi.BaseCommand baseCommand =
AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
- AuditData test = new AuditData(baseCommand);
+ AuditData test = new AuditData(baseCommand, request);
byte[] data = test.getDataByte();
assertTrue(data.length > 0);
}
diff --git
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
deleted file mode 100644
index 65d2bcb3f6..0000000000
---
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class IpPortTest {
-
- private IpPort test = new IpPort("127.0.0.1", 80);
-
- @Test
- public void getIpPortKey() {
- String ipPortKey = test.getIpPortKey("127.0.0.1", 80);
- assertTrue(ipPortKey.equals("127.0.0.1:80"));
- }
-
- @Test
- public void testHashCode() {
- int hashCode = test.hashCode();
- assertTrue(hashCode != 0);
- }
-
- @Test
- public void testEquals() {
- IpPort test1 = new IpPort("127.0.0.1", 81);
- boolean ret = test.equals(test1);
- assertFalse(ret);
-
- IpPort test2 = new IpPort("127.0.0.1", 80);
- ret = test.toString().equals(test2.toString());
- assertTrue(ret);
-
- IpPort test3 = test;
- ret = test.equals(test3);
- assertTrue(ret);
- }
-}
\ No newline at end of file