This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new feb0ad5309 [INLONG-10463][SDK] Optimization of ultra-long field
processing in InlongSDK (#11119)
feb0ad5309 is described below
commit feb0ad5309db9f05060edad4efcde783d3c142a3
Author: Haotian Ma <[email protected]>
AuthorDate: Sat Oct 12 09:41:25 2024 +0800
[INLONG-10463][SDK] Optimization of ultra-long field processing in
InlongSDK (#11119)
---
.../inlong/sdk/dataproxy/ConfigConstants.java | 3 ++
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 46 ++++++++++++++++++++++
.../sdk/dataproxy/config/ProxyConfigEntry.java | 12 +++++-
.../sdk/dataproxy/config/ProxyConfigManager.java | 3 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 43 ++++++++++++++++++++
5 files changed, 105 insertions(+), 2 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 45b7b4056d..40d71d2859 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -76,4 +76,7 @@ public class ConfigConstants {
public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
+ /* Reserved attribute data size(bytes). */
+ public static int RESERVED_ATTRIBUTE_LENGTH = 10000;
+
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 8a833a69cb..ec3eff3bad 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -65,6 +65,7 @@ public class DefaultMessageSender implements MessageSender {
private boolean isGroupIdTransfer = false;
private boolean isReport = false;
private boolean isSupportLF = false;
+ private int maxPacketLength;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private final int senderMaxAttempt;
@@ -119,6 +120,7 @@ public class DefaultMessageSender implements MessageSender {
} else {
DefaultMessageSender tmpMessageSender =
new DefaultMessageSender(configure, selfDefineFactory);
+ tmpMessageSender.setMaxPacketLength(entry.getMaxPacketLength());
CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender);
return tmpMessageSender;
}
@@ -190,6 +192,14 @@ public class DefaultMessageSender implements MessageSender
{
this.groupId = groupId;
}
+ public int getMaxPacketLength() {
+ return maxPacketLength;
+ }
+
+ public void setMaxPacketLength(int maxPacketLength) {
+ this.maxPacketLength = maxPacketLength;
+ }
+
public String getSDKVersion() {
return ConfigConstants.PROXY_SDK_VERSION;
}
@@ -253,6 +263,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
+ if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+ return SendResult.BODY_EXCEED_MAX_LEN;
+ }
addIndexCnt(groupId, streamId, 1);
String proxySend = "";
@@ -322,6 +335,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES;
}
+ if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+ return SendResult.BODY_EXCEED_MAX_LEN;
+ }
addIndexCnt(groupId, streamId, 1);
if (isProxySend) {
@@ -382,6 +398,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
+ if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+ return SendResult.BODY_EXCEED_MAX_LEN;
+ }
addIndexCnt(groupId, streamId, bodyList.size());
String proxySend = "";
@@ -448,6 +467,9 @@ public class DefaultMessageSender implements MessageSender {
extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES;
}
+ if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+ return SendResult.BODY_EXCEED_MAX_LEN;
+ }
addIndexCnt(groupId, streamId, bodyList.size());
if (isProxySend) {
extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
@@ -514,6 +536,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
+ if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+ throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ }
addIndexCnt(groupId, streamId, 1);
String proxySend = "";
@@ -576,6 +601,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
+ if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+ throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ }
addIndexCnt(groupId, streamId, 1);
if (isProxySend) {
extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
@@ -630,6 +658,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
+ if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+ throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ }
addIndexCnt(groupId, streamId, bodyList.size());
String proxySend = "";
if (isProxySend) {
@@ -693,6 +724,9 @@ public class DefaultMessageSender implements MessageSender {
extraAttrMap)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
+ if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+ throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ }
addIndexCnt(groupId, streamId, bodyList.size());
if (isProxySend) {
extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
@@ -808,6 +842,9 @@ public class DefaultMessageSender implements MessageSender {
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
+ if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+ throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ }
addIndexCnt(groupId, streamId, bodyList.size());
StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
@@ -828,6 +865,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
+ if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+ throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ }
boolean isCompressEnd = false;
if (msgtype == 7 || msgtype == 8) {
sender.asyncSendMessageIndex(new EncodeObject(body, msgtype,
isCompressEnd,
@@ -858,6 +898,9 @@ public class DefaultMessageSender implements MessageSender {
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
+ if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+ return SendResult.BODY_EXCEED_MAX_LEN.toString();
+ }
addIndexCnt(groupId, streamId, bodyList.size());
StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
@@ -881,6 +924,9 @@ public class DefaultMessageSender implements MessageSender {
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
+ if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+ return SendResult.BODY_EXCEED_MAX_LEN.toString();
+ }
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, false,
isReport,
isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "",
messageKey, ip);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index 349617152c..441c62abe5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -28,6 +28,15 @@ public class ProxyConfigEntry implements
java.io.Serializable {
private int load;
private int switchStat;
private boolean isInterVisit;
+ private int maxPacketLength;
+
+ public int getMaxPacketLength() {
+ return maxPacketLength;
+ }
+
+ public void setMaxPacketLength(int maxPacketLength) {
+ this.maxPacketLength = maxPacketLength;
+ }
public int getLoad() {
return load;
@@ -81,7 +90,8 @@ public class ProxyConfigEntry implements java.io.Serializable
{
@Override
public String toString() {
return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ",
size=" + size + ", isInterVisit="
- + isInterVisit + ", groupId=" + groupId + ", switch=" +
switchStat + "]";
+ + isInterVisit + ", groupId=" + groupId + ", switch=" +
switchStat + ", maxPacketLength="
+ + maxPacketLength + "]";
}
public int getClusterId() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 25c93d6a60..13ef45479b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -632,7 +632,6 @@ public class ProxyConfigManager extends Thread {
if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
isSwitch = proxyCluster.getIsSwitch();
}
-
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
proxyEntry.setClusterId(clusterId);
proxyEntry.setGroupId(clientConfig.getInlongGroupId());
@@ -641,6 +640,8 @@ public class ProxyConfigManager extends Thread {
proxyEntry.setSwitchStat(isSwitch);
proxyEntry.setLoad(load);
proxyEntry.setSize(nodeList.size());
+ proxyEntry.setMaxPacketLength(
+ proxyCluster.getMaxPacketLength() != null ?
proxyCluster.getMaxPacketLength() : -1);
return proxyEntry;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 1626f0bbf9..b3b455b9d3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.dataproxy.utils;
import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.ConfigConstants;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.Utils;
@@ -92,6 +93,48 @@ public class ProxyUtils {
return true;
}
+ /**
+ * Check whether the body length is within the maximum limit; if the maximum
limit is less than 0, the check is skipped
+ * @param body the message body whose length (in bytes) is checked
+ * @param maxLen the maximum length in bytes; a negative value disables the check
+ * @return false if body.length exceeds maxLen minus ConfigConstants.RESERVED_ATTRIBUTE_LENGTH, true otherwise
+ */
+ public static boolean isBodyLengthValid(byte[] body, int maxLen) {
+ // A negative limit disables the check; note that 0 is still treated as a real limit
+ if (maxLen < 0) {
+ return true;
+ }
+ // Reserve space for attributes. NOTE(review): if maxLen is below the reserve, the threshold is negative and every body is rejected
+ if (body.length > maxLen - ConfigConstants.RESERVED_ATTRIBUTE_LENGTH) {
+ logger.debug("body length is too long, max length is {}", maxLen);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Check whether the total length of all bodies is within the maximum limit; if the
maximum limit is less than 0, the check is skipped
+ * @param bodyList the message bodies whose lengths (in bytes) are summed
+ * @param maxLen the maximum total length in bytes; a negative value disables the check
+ * @return false once the running total exceeds maxLen minus ConfigConstants.RESERVED_ATTRIBUTE_LENGTH, true otherwise
+ */
+ public static boolean isBodyLengthValid(List<byte[]> bodyList, int maxLen)
{
+ // A negative limit disables the check; note that 0 is still treated as a real limit
+ if (maxLen < 0) {
+ return true;
+ }
+ int size = 0;
+ for (byte[] body : bodyList) {
+ size += body.length;
+ // Reserve space for attributes; compare after each addition so the loop stops at the first body that crosses the threshold
+ if (size > maxLen - ConfigConstants.RESERVED_ATTRIBUTE_LENGTH) {
+ logger.debug("body length is too long, max length is {}",
maxLen);
+ return false;
+ }
+ }
+ return true;
+ }
+
public static long covertZeroDt(long dt) {
if (dt == 0) {
return System.currentTimeMillis();