This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 583e658b20 [INLONG-11675][SDK] Optimize IpUtils class related
implementation (#11676)
583e658b20 is described below
commit 583e658b20f4fce04819ff180d9802c1a1b24a3a
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Jan 16 09:49:47 2025 +0800
[INLONG-11675][SDK] Optimize IpUtils class related implementation (#11676)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 7 +-
.../sdk/dataproxy/codec/ProtocolEncoder.java | 7 +-
.../sdk/dataproxy/config/ProxyConfigManager.java | 4 +-
.../sdk/dataproxy/example/HttpClientExample.java | 7 +-
.../sdk/dataproxy/example/TcpClientExample.java | 8 +-
.../inlong/sdk/dataproxy/network/ClientMgr.java | 3 +-
.../inlong/sdk/dataproxy/network/IpUtils.java | 139 ---------------------
.../inlong/sdk/dataproxy/network/Sender.java | 5 +-
.../inlong/sdk/dataproxy/network/SequentialID.java | 4 +-
.../sdk/dataproxy/threads/MetricWorkerThread.java | 4 +-
.../inlong/sdk/dataproxy/utils/AuthzUtils.java | 51 ++++++++
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 24 ++++
.../{UtilsTest.java => ProxyUtilsTest.java} | 6 +-
.../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 3 +-
14 files changed, 102 insertions(+), 170 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index fedf925aa1..efb5ac940b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
-import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
@@ -98,11 +97,8 @@ public class ProxyClientConfig {
private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT;
/* pay attention to the last url parameter ip */
- public ProxyClientConfig(String localHost, boolean visitManagerByHttp,
String managerIp,
+ public ProxyClientConfig(boolean visitManagerByHttp, String managerIp,
int managerPort, String inlongGroupId, String authSecretId, String
authSecretKey) throws ProxySdkException {
- if (StringUtils.isBlank(localHost)) {
- throw new ProxySdkException("localHost is blank!");
- }
if (StringUtils.isBlank(managerIp)) {
throw new ProxySdkException("managerIp is Blank!");
}
@@ -116,7 +112,6 @@ public class ProxyClientConfig {
this.visitManagerByHttp = visitManagerByHttp;
this.managerPort = managerPort;
this.managerIP = managerIp;
- IpUtils.validLocalIp(localHost);
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
this.proxyHttpUpdateIntervalMinutes =
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index 1a20766e55..bf1cc99502 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -20,9 +20,10 @@ package org.apache.inlong.sdk.dataproxy.codec;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
-import org.apache.inlong.sdk.dataproxy.network.IpUtils;
+import org.apache.inlong.sdk.dataproxy.utils.AuthzUtils;
import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -84,8 +85,8 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
}
long timestamp = System.currentTimeMillis();
int nonce = new
SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
- endAttr = endAttr + "_userName=" + object.getUserName() +
"&_clientIP=" + IpUtils.getLocalIp()
- + "&_signature=" +
IpUtils.generateSignature(object.getUserName(),
+ endAttr = endAttr + "_userName=" + object.getUserName() +
"&_clientIP=" + ProxyUtils.getLocalIp()
+ + "&_signature=" +
AuthzUtils.generateSignature(object.getUserName(),
timestamp, nonce, object.getSecretKey())
+ "&_timeStamp=" + timestamp + "&_nonce=" + nonce;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 8ff1dc0ea9..27cc4ee8ef 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -23,8 +23,8 @@ import org.apache.inlong.common.util.BasicAuth;
import org.apache.inlong.sdk.dataproxy.ConfigConstants;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
-import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import com.google.gson.Gson;
@@ -850,7 +850,7 @@ public class ProxyConfigManager extends Thread {
private List<BasicNameValuePair> buildProxyNodeQueryParams() {
ArrayList<BasicNameValuePair> params = new ArrayList<>();
- params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
+ params.add(new BasicNameValuePair("ip", ProxyUtils.getLocalIp()));
params.add(new BasicNameValuePair("protocolType",
clientConfig.getProtocolType()));
return params;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index c22c3aed98..d370623f8d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -34,24 +34,23 @@ public class HttpClientExample {
String configBasePath = "";
String inLongManagerAddr = "127.0.0.1";
String inLongManagerPort = "8083";
- String localIP = "127.0.0.1";
String messageBody = "inlong message body!";
- HttpProxySender sender = getMessageSender(localIP, inLongManagerAddr,
+ HttpProxySender sender = getMessageSender(inLongManagerAddr,
inLongManagerPort, inlongGroupId, true, false,
configBasePath);
sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody);
sender.close(); // close the sender
}
- public static HttpProxySender getMessageSender(String localIP, String
inLongManagerAddr,
+ public static HttpProxySender getMessageSender(String inLongManagerAddr,
String inLongManagerPort, String inlongGroupId,
boolean requestByHttp, boolean isReadProxyIPFromLocal,
String configBasePath) {
ProxyClientConfig proxyConfig = null;
HttpProxySender sender = null;
try {
- proxyConfig = new ProxyClientConfig(localIP, requestByHttp,
inLongManagerAddr,
+ proxyConfig = new ProxyClientConfig(requestByHttp,
inLongManagerAddr,
Integer.valueOf(inLongManagerPort),
inlongGroupId, "admin", "inlong");// user and password of
manager
proxyConfig.setConfigStoreBasePath(configBasePath);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index 85012af172..a85086ac38 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -32,8 +32,6 @@ public class TcpClientExample {
private static final Logger logger =
LoggerFactory.getLogger(TcpClientExample.class);
- public static String localIP = "127.0.0.1";
-
/**
* Example of client tcp.
*/
@@ -54,20 +52,20 @@ public class TcpClientExample {
TcpClientExample tcpClientExample = new TcpClientExample();
DefaultMessageSender sender = tcpClientExample
- .getMessageSender(localIP, inLongManagerAddr,
inLongManagerPort,
+ .getMessageSender(inLongManagerAddr, inLongManagerPort,
inlongGroupId, true, false, configBasePath, msgType);
tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId,
messageBody, System.currentTimeMillis());
sender.close(); // close the sender
}
- public DefaultMessageSender getMessageSender(String localIP, String
inLongManagerAddr, String inLongManagerPort,
+ public DefaultMessageSender getMessageSender(String inLongManagerAddr,
String inLongManagerPort,
String inlongGroupId, boolean requestByHttp, boolean
isReadProxyIPFromLocal,
String configBasePath, int msgType) {
ProxyClientConfig dataProxyConfig = null;
DefaultMessageSender messageSender = null;
try {
- dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp,
inLongManagerAddr,
+ dataProxyConfig = new ProxyClientConfig(requestByHttp,
inLongManagerAddr,
Integer.valueOf(inLongManagerPort), inlongGroupId,
"admin", "inlong");
if (StringUtils.isNotEmpty(configBasePath)) {
dataProxyConfig.setConfigStoreBasePath(configBasePath);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index 27003f6411..608fcf67d4 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import io.netty.bootstrap.Bootstrap;
@@ -55,7 +56,7 @@ public class ClientMgr {
private static final LogCounter logCounter = new LogCounter(10, 100000, 60
* 1000L);
private static final LogCounter updConExptCnt = new LogCounter(10, 100000,
60 * 1000L);
private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
- private static final byte[] hbMsgBody =
IpUtils.getLocalIp().getBytes(StandardCharsets.UTF_8);
+ private static final byte[] hbMsgBody =
ProxyUtils.getLocalIp().getBytes(StandardCharsets.UTF_8);
private final Sender sender;
private final ProxyClientConfig configure;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
deleted file mode 100644
index 90a0716772..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.network;
-
-import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.codec.digest.HmacUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.URLEncoder;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-public class IpUtils {
-
- private static final Logger logger =
LoggerFactory.getLogger(IpUtils.class);
- private static String userIp;
-
- static {
- userIp = getLocalIp();
- }
-
- public static String getLocalIp() {
- if (userIp != null) {
- return userIp;
- }
- String ip = "127.0.0.1";
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
- ip = socket.getLocalAddress().getHostAddress();
- } catch (Exception ignored) {
- logger.warn("getLocalIp ", ignored);
- }
- userIp = ip;
- return ip;
- }
-
- public static boolean validLocalIp(String currLocalHost) throws
ProxySdkException {
- String ip = "127.0.0.1";
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
- ip = socket.getLocalAddress().getHostAddress();
- } catch (Exception ex) {
- logger.error("error while get local ip", ex);
- }
- if (!ip.equals(currLocalHost)) {
- logger.warn("ip is not equal {} {}", currLocalHost, ip);
- }
- userIp = ip;
- return true;
- }
-
- public static byte[] toBytes(String ipAddr) {
- byte[] ret = new byte[4];
- try {
- String[] ipArr = ipAddr.split("\\.");
- ret[0] = (byte) (Integer.parseInt(ipArr[0]) & 0xFF);
- ret[1] = (byte) (Integer.parseInt(ipArr[1]) & 0xFF);
- ret[2] = (byte) (Integer.parseInt(ipArr[2]) & 0xFF);
- ret[3] = (byte) (Integer.parseInt(ipArr[3]) & 0xFF);
- return ret;
- } catch (Exception e) {
- throw new IllegalArgumentException(ipAddr + " is invalid IP");
- }
- }
-
- public static int bytesToInt(byte[] bytes) {
- int addr = bytes[3] & 0xFF;
- addr |= ((bytes[2] << 8) & 0xFF00);
- addr |= ((bytes[1] << 16) & 0xFF0000);
- addr |= ((bytes[0] << 24) & 0xFF000000);
- return addr;
- }
-
- public static String convertListToString(List<String> list, Character ch) {
- if (list == null || list.isEmpty()) {
- return "";
- }
- StringBuilder sb = new StringBuilder();
- Iterator itr = list.iterator();
- sb.append(itr.next());
- while (itr.hasNext()) {
- sb.append(ch).append(itr.next());
- }
- return sb.toString();
- }
-
- public static String generateSignature(String secureId, long timestamp,
int randomValue, String secureKey) {
- Base64 base64 = new Base64();
- byte[] baseStr = base64.encode(HmacUtils.hmacSha1(secureKey, secureId
+ timestamp + randomValue));
- String result = "";
- try {
- result = URLEncoder.encode(new String(baseStr), "UTF-8");
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- public static String getAuthorizenInfo(final String secretId, final String
secretKey, long timestamp, int nonce) {
- String signature = generateSignature(secretId, timestamp, nonce,
secretKey);
- return "manager " + secretId + " " + timestamp + " " + nonce + " " +
signature;
- }
-
- public static String convertSetToString(Set<String> list, Character ch) {
- if (list == null || list.isEmpty()) {
- return "";
- }
- StringBuilder sb = new StringBuilder();
- Iterator itr = list.iterator();
- sb.append(itr.next());
- while (itr.hasNext()) {
- sb.append(ch).append(itr.next());
- }
- return sb.toString();
- }
-
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 088ecb3d60..4ba988b19f 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import io.netty.channel.Channel;
@@ -167,7 +168,7 @@ public class Sender {
}
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
- encodeObject.getStreamId(), IpUtils.getLocalIp(),
encodeObject.getDt(),
+ encodeObject.getStreamId(), ProxyUtils.getLocalIp(),
encodeObject.getDt(),
encodeObject.getPackageTime(), encodeObject.getRealCnt());
}
SendResult message;
@@ -320,7 +321,7 @@ public class Sender {
}
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
- encodeObject.getStreamId(), IpUtils.getLocalIp(),
encodeObject.getPackageTime(),
+ encodeObject.getStreamId(), ProxyUtils.getLocalIp(),
encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());
}
// send message package time
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
index 0ad2813448..f4a396b940 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.dataproxy.network;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,7 +26,7 @@ public class SequentialID {
private static final SecureRandom sRandom = new SecureRandom(
Long.toString(System.nanoTime()).getBytes());
- private final String ip = IpUtils.getLocalIp();
+ private final String ip = ProxyUtils.getLocalIp();
private final AtomicInteger id = new AtomicInteger(sRandom.nextInt());
public SequentialID() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 4d5459b8e1..a974adc803 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -24,9 +24,9 @@ import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.metric.MessageRecord;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
-import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -207,7 +207,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
EncodeObject encodeObject = new
EncodeObject(Collections.singletonList(line.getBytes()), 7,
false, false, false,
dtTime, idGenerator.getNextInt(),
- metricConfig.getMetricGroupId(), streamId, "", "",
IpUtils.getLocalIp());
+ metricConfig.getMetricGroupId(), streamId, "", "",
ProxyUtils.getLocalIp());
MetricSendCallBack callBack = new MetricSendCallBack(encodeObject);
tryToSendMetricToManager(encodeObject, callBack);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/AuthzUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/AuthzUtils.java
new file mode 100644
index 0000000000..a5ed8d5fbe
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/AuthzUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.digest.HmacUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Authenticate Utils class
+ *
+ * Used to place public processing functions related to authentication and
signature
+ */
+public class AuthzUtils {
+
+ private static final Logger logger =
LoggerFactory.getLogger(AuthzUtils.class);
+ private static final LogCounter exptCounter = new LogCounter(10, 200000,
60 * 1000L);
+
+ public static String generateSignature(String secureId, long timestamp,
int randomValue, String secureKey) {
+ Base64 base64 = new Base64();
+ byte[] baseStr = base64.encode(HmacUtils.hmacSha1(secureKey, secureId
+ timestamp + randomValue));
+ String result = "";
+ try {
+ result = URLEncoder.encode(new String(baseStr),
StandardCharsets.UTF_8.toString());
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("Generate signature throw exception", ex);
+ }
+ }
+ return result;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index b7bd42ab2b..6da97d4750 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -34,16 +36,38 @@ import java.util.Set;
public class ProxyUtils {
private static final Logger logger =
LoggerFactory.getLogger(ProxyUtils.class);
+ private static final LogCounter exceptCounter = new LogCounter(10, 200000,
60 * 1000L);
+
private static final int TIME_LENGTH = 13;
private static final Set<String> invalidAttr = new HashSet<>();
+ private static String localHost;
+
static {
+ localHost = getLocalIp();
Collections.addAll(invalidAttr, "groupId", "streamId", "dt",
"msgUUID", "cp",
"cnt", "mt", "m", "sid", "t", "NodeIP", "messageId",
"_file_status_check", "_secretId",
"_signature", "_timeStamp", "_nonce", "_userName",
"_clientIP", "_encyVersion", "_encyAesKey",
"proxySend", "errMsg", "errCode",
AttributeConstants.MSG_RPT_TIME);
}
+ public static String getLocalIp() {
+ if (localHost != null) {
+ return localHost;
+ }
+ String ip = "127.0.0.1";
+ try (DatagramSocket socket = new DatagramSocket()) {
+ socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
+ ip = socket.getLocalAddress().getHostAddress();
+ } catch (Throwable ex) {
+ if (exceptCounter.shouldPrint()) {
+ logger.error("DataProxy-SDK get local IP failure", ex);
+ }
+ }
+ localHost = ip;
+ return ip;
+ }
+
public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
if (attrsMap == null || attrsMap.size() == 0) {
return false;
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
similarity index 88%
rename from
inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java
rename to
inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
index b1a0ccd7bb..5755aac08d 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
@@ -17,16 +17,16 @@
package org.apache.inlong.sdk.dataproxy;
-import org.apache.inlong.sdk.dataproxy.network.IpUtils;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.junit.Assert;
import org.junit.Test;
-public class UtilsTest {
+public class ProxyUtilsTest {
@Test
public void getLocalIp() {
- String ip = IpUtils.getLocalIp();
+ String ip = ProxyUtils.getLocalIp();
Assert.assertNotNull(ip);
}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 1965ef37e3..b029392a49 100644
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -27,7 +27,6 @@ import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import java.net.InetAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -60,7 +59,7 @@ public class InlongSdkDirtySender {
Preconditions.checkNotNull(authKey, "authKey cannot be null");
ProxyClientConfig proxyClientConfig =
- new
ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
+ new ProxyClientConfig(true,
inlongManagerAddr, inlongManagerPort, inlongGroupId,
authId, authKey);
proxyClientConfig.setOnlyUseLocalProxyConfig(false);
proxyClientConfig.setAsyncCallbackSize(maxCallbackSize);