This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a54dc480c [INLONG-11863][SDK] Enhance Event Attribute Validation and Decoding Logic in SDK (#11864)
8a54dc480c is described below

commit 8a54dc480ce568c85b5db7552a0b0710fdb7b683
Author: Goson Zhang <[email protected]>
AuthorDate: Thu May 15 10:38:24 2025 +0800

    [INLONG-11863][SDK] Enhance Event Attribute Validation and Decoding Logic in SDK (#11864)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/BaseMsgSenderFactory.java |  2 +-
 .../inlong/sdk/dataproxy/common/EventInfo.java     | 28 ++++++++++----------
 .../inlong/sdk/dataproxy/metric/MetaSyncInfo.java  |  5 ++++
 .../sdk/dataproxy/metric/MetricDataHolder.java     | 10 ++++++++
 .../inlong/sdk/dataproxy/metric/TimeCostInfo.java  |  5 ++++
 .../sdk/dataproxy/network/tcp/TcpClientMgr.java    |  8 +++---
 .../dataproxy/network/tcp/codec/DecodeObject.java  | 27 ++++++++++++++-----
 .../dataproxy/network/tcp/codec/EncodeObject.java  |  5 ++--
 .../sdk/dataproxy/sender/tcp/TcpEventInfo.java     |  7 +++--
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     | 24 ++++++++---------
 .../inlong/sdk/dataproxy/ProxyUtilsTest.java       | 30 ++++++++++++++++++++++
 11 files changed, 108 insertions(+), 43 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index ce276b9cf6..42e1af0024 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -324,7 +324,7 @@ public class BaseMsgSenderFactory {
     private int releaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
         int totalSenderCnt = 0;
         for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
-            if (entry == null || entry.getValue() == null) {
+            if (entry == null || entry.getKey() == null || entry.getValue() == null) {
                 continue;
             }
             try {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
index 3d63c10039..4d6834678a 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
@@ -73,10 +73,12 @@ public abstract class EventInfo<T> {
         // attrs
         if (attrs != null && !attrs.isEmpty()) {
             for (Map.Entry<String, String> entry : attrs.entrySet()) {
-                if (StringUtils.isBlank(entry.getKey())) {
+                if (entry == null
+                        || StringUtils.isBlank(entry.getKey())
+                        || entry.getValue() == null) {
                     continue;
                 }
-                innSetAttr(entry.getKey().trim(), entry.getValue());
+                innSetAttr(entry.getKey().trim(), entry.getValue().trim());
             }
         }
         if (auditId != null && auditId != -1L) {
@@ -129,20 +131,16 @@ public abstract class EventInfo<T> {
                     + AttributeConstants.KEY_VALUE_SEPARATOR + " or "
                     + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
         }
-        String valValue = value;
-        if (valValue != null) {
-            valValue = valValue.trim();
-            if (valValue.contains(AttributeConstants.SEPARATOR)
-                    || valValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
-                if (exceptCnt.shouldPrint()) {
-                    logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)",
-                            valValue, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
-                }
-                throw new ProxyEventException("Attribute value(" + valValue + ") include reserved word("
-                        + AttributeConstants.KEY_VALUE_SEPARATOR + " or "
-                        + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
+        if (value.contains(AttributeConstants.SEPARATOR)
+                || value.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+            if (exceptCnt.shouldPrint()) {
+                logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)",
+                        value, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
             }
+            throw new ProxyEventException("Attribute value(" + value + ") include reserved word("
+                    + AttributeConstants.KEY_VALUE_SEPARATOR + " or "
+                    + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
         }
-        this.attrs.put(key, valValue);
+        this.attrs.put(key, value);
     }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
index b574eff2ac..5d036d982d 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
@@ -53,6 +53,11 @@ public class MetaSyncInfo {
             long curCnt = 0;
             strBuff.append("\"ms\":{\"errT\":{");
             for (Map.Entry<Integer, LongAdder> entry : syncErrInfo.entrySet()) {
+                if (entry == null
+                        || entry.getKey() == null
+                        || entry.getValue() == null) {
+                    continue;
+                }
                 if (curCnt++ > 0) {
                     strBuff.append(",");
                 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index dc8cc11dad..1b9c8f0dea 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -421,6 +421,11 @@ public class MetricDataHolder implements Runnable {
             metaSyncInfo.getAndResetValue(strBuff);
             strBuff.append(",\"tr\":[");
             for (Map.Entry<String, TrafficInfo> entry : trafficMap.entrySet()) {
+                if (entry == null
+                        || entry.getKey() == null
+                        || entry.getValue() == null) {
+                    continue;
+                }
                 if (count++ > 0) {
                     strBuff.append(",");
                 }
@@ -429,6 +434,11 @@ public class MetricDataHolder implements Runnable {
             strBuff.append("],\"errs\":{");
             count = 0;
             for (Map.Entry<Integer, LongAdder> entry : errCodeMap.entrySet()) {
+                if (entry == null
+                        || entry.getKey() == null
+                        || entry.getValue() == null) {
+                    continue;
+                }
                 if (count++ > 0) {
                     strBuff.append(",");
                 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
index 5a5ad7deb5..5f14a01bb7 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
@@ -78,6 +78,11 @@ public class TimeCostInfo {
             long bucketCnt = 0;
             strBuff.append("\"").append(name).append("\":{\"bkts\":{");
             for (Map.Entry<String, LongAdder> entry : sendTimeBucketT.entrySet()) {
+                if (entry == null
+                        || entry.getKey() == null
+                        || entry.getValue() == null) {
+                    continue;
+                }
                 if (bucketCnt++ > 0) {
                     strBuff.append(",");
                 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index 0ad5e44e44..8e2f428df2 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -275,22 +275,22 @@ public class TcpClientMgr implements ClientMgr {
                     timerObj.newTimeout(new TimeoutTask(encObject.getMessageId()),
                             tcpConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS));
             if (!client.write(clientTerm, encObject, procResult)) {
-                Timeout timeout = reqTimeouts.remove(encObject.getMessageId());
+                Timeout timeout = reqTimeouts.remove(newFuture.getMessageId());
                 if (timeout != null) {
                     timeout.cancel();
                 }
-                rmvMsgStubInfo(encObject.getMessageId());
+                rmvMsgStubInfo(newFuture.getMessageId());
             }
             return procResult.isSuccess();
         } else {
             // process sync report
             if (!client.write(clientTerm, encObject, procResult)) {
-                rmvMsgStubInfo(encObject.getMessageId());
+                rmvMsgStubInfo(newFuture.getMessageId());
                 return false;
             }
             boolean retValue = newFuture.get(procResult,
                     tcpConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
-            if (rmvMsgStubInfo(encObject.getMessageId())) {
+            if (rmvMsgStubInfo(newFuture.getMessageId())) {
                 if (procResult.getErrCode() == ErrorCode.SEND_WAIT_TIMEOUT.getErrCode()) {
                     client.setBusy(clientTerm);
                 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
index d9b24b0aae..4bd7be0202 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
@@ -23,7 +23,6 @@ import org.apache.inlong.common.msg.MsgType;
 import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 
-import com.google.common.base.Splitter;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.HashMap;
@@ -36,10 +35,6 @@ import java.util.Map;
  */
 public class DecodeObject {
 
-    private static final Splitter.MapSplitter MAP_SPLITTER =
-            Splitter.on(AttributeConstants.SEPARATOR).trimResults()
-                    .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
-
     private final MsgType msgType;
     private int messageId;
     private String dpIp;
@@ -87,12 +82,30 @@ public class DecodeObject {
             this.procResult = new ProcessResult(ErrorCode.OK);
             return;
         }
-        retAttr = new HashMap<>(MAP_SPLITTER.split(attributes));
+        // decode attribute string
+        this.retAttr = new HashMap<>();
+        String[] keyValSet = attributes.split(AttributeConstants.SEPARATOR);
+        for (String keyVal : keyValSet) {
+            if (StringUtils.isBlank(keyVal)) {
+                continue;
+            }
+            String[] keyValSplit = keyVal.split(AttributeConstants.KEY_VALUE_SEPARATOR);
+            if (keyValSplit.length == 1) {
+                if (StringUtils.isBlank(keyValSplit[0])) {
+                    continue;
+                }
+                retAttr.put(keyValSplit[0].trim(), "");
+            } else {
+                if (StringUtils.isBlank(keyValSplit[0]) || keyValSplit[1] == null) {
+                    continue;
+                }
+                retAttr.put(keyValSplit[0].trim(), keyValSplit[1].trim());
+            }
+        }
         if (retAttr.containsKey(AttributeConstants.MESSAGE_ID)) {
             this.messageId = Integer.parseInt(retAttr.get(AttributeConstants.MESSAGE_ID));
         }
         dpIp = retAttr.get(AttributeConstants.MESSAGE_DP_IP);
-
         String errCode = retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
         // errCode is empty or equals 0 -> success
         if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
index 3b44f9a3ce..ecab7eba76 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.MsgType;
 
 import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
 
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
@@ -86,10 +87,10 @@ public class EncodeObject {
         this.aesKey = aesKey;
         if (tgtAttrs != null && !tgtAttrs.isEmpty()) {
             for (Map.Entry<String, String> entry : tgtAttrs.entrySet()) {
-                if (entry == null || entry.getKey() == null) {
+                if (entry == null || StringUtils.isBlank(entry.getKey()) || entry.getValue() == null) {
                     continue;
                 }
-                this.attrMap.put(entry.getKey(), entry.getValue());
+                this.attrMap.put(entry.getKey().trim(), entry.getValue().trim());
             }
             String preAttrStr = mapJoiner.join(this.attrMap);
             this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
index a51f41a105..4d8d6b6449 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
@@ -79,9 +79,12 @@ public class TcpEventInfo extends EventInfo<byte[]> {
 
     public void setAttr(String key, String value) throws ProxyEventException {
         if (StringUtils.isBlank(key)) {
-            throw new ProxyEventException("Key is blank!");
+            throw new ProxyEventException("Parameter key is blank!");
         }
-        innSetAttr(key.trim(), value);
+        if (value == null) {
+            throw new ProxyEventException("Parameter value is null!");
+        }
+        innSetAttr(key.trim(), value.trim());
     }
 
     @Override
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index e6eeab12b0..7b307eaf12 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -165,25 +165,25 @@ public class ProxyUtils {
         if (attrsMap == null || attrsMap.isEmpty()) {
             return attrsMap;
         }
+        String tmpKey;
         String tmpValue;
         Map<String, String> validAttrsMap = new HashMap<>();
         for (Map.Entry<String, String> entry : attrsMap.entrySet()) {
-            if (StringUtils.isBlank(entry.getKey())
-                    || entry.getKey().contains(AttributeConstants.SEPARATOR)
-                    || entry.getKey().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+            if (entry == null
+                    || StringUtils.isBlank(entry.getKey())
+                    || entry.getValue() == null) {
                 continue;
             }
-            tmpValue = entry.getKey().trim();
-            if (ProxyUtils.SdkReservedWords.contains(tmpValue)) {
+            tmpKey = entry.getKey().trim();
+            tmpValue = entry.getValue().trim();
+            if (tmpKey.contains(AttributeConstants.SEPARATOR)
+                    || tmpKey.contains(AttributeConstants.KEY_VALUE_SEPARATOR)
+                    || ProxyUtils.SdkReservedWords.contains(tmpKey)
+                    || tmpValue.contains(AttributeConstants.SEPARATOR)
+                    || tmpValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
                 continue;
             }
-            if (entry.getValue() != null) {
-                if (entry.getValue().contains(AttributeConstants.SEPARATOR)
-                        || entry.getValue().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
-                    continue;
-                }
-            }
-            validAttrsMap.put(tmpValue, entry.getValue());
+            validAttrsMap.put(tmpKey, tmpValue);
         }
         return validAttrsMap;
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
index 5755aac08d..83f191117d 100644
--- a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
+++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
@@ -17,11 +17,15 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
+import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class ProxyUtilsTest {
 
     @Test
@@ -30,4 +34,30 @@ public class ProxyUtilsTest {
         Assert.assertNotNull(ip);
     }
 
+    @Test
+    public void testGetValidAttrs() {
+        Map<String, String> attrsMap = new HashMap<>();
+        attrsMap.put("first", "       ");
+        attrsMap.put("second", "");
+        attrsMap.put("third", " stst");
+        attrsMap.put("fourth", null);
+        attrsMap.put("fifth", " fifth ");
+        attrsMap.put("", "sixth");
+        attrsMap.put(null, "seventh");
+        attrsMap.put(null, "seventh");
+        attrsMap.put("eighth", "eig&hth");
+        attrsMap.put("ninth", "=ninth");
+        attrsMap.put(AttributeConstants.DATA_TIME, "tenth");
+        Map<String, String> tgtMap = ProxyUtils.getValidAttrs(attrsMap);
+        Assert.assertNotNull(tgtMap);
+        Assert.assertEquals(tgtMap.size(), 4);
+        Assert.assertNotNull(tgtMap.get("first"));
+        Assert.assertNotNull(tgtMap.get("second"));
+        Assert.assertNotNull(tgtMap.get("third"));
+        Assert.assertNull(tgtMap.get("fourth"));
+        Assert.assertNotNull(tgtMap.get("fifth"));
+        Assert.assertNull(tgtMap.get("eighth"));
+        Assert.assertNull(tgtMap.get("ninth"));
+        Assert.assertNull(tgtMap.get(AttributeConstants.DATA_TIME));
+    }
 }

Reply via email to