This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new feb0ad5309 [INLONG-10463][SDK] Optimization of ultra-long field processing in InlongSDK (#11119)
feb0ad5309 is described below

commit feb0ad5309db9f05060edad4efcde783d3c142a3
Author: Haotian Ma <[email protected]>
AuthorDate: Sat Oct 12 09:41:25 2024 +0800

    [INLONG-10463][SDK] Optimization of ultra-long field processing in InlongSDK (#11119)
---
 .../inlong/sdk/dataproxy/ConfigConstants.java      |  3 ++
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 46 ++++++++++++++++++++++
 .../sdk/dataproxy/config/ProxyConfigEntry.java     | 12 +++++-
 .../sdk/dataproxy/config/ProxyConfigManager.java   |  3 +-
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     | 43 ++++++++++++++++++++
 5 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 45b7b4056d..40d71d2859 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -76,4 +76,7 @@ public class ConfigConstants {
 
     public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
 
+    /* Reserved attribute data size(bytes). */
+    public static int RESERVED_ATTRIBUTE_LENGTH = 10000;
+
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 8a833a69cb..ec3eff3bad 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -65,6 +65,7 @@ public class DefaultMessageSender implements MessageSender {
     private boolean isGroupIdTransfer = false;
     private boolean isReport = false;
     private boolean isSupportLF = false;
+    private int maxPacketLength;
     private int cpsSize = ConfigConstants.COMPRESS_SIZE;
     private final int senderMaxAttempt;
 
@@ -119,6 +120,7 @@ public class DefaultMessageSender implements MessageSender {
         } else {
             DefaultMessageSender tmpMessageSender =
                     new DefaultMessageSender(configure, selfDefineFactory);
+            tmpMessageSender.setMaxPacketLength(entry.getMaxPacketLength());
             CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender);
             return tmpMessageSender;
         }
@@ -190,6 +192,14 @@ public class DefaultMessageSender implements MessageSender {
         this.groupId = groupId;
     }
 
+    public int getMaxPacketLength() { // limit that the send methods pass to ProxyUtils.isBodyLengthValid
+        return maxPacketLength;
+    }
+
+    public void setMaxPacketLength(int maxPacketLength) { // a negative value disables the body-length check
+        this.maxPacketLength = maxPacketLength; // NOTE(review): the field has no initializer, so it is 0 until this runs; isBodyLengthValid does not skip 0 - confirm every sender is configured
+    }
+
     public String getSDKVersion() {
         return ConfigConstants.PROXY_SDK_VERSION;
     }
@@ -253,6 +263,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
+        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+            return SendResult.BODY_EXCEED_MAX_LEN;
+        }
         addIndexCnt(groupId, streamId, 1);
 
         String proxySend = "";
@@ -322,6 +335,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
+        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+            return SendResult.BODY_EXCEED_MAX_LEN;
+        }
         addIndexCnt(groupId, streamId, 1);
 
         if (isProxySend) {
@@ -382,6 +398,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
+        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+            return SendResult.BODY_EXCEED_MAX_LEN;
+        }
         addIndexCnt(groupId, streamId, bodyList.size());
 
         String proxySend = "";
@@ -448,6 +467,9 @@ public class DefaultMessageSender implements MessageSender {
                 extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
+        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+            return SendResult.BODY_EXCEED_MAX_LEN;
+        }
         addIndexCnt(groupId, streamId, bodyList.size());
         if (isProxySend) {
             extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
@@ -514,6 +536,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
+        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+        }
         addIndexCnt(groupId, streamId, 1);
 
         String proxySend = "";
@@ -576,6 +601,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
+        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+        }
         addIndexCnt(groupId, streamId, 1);
         if (isProxySend) {
             extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
@@ -630,6 +658,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
+        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+        }
         addIndexCnt(groupId, streamId, bodyList.size());
         String proxySend = "";
         if (isProxySend) {
@@ -693,6 +724,9 @@ public class DefaultMessageSender implements MessageSender {
                 extraAttrMap)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
+        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+        }
         addIndexCnt(groupId, streamId, bodyList.size());
         if (isProxySend) {
             extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
@@ -808,6 +842,9 @@ public class DefaultMessageSender implements MessageSender {
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
+        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+        }
         addIndexCnt(groupId, streamId, bodyList.size());
 
         StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
@@ -828,6 +865,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
+        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+        }
         boolean isCompressEnd = false;
         if (msgtype == 7 || msgtype == 8) {
             sender.asyncSendMessageIndex(new EncodeObject(body, msgtype, 
isCompressEnd,
@@ -858,6 +898,9 @@ public class DefaultMessageSender implements MessageSender {
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
         }
+        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
+            return SendResult.BODY_EXCEED_MAX_LEN.toString();
+        }
         addIndexCnt(groupId, streamId, bodyList.size());
 
         StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
@@ -881,6 +924,9 @@ public class DefaultMessageSender implements MessageSender {
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
         }
+        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
+            return SendResult.BODY_EXCEED_MAX_LEN.toString();
+        }
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, false, 
isReport,
                     isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", 
messageKey, ip);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index 349617152c..441c62abe5 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -28,6 +28,15 @@ public class ProxyConfigEntry implements java.io.Serializable {
     private int load;
     private int switchStat;
     private boolean isInterVisit;
+    private int maxPacketLength;
+
+    public int getMaxPacketLength() { // packet-size limit; DefaultMessageSender copies it onto a newly created sender
+        return maxPacketLength;
+    }
+
+    public void setMaxPacketLength(int maxPacketLength) { // ProxyConfigManager passes -1 when the cluster value is null
+        this.maxPacketLength = maxPacketLength;
+    }
 
     public int getLoad() {
         return load;
@@ -81,7 +90,8 @@ public class ProxyConfigEntry implements java.io.Serializable {
     @Override
     public String toString() {
         return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", 
size=" + size + ", isInterVisit="
-                + isInterVisit + ", groupId=" + groupId + ", switch=" + 
switchStat + "]";
+                + isInterVisit + ", groupId=" + groupId + ", switch=" + 
switchStat + ", maxPacketLength="
+                + maxPacketLength + "]";
     }
 
     public int getClusterId() {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 25c93d6a60..13ef45479b 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -632,7 +632,6 @@ public class ProxyConfigManager extends Thread {
         if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
             isSwitch = proxyCluster.getIsSwitch();
         }
-
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
         proxyEntry.setClusterId(clusterId);
         proxyEntry.setGroupId(clientConfig.getInlongGroupId());
@@ -641,6 +640,8 @@ public class ProxyConfigManager extends Thread {
         proxyEntry.setSwitchStat(isSwitch);
         proxyEntry.setLoad(load);
         proxyEntry.setSize(nodeList.size());
+        proxyEntry.setMaxPacketLength(
+                proxyCluster.getMaxPacketLength() != null ? 
proxyCluster.getMaxPacketLength() : -1);
         return proxyEntry;
     }
 
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 1626f0bbf9..b3b455b9d3 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.dataproxy.utils;
 
 import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.ConfigConstants;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.network.Utils;
 
@@ -92,6 +93,48 @@ public class ProxyUtils {
         return true;
     }
 
+    /**
+     * Checks whether a single message body fits the packet limit once room for attributes has been reserved.
+     * @param body   message body to measure; its length is read directly, so it must not be null
+     * @param maxLen maximum packet length in bytes; a negative value disables the check
+     * @return true if the check is disabled or body.length is at most maxLen - RESERVED_ATTRIBUTE_LENGTH, else false
+     */
+    public static boolean isBodyLengthValid(byte[] body, int maxLen) {
+        // A negative limit means "no limit", so the check is skipped; 0 is NOT skipped and falls through
+        if (maxLen < 0) {
+            return true;
+        }
+        // Reserve space for attributes. NOTE(review): if 0 <= maxLen < reserve, every body is rejected - confirm intended
+        if (body.length > maxLen - ConfigConstants.RESERVED_ATTRIBUTE_LENGTH) {
+            logger.debug("body length is too long, max length is {}", maxLen);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether the summed length of all bodies fits the packet limit once room for attributes has been reserved.
+     * @param bodyList message bodies to measure; the list and each element are dereferenced, so none may be null
+     * @param maxLen   maximum packet length in bytes; a negative value disables the check
+     * @return false as soon as the running total exceeds maxLen - RESERVED_ATTRIBUTE_LENGTH, otherwise true
+     */
+    public static boolean isBodyLengthValid(List<byte[]> bodyList, int maxLen) 
{
+        // A negative limit means "no limit", so the check is skipped; 0 is NOT skipped and falls through
+        if (maxLen < 0) {
+            return true;
+        }
+        int size = 0; // NOTE(review): int accumulator - confirm the total body size cannot exceed Integer.MAX_VALUE
+        for (byte[] body : bodyList) {
+            size += body.length;
+            // Reserve space for attributes; tested after every element so the loop exits at the first excess
+            if (size > maxLen - ConfigConstants.RESERVED_ATTRIBUTE_LENGTH) {
+                logger.debug("body length is too long, max length is {}", 
maxLen);
+                return false;
+            }
+        }
+        return true;
+    }
+
     public static long covertZeroDt(long dt) {
         if (dt == 0) {
             return System.currentTimeMillis();

Reply via email to