This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 853419d08f [INLONG-11565][SDK] Optimize the implementation of the
Utils.java class (#11566)
853419d08f is described below
commit 853419d08f8a48305cf034dbf61e992b07a471d2
Author: Goson Zhang <[email protected]>
AuthorDate: Sun Dec 1 15:39:33 2024 +0800
[INLONG-11565][SDK] Optimize the implementation of the Utils.java class
(#11566)
---
.../pojo/dataproxy/DataProxyNodeResponse.java | 1 -
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 8 ++----
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 22 +++++++--------
.../sdk/dataproxy/codec/ProtocolEncoder.java | 33 +++++++++++-----------
.../sdk/dataproxy/config/ProxyConfigManager.java | 28 +++++++++---------
.../sdk/dataproxy/example/UdpClientExample.java | 12 ++++----
.../sdk/dataproxy/http/InternalHttpSender.java | 3 +-
.../inlong/sdk/dataproxy/network/ClientMgr.java | 2 +-
.../sdk/dataproxy/network/HttpProxySender.java | 3 +-
.../dataproxy/network/{Utils.java => IpUtils.java} | 21 ++------------
.../inlong/sdk/dataproxy/network/Sender.java | 4 +--
.../inlong/sdk/dataproxy/network/SequentialID.java | 5 ++--
.../sdk/dataproxy/pb/context/SinkContext.java | 2 +-
.../dataproxy/{utils => pb/network}/IpUtils.java | 2 +-
.../sdk/dataproxy/threads/MetricWorkerThread.java | 6 ++--
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 20 +++++++------
.../sdk/dataproxy/utils/ServiceDiscoveryUtils.java | 6 ++--
.../sdk/dataproxy/ProxyConfigManagerTest.java | 3 +-
.../org/apache/inlong/sdk/dataproxy/UtilsTest.java | 4 +--
19 files changed, 81 insertions(+), 104 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java
index 0d7237fdfe..a48219a930 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeResponse.java
@@ -30,7 +30,6 @@ public class DataProxyNodeResponse {
/**
* DataProxy cluster id
*/
- @Deprecated
private Integer clusterId;
private String reportSourceType;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 153c43b8db..6fa6f6ff9a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -28,7 +28,6 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
@@ -54,7 +53,7 @@ public class DefaultMessageSender implements MessageSender {
new ConcurrentHashMap<>();
private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new
AtomicBoolean(false);
private static ManagerFetcherThread managerFetcherThread;
- private static final SequentialID idGenerator = new
SequentialID(Utils.getLocalIp());
+ private static final SequentialID idGenerator = new SequentialID();
private final Sender sender;
private final IndexCollectThread indexCol;
/* Store index <groupId_streamId,cnt> */
@@ -65,7 +64,7 @@ public class DefaultMessageSender implements MessageSender {
private boolean isGroupIdTransfer = false;
private boolean isReport = false;
private boolean isSupportLF = false;
- private int maxPacketLength;
+ private int maxPacketLength = -1;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private final int senderMaxAttempt;
@@ -110,8 +109,7 @@ public class DefaultMessageSender implements MessageSender {
}
LOGGER.info("Initial tcp sender, configure is {}", configure);
// initial sender object
- ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(configure,
- Utils.getLocalIp(), null);
+ ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(configure, null);
proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 3338d866c2..e7a0f6a3e6 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
+import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
@@ -98,13 +98,13 @@ public class ProxyClientConfig {
public ProxyClientConfig(String localHost, boolean requestByHttp, String
managerIp,
int managerPort, String inlongGroupId, String authSecretId, String
authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws
ProxysdkException {
- if (Utils.isBlank(localHost)) {
+ if (StringUtils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
}
- if (Utils.isBlank(managerIp)) {
+ if (StringUtils.isBlank(managerIp)) {
throw new IllegalArgumentException("managerIp is Blank!");
}
- if (Utils.isBlank(inlongGroupId)) {
+ if (StringUtils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
this.inlongGroupId = inlongGroupId;
@@ -114,7 +114,7 @@ public class ProxyClientConfig {
this.managerAddress = getManagerAddress(managerIp, managerPort,
requestByHttp);
this.managerUrl =
getManagerUrl(managerAddress, inlongGroupId);
- Utils.validLocalIp(localHost);
+ IpUtils.validLocalIp(localHost);
this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
@@ -131,11 +131,11 @@ public class ProxyClientConfig {
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String managerAddress, String inlongGroupId,
String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws
ProxysdkException {
- if (Utils.isBlank(managerAddress) ||
(!managerAddress.startsWith(ConfigConstants.HTTP)
+ if (StringUtils.isBlank(managerAddress) ||
(!managerAddress.startsWith(ConfigConstants.HTTP)
&& !managerAddress.startsWith(ConfigConstants.HTTPS))) {
throw new ProxysdkException("managerAddress is blank or missing
http/https protocol ");
}
- if (Utils.isBlank(inlongGroupId)) {
+ if (StringUtils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
@@ -348,10 +348,10 @@ public class ProxyClientConfig {
this.needAuthentication = needAuthentication;
this.isNeedDataEncry = needDataEncry;
if (this.needAuthentication || this.isNeedDataEncry) {
- if (Utils.isBlank(userName)) {
+ if (StringUtils.isBlank(userName)) {
throw new IllegalArgumentException("userName is Blank!");
}
- if (Utils.isBlank(secretKey)) {
+ if (StringUtils.isBlank(secretKey)) {
throw new IllegalArgumentException("secretKey is Blank!");
}
}
@@ -360,10 +360,10 @@ public class ProxyClientConfig {
}
public void setHttpsInfo(String tlsServerCertFilePathAndName, String
tlsServerKey) {
- if (Utils.isBlank(tlsServerCertFilePathAndName)) {
+ if (StringUtils.isBlank(tlsServerCertFilePathAndName)) {
throw new IllegalArgumentException("tlsServerCertFilePathAndName
is Blank!");
}
- if (Utils.isBlank(tlsServerKey)) {
+ if (StringUtils.isBlank(tlsServerKey)) {
throw new IllegalArgumentException("tlsServerKey is Blank!");
}
this.tlsServerKey = tlsServerKey;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index ecc1e1de91..c3d463987b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.dataproxy.codec;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
+import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
@@ -78,18 +79,18 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
try {
String endAttr = object.getCommonattr();
if (object.isAuth()) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
long timestamp = System.currentTimeMillis();
int nonce = new
SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
- endAttr = endAttr + "_userName=" + object.getUserName() +
"&_clientIP=" + Utils.getLocalIp()
- + "&_signature=" +
Utils.generateSignature(object.getUserName(),
+ endAttr = endAttr + "_userName=" + object.getUserName() +
"&_clientIP=" + IpUtils.getLocalIp()
+ + "&_signature=" +
IpUtils.generateSignature(object.getUserName(),
timestamp, nonce, object.getSecretKey())
+ "&_timeStamp=" + timestamp + "&_nonce=" + nonce;
}
- if (Utils.isNotBlank(object.getMsgUUID())) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(object.getMsgUUID())) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = endAttr + "msgUUID=" + object.getMsgUUID();
@@ -129,7 +130,7 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
if (object.isEncrypt()) {
EncryptConfigEntry encryptEntry = object.getEncryptEntry();
if (encryptEntry != null) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
@@ -140,13 +141,13 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
}
}
if (!object.isGroupIdTransfer()) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = (endAttr + "groupId=" + object.getGroupId() +
"&streamId=" + object.getStreamId());
}
- if (Utils.isNotBlank(object.getMsgUUID())) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(object.getMsgUUID())) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = endAttr + "msgUUID=" + object.getMsgUUID();
@@ -258,7 +259,7 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
if (object.isEncrypt()) {
EncryptConfigEntry encryptEntry = object.getEncryptEntry();
if (encryptEntry != null) {
- if (Utils.isNotBlank(msgAttrs)) {
+ if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
EncryptInfo encryptInfo =
encryptEntry.getRsaEncryptInfo();
@@ -268,8 +269,8 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
body = EncryptUtil.aesEncrypt(body,
encryptInfo.getAesKey());
}
}
- if (Utils.isNotBlank(object.getMsgUUID())) {
- if (Utils.isNotBlank(msgAttrs)) {
+ if (StringUtils.isNotBlank(object.getMsgUUID())) {
+ if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
msgAttrs = msgAttrs + "msgUUID=" + object.getMsgUUID();
@@ -322,7 +323,7 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
if (object.isEncrypt()) {
EncryptConfigEntry encryptEntry = object.getEncryptEntry();
if (encryptEntry != null) {
- if (Utils.isNotBlank(msgAttrs)) {
+ if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
EncryptInfo encryptInfo =
encryptEntry.getRsaEncryptInfo();
@@ -332,8 +333,8 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
body = EncryptUtil.aesEncrypt(body,
encryptInfo.getAesKey());
}
}
- if (Utils.isNotBlank(object.getMsgUUID())) {
- if (Utils.isNotBlank(msgAttrs)) {
+ if (StringUtils.isNotBlank(object.getMsgUUID())) {
+ if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
msgAttrs = msgAttrs + "msgUUID=" + object.getMsgUUID();
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 13ef45479b..b10e533037 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -25,7 +25,7 @@ import org.apache.inlong.sdk.dataproxy.LoadBalance;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.network.HashRing;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
+import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
@@ -93,7 +93,6 @@ public class ProxyConfigManager extends Thread {
public static final String APPLICATION_JSON = "application/json";
private static final Logger LOGGER =
LoggerFactory.getLogger(ProxyConfigManager.class);
private final ProxyClientConfig clientConfig;
- private final String localIP;
private final ClientMgr clientManager;
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private final JsonParser jsonParser = new JsonParser();
@@ -108,9 +107,8 @@ public class ProxyConfigManager extends Thread {
private long doworkTime = 0;
private EncryptConfigEntry userEncryConfigEntry;
- public ProxyConfigManager(final ProxyClientConfig configure, final String
localIP, final ClientMgr clientManager) {
+ public ProxyConfigManager(final ProxyClientConfig configure, final
ClientMgr clientManager) {
this.clientConfig = configure;
- this.localIP = localIP;
this.clientManager = clientManager;
this.hashRing.setVirtualNode(configure.getVirtualNode());
}
@@ -358,7 +356,7 @@ public class ProxyConfigManager extends Thread {
}
public EncryptConfigEntry getEncryptConfigEntry(final String userName) {
- if (Utils.isBlank(userName)) {
+ if (StringUtils.isBlank(userName)) {
return null;
}
EncryptConfigEntry encryptEntry = this.userEncryConfigEntry;
@@ -397,7 +395,7 @@ public class ProxyConfigManager extends Thread {
}
private void updateEncryptConfigEntry() {
- if (Utils.isBlank(this.clientConfig.getUserName())) {
+ if (StringUtils.isBlank(this.clientConfig.getUserName())) {
return;
}
int retryCount = 0;
@@ -422,7 +420,7 @@ public class ProxyConfigManager extends Thread {
}
private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
- if (Utils.isBlank(userName)) {
+ if (StringUtils.isBlank(userName)) {
LOGGER.warn(" userName(" + userName + ") is not available");
return null;
}
@@ -509,7 +507,7 @@ public class ProxyConfigManager extends Thread {
}
private EncryptConfigEntry requestPubKey(String pubKeyUrl, String
userName, boolean needGet) {
- if (Utils.isBlank(userName)) {
+ if (StringUtils.isBlank(userName)) {
LOGGER.error("Queried userName is null!");
return null;
}
@@ -517,7 +515,7 @@ public class ProxyConfigManager extends Thread {
params.add(new BasicNameValuePair("operation", "query"));
params.add(new BasicNameValuePair("username", userName));
String returnStr = requestConfiguration(pubKeyUrl, params);
- if (Utils.isBlank(returnStr)) {
+ if (StringUtils.isBlank(returnStr)) {
LOGGER.info("No public key information returned from manager");
return null;
}
@@ -543,15 +541,15 @@ public class ProxyConfigManager extends Thread {
JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
if (resultData != null) {
String publicKey = resultData.get("publicKey").getAsString();
- if (Utils.isBlank(publicKey)) {
+ if (StringUtils.isBlank(publicKey)) {
return null;
}
String username = resultData.get("username").getAsString();
- if (Utils.isBlank(username)) {
+ if (StringUtils.isBlank(username)) {
return null;
}
String versionStr = resultData.get("version").getAsString();
- if (Utils.isBlank(versionStr)) {
+ if (StringUtils.isBlank(versionStr)) {
return null;
}
return new EncryptConfigEntry(username, versionStr, publicKey);
@@ -591,7 +589,7 @@ public class ProxyConfigManager extends Thread {
public ProxyConfigEntry requestProxyList(String url) {
ArrayList<BasicNameValuePair> params = new
ArrayList<BasicNameValuePair>();
- params.add(new BasicNameValuePair("ip", this.localIP));
+ params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
params.add(new BasicNameValuePair("protocolType",
clientConfig.getProtocolType()));
LOGGER.info("Begin to get configure from manager {}, param is {}",
url, params);
@@ -700,7 +698,7 @@ public class ProxyConfigManager extends Thread {
/* Request new configurations from Manager. */
private String requestConfiguration(String url, List<BasicNameValuePair>
params) {
- if (Utils.isBlank(url)) {
+ if (StringUtils.isBlank(url)) {
LOGGER.error("request url is null");
return null;
}
@@ -746,7 +744,7 @@ public class ProxyConfigManager extends Thread {
httpPost.setEntity(urlEncodedFormEntity);
HttpResponse response = httpClient.execute(httpPost);
returnStr = EntityUtils.toString(response.getEntity());
- if (Utils.isNotBlank(returnStr)
+ if (StringUtils.isNotBlank(returnStr)
&& response.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
LOGGER.info("Get configure from manager is " + returnStr);
return returnStr;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
index 863f197353..5a564b96cb 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
import io.netty.bootstrap.Bootstrap;
@@ -34,6 +33,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
@@ -55,7 +55,7 @@ public class UdpClientExample {
private static final Logger logger =
LoggerFactory.getLogger(UdpClientExample.class);
- private static SequentialID idGenerator = new
SequentialID(Utils.getLocalIp());
+ private static SequentialID idGenerator = new SequentialID();
private static SecureRandom random = new SecureRandom();
@@ -180,7 +180,7 @@ public class UdpClientExample {
if (object.isEncrypt()) {
EncryptConfigEntry encryptEntry = object.getEncryptEntry();
if (encryptEntry != null) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
EncryptInfo encryptInfo =
encryptEntry.getRsaEncryptInfo();
@@ -191,14 +191,14 @@ public class UdpClientExample {
}
}
if (!object.isGroupIdTransfer()) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = (endAttr + "groupId=" + object.getGroupId() +
"&streamId="
+ object.getStreamId());
}
- if (Utils.isNotBlank(object.getMsgUUID())) {
- if (Utils.isNotBlank(endAttr)) {
+ if (StringUtils.isNotBlank(object.getMsgUUID())) {
+ if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = endAttr + "msgUUID=" + object.getMsgUUID();
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
index 62f1e2289d..7c2738e37d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
@@ -22,7 +22,6 @@ import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
import com.google.gson.JsonElement;
@@ -179,7 +178,7 @@ public class InternalHttpSender {
String returnStr = EntityUtils.toString(response.getEntity());
int returnCode = response.getStatusLine().getStatusCode();
- if (Utils.isBlank(returnStr) || HttpStatus.SC_OK != returnCode) {
+ if (StringUtils.isBlank(returnStr) || HttpStatus.SC_OK !=
returnCode) {
throw new Exception("get config from manager failed, result: "
+ returnStr + ", code: " + returnCode);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index e1412a9936..45c95e042d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -115,7 +115,7 @@ public class ClientMgr {
bootstrap.option(ChannelOption.SO_SNDBUF,
ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
bootstrap.handler(new ClientPipelineFactory(this, sender));
/* ready to Start the thread which refreshes the proxy list. */
- ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(),
this);
+ ipManager = new ProxyConfigManager(configure, this);
ipManager.setName("proxyConfigManager");
if (configure.getInlongGroupId() != null) {
ipManager.setInlongGroupId(configure.getInlongGroupId());
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 4ef884ae8f..5d31f05f8a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -75,8 +75,7 @@ public class HttpProxySender extends Thread {
private void initTDMClientAndRequest(ProxyClientConfig configure) throws
Exception {
try {
- proxyConfigManager = new ProxyConfigManager(configure,
- Utils.getLocalIp(), null);
+ proxyConfigManager = new ProxyConfigManager(configure, null);
proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
hostList.addAll(proxyConfigEntry.getHostMap().values());
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Utils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
similarity index 91%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Utils.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
index 3e2fec9250..f3fbab04aa 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Utils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
@@ -30,9 +30,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
-public class Utils {
+public class IpUtils {
- private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+ private static final Logger logger =
LoggerFactory.getLogger(IpUtils.class);
private static String userIp;
static {
@@ -69,23 +69,6 @@ public class Utils {
return true;
}
- public static boolean isNotBlank(String str) {
- return !isBlank(str);
- }
-
- public static boolean isBlank(String str) {
- int strLen;
- if (str == null || (strLen = str.length()) == 0) {
- return true;
- }
- for (int i = 0; i < strLen; i++) {
- if ((!Character.isWhitespace(str.charAt(i)))) {
- return false;
- }
- }
- return true;
- }
-
public static byte[] toBytes(String ipAddr) {
byte[] ret = new byte[4];
try {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 01ac56a53b..9f001205e3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -224,7 +224,7 @@ public class Sender {
public SendResult syncSendMessage(EncodeObject encodeObject, String
msgUUID) {
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
- encodeObject.getStreamId(), Utils.getLocalIp(),
encodeObject.getDt(),
+ encodeObject.getStreamId(), IpUtils.getLocalIp(),
encodeObject.getDt(),
encodeObject.getPackageTime(), encodeObject.getRealCnt());
}
NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(),
encodeObject);
@@ -328,7 +328,7 @@ public class Sender {
long timeout, TimeUnit timeUnit) throws ProxysdkException {
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
- encodeObject.getStreamId(), Utils.getLocalIp(),
encodeObject.getPackageTime(),
+ encodeObject.getStreamId(), IpUtils.getLocalIp(),
encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());
}
// send message package time
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
index a577f0038c..0ad2813448 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
@@ -24,11 +24,10 @@ public class SequentialID {
private static final SecureRandom sRandom = new SecureRandom(
Long.toString(System.nanoTime()).getBytes());
- private final String ip;
+ private final String ip = IpUtils.getLocalIp();
private final AtomicInteger id = new AtomicInteger(sRandom.nextInt());
- public SequentialID(String theIp) {
- ip = theIp;
+ public SequentialID() {
}
public String getNextId() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/SinkContext.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/SinkContext.java
index 4c2cd285e8..366bf5ef54 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/SinkContext.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/SinkContext.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.dataproxy.pb.context;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.pb.metrics.SdkMetricItem;
import org.apache.inlong.sdk.dataproxy.pb.metrics.SdkMetricItemSet;
-import org.apache.inlong.sdk.dataproxy.utils.IpUtils;
+import org.apache.inlong.sdk.dataproxy.pb.network.IpUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/IpUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/IpUtils.java
similarity index 99%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/IpUtils.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/IpUtils.java
index 930f2ecc98..cea17bf9d5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/IpUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/IpUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy.utils;
+package org.apache.inlong.sdk.dataproxy.pb.network;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 8d79119dec..39f8562b52 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -24,9 +24,9 @@ import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.metric.MessageRecord;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
+import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
private static final String DEFAULT_KEY_SPLITTER = "#";
private final Logger logger =
LoggerFactory.getLogger(MetricWorkerThread.class);
- private final SequentialID idGenerator = new
SequentialID(Utils.getLocalIp());
+ private final SequentialID idGenerator = new SequentialID();
private final ConcurrentHashMap<String, MessageRecord> metricValueCache =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, MetricTimeNumSummary>
metricPackTimeMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, MetricTimeNumSummary> metricDtMap
= new ConcurrentHashMap<>();
@@ -208,7 +208,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
EncodeObject encodeObject = new
EncodeObject(Collections.singletonList(line.getBytes()), 7,
false, false, false,
dtTime, idGenerator.getNextInt(),
- metricConfig.getMetricGroupId(), streamId, "", "",
Utils.getLocalIp());
+ metricConfig.getMetricGroupId(), streamId, "", "",
IpUtils.getLocalIp());
MetricSendCallBack callBack = new MetricSendCallBack(encodeObject);
tryToSendMetricToManager(encodeObject, callBack);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index b3b455b9d3..371dcf661c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -20,8 +20,8 @@ package org.apache.inlong.sdk.dataproxy.utils;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.ConfigConstants;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +106,8 @@ public class ProxyUtils {
}
// Reserve space for attribute
if (body.length > maxLen - ConfigConstants.RESERVED_ATTRIBUTE_LENGTH) {
- logger.debug("body length is too long, max length is {}", maxLen);
+ logger.debug("body length({}) > max length({}) - fixed attribute
length({})",
+ body.length, maxLen,
ConfigConstants.RESERVED_ATTRIBUTE_LENGTH);
return false;
}
return true;
@@ -126,11 +127,12 @@ public class ProxyUtils {
int size = 0;
for (byte[] body : bodyList) {
size += body.length;
- // Reserve space for attribute
- if (size > maxLen - ConfigConstants.RESERVED_ATTRIBUTE_LENGTH) {
- logger.debug("body length is too long, max length is {}",
maxLen);
- return false;
- }
+ }
+ // Reserve space for attribute
+ if (size > maxLen - ConfigConstants.RESERVED_ATTRIBUTE_LENGTH) {
+ logger.debug("bodyList length({}) > max length({}) - fixed
attribute length({})",
+ size, maxLen, ConfigConstants.RESERVED_ATTRIBUTE_LENGTH);
+ return false;
}
return true;
}
@@ -149,10 +151,10 @@ public class ProxyUtils {
*/
public static void validClientConfig(ProxyClientConfig clientConfig) {
if (clientConfig.isNeedAuthentication()) {
- if (Utils.isBlank(clientConfig.getUserName())) {
+ if (StringUtils.isBlank(clientConfig.getUserName())) {
throw new IllegalArgumentException("Authentication require
userName not Blank!");
}
- if (Utils.isBlank(clientConfig.getSecretKey())) {
+ if (StringUtils.isBlank(clientConfig.getSecretKey())) {
throw new IllegalArgumentException("Authentication require
secretKey not Blank!");
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
index 8287608549..6352e25308 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sdk.dataproxy.utils;
import org.apache.inlong.common.util.BasicAuth;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
+import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@@ -187,13 +187,13 @@ public class ServiceDiscoveryUtils {
long timestamp = System.currentTimeMillis();
int nonce = new
SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
httpPost.setHeader(BasicAuth.BASIC_AUTH_HEADER,
-
Utils.getAuthorizenInfo(proxyClientConfig.getUserName(),
+
IpUtils.getAuthorizenInfo(proxyClientConfig.getUserName(),
proxyClientConfig.getSecretKey(), timestamp,
nonce));
}
httpPost.setEntity(new UrlEncodedFormEntity(params));
HttpResponse response = httpClient.execute(httpPost);
String returnStr = EntityUtils.toString(response.getEntity());
- if (Utils.isNotBlank(returnStr) &&
response.getStatusLine().getStatusCode() == 200) {
+ if (StringUtils.isNotBlank(returnStr) &&
response.getStatusLine().getStatusCode() == 200) {
log.info("Get configure from manager is " + returnStr);
JsonParser jsonParser = new JsonParser();
JsonObject jb = jsonParser.parse(returnStr).getAsJsonObject();
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
index 75b6230a97..5f3ba92c75 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -36,8 +36,7 @@ public class ProxyConfigManagerTest {
.toString();
private final ProxyClientConfig clientConfig =
PowerMockito.mock(ProxyClientConfig.class);
private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class);
- private final ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(clientConfig, "127.0.0.1",
- clientMgr);
+ private final ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(clientConfig, clientMgr);
public ProxyConfigManagerTest() throws URISyntaxException {
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java
index 434e3410df..b1a0ccd7bb 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sdk.dataproxy;
-import org.apache.inlong.sdk.dataproxy.network.Utils;
+import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -26,7 +26,7 @@ public class UtilsTest {
@Test
public void getLocalIp() {
- String ip = Utils.getLocalIp();
+ String ip = IpUtils.getLocalIp();
Assert.assertNotNull(ip);
}