This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8a54dc480c [INLONG-11863][SDK] Enhance Event Attribute Validation and
Decoding Logic in SDK (#11864)
8a54dc480c is described below
commit 8a54dc480ce568c85b5db7552a0b0710fdb7b683
Author: Goson Zhang <[email protected]>
AuthorDate: Thu May 15 10:38:24 2025 +0800
[INLONG-11863][SDK] Enhance Event Attribute Validation and Decoding Logic
in SDK (#11864)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 2 +-
.../inlong/sdk/dataproxy/common/EventInfo.java | 28 ++++++++++----------
.../inlong/sdk/dataproxy/metric/MetaSyncInfo.java | 5 ++++
.../sdk/dataproxy/metric/MetricDataHolder.java | 10 ++++++++
.../inlong/sdk/dataproxy/metric/TimeCostInfo.java | 5 ++++
.../sdk/dataproxy/network/tcp/TcpClientMgr.java | 8 +++---
.../dataproxy/network/tcp/codec/DecodeObject.java | 27 ++++++++++++++-----
.../dataproxy/network/tcp/codec/EncodeObject.java | 5 ++--
.../sdk/dataproxy/sender/tcp/TcpEventInfo.java | 7 +++--
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 24 ++++++++---------
.../inlong/sdk/dataproxy/ProxyUtilsTest.java | 30 ++++++++++++++++++++++
11 files changed, 108 insertions(+), 43 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index ce276b9cf6..42e1af0024 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -324,7 +324,7 @@ public class BaseMsgSenderFactory {
private int releaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
int totalSenderCnt = 0;
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
- if (entry == null || entry.getValue() == null) {
+ if (entry == null || entry.getKey() == null || entry.getValue() == null) {
continue;
}
try {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
index 3d63c10039..4d6834678a 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
@@ -73,10 +73,12 @@ public abstract class EventInfo<T> {
// attrs
if (attrs != null && !attrs.isEmpty()) {
for (Map.Entry<String, String> entry : attrs.entrySet()) {
- if (StringUtils.isBlank(entry.getKey())) {
+ if (entry == null
+ || StringUtils.isBlank(entry.getKey())
+ || entry.getValue() == null) {
continue;
}
- innSetAttr(entry.getKey().trim(), entry.getValue());
+ innSetAttr(entry.getKey().trim(), entry.getValue().trim());
}
}
if (auditId != null && auditId != -1L) {
@@ -129,20 +131,16 @@ public abstract class EventInfo<T> {
+ AttributeConstants.KEY_VALUE_SEPARATOR + " or "
+ AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
}
- String valValue = value;
- if (valValue != null) {
- valValue = valValue.trim();
- if (valValue.contains(AttributeConstants.SEPARATOR)
- || valValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
- if (exceptCnt.shouldPrint()) {
- logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)",
- valValue, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
- }
- throw new ProxyEventException("Attribute value(" + valValue + ") include reserved word("
- + AttributeConstants.KEY_VALUE_SEPARATOR + " or "
- + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
+ if (value.contains(AttributeConstants.SEPARATOR)
+ || value.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+ if (exceptCnt.shouldPrint()) {
+ logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)",
+ value, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
}
+ throw new ProxyEventException("Attribute value(" + value + ") include reserved word("
+ + AttributeConstants.KEY_VALUE_SEPARATOR + " or "
+ + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
}
- this.attrs.put(key, valValue);
+ this.attrs.put(key, value);
}
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
index b574eff2ac..5d036d982d 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
@@ -53,6 +53,11 @@ public class MetaSyncInfo {
long curCnt = 0;
strBuff.append("\"ms\":{\"errT\":{");
for (Map.Entry<Integer, LongAdder> entry : syncErrInfo.entrySet()) {
+ if (entry == null
+ || entry.getKey() == null
+ || entry.getValue() == null) {
+ continue;
+ }
if (curCnt++ > 0) {
strBuff.append(",");
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index dc8cc11dad..1b9c8f0dea 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -421,6 +421,11 @@ public class MetricDataHolder implements Runnable {
metaSyncInfo.getAndResetValue(strBuff);
strBuff.append(",\"tr\":[");
for (Map.Entry<String, TrafficInfo> entry : trafficMap.entrySet()) {
+ if (entry == null
+ || entry.getKey() == null
+ || entry.getValue() == null) {
+ continue;
+ }
if (count++ > 0) {
strBuff.append(",");
}
@@ -429,6 +434,11 @@ public class MetricDataHolder implements Runnable {
strBuff.append("],\"errs\":{");
count = 0;
for (Map.Entry<Integer, LongAdder> entry : errCodeMap.entrySet()) {
+ if (entry == null
+ || entry.getKey() == null
+ || entry.getValue() == null) {
+ continue;
+ }
if (count++ > 0) {
strBuff.append(",");
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
index 5a5ad7deb5..5f14a01bb7 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
@@ -78,6 +78,11 @@ public class TimeCostInfo {
long bucketCnt = 0;
strBuff.append("\"").append(name).append("\":{\"bkts\":{");
for (Map.Entry<String, LongAdder> entry : sendTimeBucketT.entrySet()) {
+ if (entry == null
+ || entry.getKey() == null
+ || entry.getValue() == null) {
+ continue;
+ }
if (bucketCnt++ > 0) {
strBuff.append(",");
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index 0ad5e44e44..8e2f428df2 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -275,22 +275,22 @@ public class TcpClientMgr implements ClientMgr {
timerObj.newTimeout(new
TimeoutTask(encObject.getMessageId()),
tcpConfig.getRequestTimeoutMs(),
TimeUnit.MILLISECONDS));
if (!client.write(clientTerm, encObject, procResult)) {
- Timeout timeout = reqTimeouts.remove(encObject.getMessageId());
+ Timeout timeout = reqTimeouts.remove(newFuture.getMessageId());
if (timeout != null) {
timeout.cancel();
}
- rmvMsgStubInfo(encObject.getMessageId());
+ rmvMsgStubInfo(newFuture.getMessageId());
}
return procResult.isSuccess();
} else {
// process sync report
if (!client.write(clientTerm, encObject, procResult)) {
- rmvMsgStubInfo(encObject.getMessageId());
+ rmvMsgStubInfo(newFuture.getMessageId());
return false;
}
boolean retValue = newFuture.get(procResult, tcpConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
- if (rmvMsgStubInfo(encObject.getMessageId())) {
+ if (rmvMsgStubInfo(newFuture.getMessageId())) {
if (procResult.getErrCode() == ErrorCode.SEND_WAIT_TIMEOUT.getErrCode()) {
client.setBusy(clientTerm);
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
index d9b24b0aae..4bd7be0202 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
@@ -23,7 +23,6 @@ import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
-import com.google.common.base.Splitter;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
@@ -36,10 +35,6 @@ import java.util.Map;
*/
public class DecodeObject {
- private static final Splitter.MapSplitter MAP_SPLITTER =
- Splitter.on(AttributeConstants.SEPARATOR).trimResults()
- .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
-
private final MsgType msgType;
private int messageId;
private String dpIp;
@@ -87,12 +82,30 @@ public class DecodeObject {
this.procResult = new ProcessResult(ErrorCode.OK);
return;
}
- retAttr = new HashMap<>(MAP_SPLITTER.split(attributes));
+ // decode attribute string
+ this.retAttr = new HashMap<>();
+ String[] keyValSet = attributes.split(AttributeConstants.SEPARATOR);
+ for (String keyVal : keyValSet) {
+ if (StringUtils.isBlank(keyVal)) {
+ continue;
+ }
+ String[] keyValSplit = keyVal.split(AttributeConstants.KEY_VALUE_SEPARATOR);
+ if (keyValSplit.length == 1) {
+ if (StringUtils.isBlank(keyValSplit[0])) {
+ continue;
+ }
+ retAttr.put(keyValSplit[0].trim(), "");
+ } else {
+ if (StringUtils.isBlank(keyValSplit[0]) || keyValSplit[1] == null) {
+ continue;
+ }
+ retAttr.put(keyValSplit[0].trim(), keyValSplit[1].trim());
+ }
+ }
if (retAttr.containsKey(AttributeConstants.MESSAGE_ID)) {
this.messageId = Integer.parseInt(retAttr.get(AttributeConstants.MESSAGE_ID));
}
dpIp = retAttr.get(AttributeConstants.MESSAGE_DP_IP);
-
String errCode = retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
// errCode is empty or equals 0 -> success
if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
index 3b44f9a3ce..ecab7eba76 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -86,10 +87,10 @@ public class EncodeObject {
this.aesKey = aesKey;
if (tgtAttrs != null && !tgtAttrs.isEmpty()) {
for (Map.Entry<String, String> entry : tgtAttrs.entrySet()) {
- if (entry == null || entry.getKey() == null) {
+ if (entry == null || StringUtils.isBlank(entry.getKey()) || entry.getValue() == null) {
continue;
}
- this.attrMap.put(entry.getKey(), entry.getValue());
+ this.attrMap.put(entry.getKey().trim(), entry.getValue().trim());
}
String preAttrStr = mapJoiner.join(this.attrMap);
this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
index a51f41a105..4d8d6b6449 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
@@ -79,9 +79,12 @@ public class TcpEventInfo extends EventInfo<byte[]> {
public void setAttr(String key, String value) throws ProxyEventException {
if (StringUtils.isBlank(key)) {
- throw new ProxyEventException("Key is blank!");
+ throw new ProxyEventException("Parameter key is blank!");
}
- innSetAttr(key.trim(), value);
+ if (value == null) {
+ throw new ProxyEventException("Parameter value is null!");
+ }
+ innSetAttr(key.trim(), value.trim());
}
@Override
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index e6eeab12b0..7b307eaf12 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -165,25 +165,25 @@ public class ProxyUtils {
if (attrsMap == null || attrsMap.isEmpty()) {
return attrsMap;
}
+ String tmpKey;
String tmpValue;
Map<String, String> validAttrsMap = new HashMap<>();
for (Map.Entry<String, String> entry : attrsMap.entrySet()) {
- if (StringUtils.isBlank(entry.getKey())
- || entry.getKey().contains(AttributeConstants.SEPARATOR)
- || entry.getKey().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+ if (entry == null
+ || StringUtils.isBlank(entry.getKey())
+ || entry.getValue() == null) {
continue;
}
- tmpValue = entry.getKey().trim();
- if (ProxyUtils.SdkReservedWords.contains(tmpValue)) {
+ tmpKey = entry.getKey().trim();
+ tmpValue = entry.getValue().trim();
+ if (tmpKey.contains(AttributeConstants.SEPARATOR)
+ || tmpKey.contains(AttributeConstants.KEY_VALUE_SEPARATOR)
+ || ProxyUtils.SdkReservedWords.contains(tmpKey)
+ || tmpValue.contains(AttributeConstants.SEPARATOR)
+ || tmpValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
continue;
}
- if (entry.getValue() != null) {
- if (entry.getValue().contains(AttributeConstants.SEPARATOR)
- || entry.getValue().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
- continue;
- }
- }
- validAttrsMap.put(tmpValue, entry.getValue());
+ validAttrsMap.put(tmpKey, tmpValue);
}
return validAttrsMap;
}
diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
index 5755aac08d..83f191117d 100644
--- a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
+++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java
@@ -17,11 +17,15 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
public class ProxyUtilsTest {
@Test
@@ -30,4 +34,30 @@ public class ProxyUtilsTest {
Assert.assertNotNull(ip);
}
+ @Test
+ public void testGetValidAttrs() {
+ Map<String, String> attrsMap = new HashMap<>();
+ attrsMap.put("first", " ");
+ attrsMap.put("second", "");
+ attrsMap.put("third", " stst");
+ attrsMap.put("fourth", null);
+ attrsMap.put("fifth", " fifth ");
+ attrsMap.put("", "sixth");
+ attrsMap.put(null, "seventh");
+ attrsMap.put(null, "seventh");
+ attrsMap.put("eighth", "eig&hth");
+ attrsMap.put("ninth", "=ninth");
+ attrsMap.put(AttributeConstants.DATA_TIME, "tenth");
+ Map<String, String> tgtMap = ProxyUtils.getValidAttrs(attrsMap);
+ Assert.assertNotNull(tgtMap);
+ Assert.assertEquals(tgtMap.size(), 4);
+ Assert.assertNotNull(tgtMap.get("first"));
+ Assert.assertNotNull(tgtMap.get("second"));
+ Assert.assertNotNull(tgtMap.get("third"));
+ Assert.assertNull(tgtMap.get("fourth"));
+ Assert.assertNotNull(tgtMap.get("fifth"));
+ Assert.assertNull(tgtMap.get("eighth"));
+ Assert.assertNull(tgtMap.get("ninth"));
+ Assert.assertNull(tgtMap.get(AttributeConstants.DATA_TIME));
+ }
}