This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 21cbf7234c [INLONG-11689][SDK] Optimize user reporting information 
management (#11690)
21cbf7234c is described below

commit 21cbf7234c472500506305cdf19926d969d9150b
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Jan 20 14:10:29 2025 +0800

    [INLONG-11689][SDK] Optimize user reporting information management (#11690)
    
    * [INLONG-11689][SDK] Optimize user reporting information management
    
    * [INLONG-11689][SDK] Optimize user reporting information management
    
    ---------
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/common/EventInfo.java     | 148 +++++++++++++
 .../dataproxy/exception/ProxyEventException.java   |  42 ++++
 .../sdk/dataproxy/sender/http/HttpEventInfo.java   |  88 ++++++++
 .../sdk/dataproxy/sender/tcp/TcpEventInfo.java     | 113 ++++++++++
 .../inlong/sdk/dataproxy/utils/LogCounter.java     |   2 +-
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |  36 +++-
 .../apache/inlong/sdk/dataproxy/EventInfoTest.java | 229 +++++++++++++++++++++
 7 files changed, 651 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
new file mode 100644
index 0000000000..3d63c10039
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.common;
+
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Report Event information class
+ *
+ * Used to encapsulate the data information reported by the caller, including
+ *  groupId, streamId, dt, attributes, and body, auditVersion, msgUUID, etc.
+ * This class performs field value validity checks on the reported data and
+ *  throws ProxyEventException for data that does not meet the reporting 
requirements, including
+ *  mandatory fields that are empty, attribute sets that contain reserved 
words, attribute delimiters,
+ *  empty message bodies, etc.
+ * Since the TCP and HTTP reports supported by the SDK differ only in the
+ *  message body type, this class uses a Generics definition
+ */
+public abstract class EventInfo<T> {
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(EventInfo.class);
+    protected static final LogCounter exceptCnt = new LogCounter(10, 100000, 
60 * 1000L);
+
+    private final String groupId;
+    private final String streamId;
+    private final long dtMs;
+    private final Map<String, String> attrs = new HashMap<>();
+    protected int msgCnt = 0;
+    protected int bodySize = 0;
+    protected final List<T> bodyList = new ArrayList<>();
+
+    protected EventInfo(String groupId, String streamId, long dtMs, Long 
auditId, String msgUUID,
+            Map<String, String> attrs, boolean isSingle, List<T> bodyList) 
throws ProxyEventException {
+        // groupId
+        if (StringUtils.isBlank(groupId)) {
+            throw new ProxyEventException("groupId is blank!");
+        }
+        this.groupId = groupId.trim();
+        // streamId
+        if (StringUtils.isBlank(streamId)) {
+            throw new ProxyEventException("streamId is blank!");
+        }
+        this.streamId = streamId.trim();
+        // dtMs
+        this.dtMs = dtMs <= 0L ? System.currentTimeMillis() : dtMs;
+        // attrs
+        if (attrs != null && !attrs.isEmpty()) {
+            for (Map.Entry<String, String> entry : attrs.entrySet()) {
+                if (StringUtils.isBlank(entry.getKey())) {
+                    continue;
+                }
+                innSetAttr(entry.getKey().trim(), entry.getValue());
+            }
+        }
+        if (auditId != null && auditId != -1L) {
+            this.attrs.put(AttributeConstants.AUDIT_VERSION, 
String.valueOf(auditId));
+        }
+        if (StringUtils.isNotBlank(msgUUID)) {
+            this.attrs.put(AttributeConstants.MSG_UUID, msgUUID.trim());
+        }
+        // body
+        setBodyList(isSingle, bodyList);
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public long getDtMs() {
+        return dtMs;
+    }
+
+    public Map<String, String> getAttrs() {
+        return attrs;
+    }
+
+    public int getMsgCnt() {
+        return msgCnt;
+    }
+
+    public int getBodySize() {
+        return bodySize;
+    }
+
+    protected abstract void setBodyList(boolean isSingle, List<T> bodyList) 
throws ProxyEventException;
+
+    /**
+     * Validates a single attribute and stores it in the attribute map.
+     *
+     * The key is rejected when it is one of the SDK reserved words. The key and the
+     * trimmed value are both rejected when they contain the attribute separator or the
+     * key-value separator. A null value is stored as-is.
+     *
+     * @param key    the attribute key; the callers shown in this patch pass it non-blank and trimmed
+     * @param value  the attribute value, may be null; it is trimmed before being checked and stored
+     * @throws ProxyEventException if the key is a reserved word, or if the key or value contains
+     *         AttributeConstants.SEPARATOR or AttributeConstants.KEY_VALUE_SEPARATOR
+     */
+    protected void innSetAttr(String key, String value) throws ProxyEventException {
+        if (ProxyUtils.SdkReservedWords.contains(key)) {
+            throw new ProxyEventException("Attribute key(" + key + ") is reserved word!");
+        }
+        if (key.contains(AttributeConstants.SEPARATOR)
+                || key.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+            // rate-limited logging; report both separators that were actually checked
+            if (exceptCnt.shouldPrint()) {
+                logger.warn("Attribute key({}) include reserved word({} or {})",
+                        key, AttributeConstants.SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR);
+            }
+            throw new ProxyEventException("Attribute key(" + key + ") include reserved word("
+                    + AttributeConstants.SEPARATOR + " or "
+                    + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
+        }
+        String valValue = value;
+        if (valValue != null) {
+            valValue = valValue.trim();
+            if (valValue.contains(AttributeConstants.SEPARATOR)
+                    || valValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+                // rate-limited logging; report both separators that were actually checked
+                if (exceptCnt.shouldPrint()) {
+                    logger.warn("Attribute value({}) include reserved word({} or {})",
+                            valValue, AttributeConstants.SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR);
+                }
+                throw new ProxyEventException("Attribute value(" + valValue + ") include reserved word("
+                        + AttributeConstants.SEPARATOR + " or "
+                        + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
+            }
+        }
+        this.attrs.put(key, valValue);
+    }
+}
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java
new file mode 100644
index 0000000000..7392631d61
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.exception;
+
+/**
+ * Proxy Event Exception
+ *
+ * This exception is used specifically when an unacceptable situation occurs while constructing an event.
+ * If this exception is thrown, the caller needs to solve the specified 
problem or discard the illegal message.
+ */
+public class ProxyEventException extends Exception {
+
+    public ProxyEventException() {
+    }
+
+    public ProxyEventException(String message) {
+        super(message);
+    }
+
+    public ProxyEventException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ProxyEventException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java
new file mode 100644
index 0000000000..f8ae172bba
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.http;
+
+import org.apache.inlong.sdk.dataproxy.common.EventInfo;
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * HTTP Event Information class
+ *
+ * Used to encapsulate the data information reported by HTTP
+ */
+public class HttpEventInfo extends EventInfo<String> {
+
+    public HttpEventInfo(String groupId, String streamId,
+            long dtMs, String body) throws ProxyEventException {
+        super(groupId, streamId, dtMs, null, null, null, true, 
Collections.singletonList(body));
+    }
+
+    public HttpEventInfo(String groupId, String streamId,
+            long dtMs, long auditId, String body) throws ProxyEventException {
+        super(groupId, streamId, dtMs, auditId, null, null, true, 
Collections.singletonList(body));
+    }
+
+    public HttpEventInfo(String groupId, String streamId,
+            long dtMs, List<String> bodyList) throws ProxyEventException {
+        super(groupId, streamId, dtMs, null, null, null, false, bodyList);
+    }
+
+    public HttpEventInfo(String groupId, String streamId,
+            long dtMs, long auditId, List<String> bodyList) throws 
ProxyEventException {
+        super(groupId, streamId, dtMs, auditId, null, null, false, bodyList);
+    }
+
+    public List<String> getBodyList() {
+        return bodyList;
+    }
+
+    @Override
+    protected void setBodyList(boolean isSingle, List<String> bodyList) throws 
ProxyEventException {
+        String tmpValue;
+        if (isSingle) {
+            if (StringUtils.isBlank(bodyList.get(0))) {
+                throw new ProxyEventException("body is null or empty!");
+            }
+            tmpValue = bodyList.get(0).trim();
+            this.bodyList.add(tmpValue);
+            this.bodySize = tmpValue.length();
+            this.msgCnt = 1;
+        } else {
+            if (bodyList == null || bodyList.isEmpty()) {
+                throw new ProxyEventException("bodyList is null or empty!");
+            }
+            for (String body : bodyList) {
+                if (StringUtils.isBlank(body)) {
+                    continue;
+                }
+                tmpValue = body.trim();
+                this.bodyList.add(tmpValue.trim());
+                this.bodySize += tmpValue.length();
+                this.msgCnt++;
+            }
+            if (this.bodyList.isEmpty()) {
+                throw new ProxyEventException("bodyList no valid content!");
+            }
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
new file mode 100644
index 0000000000..dfa255aab6
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.EventInfo;
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TCP Event Information class
+ *
+ * Used to encapsulate the data information reported by TCP
+ */
+public class TcpEventInfo extends EventInfo<byte[]> {
+
+    public TcpEventInfo(String groupId, String streamId, long dtMs,
+            Map<String, String> attrs, byte[] body) throws ProxyEventException 
{
+        super(groupId, streamId, dtMs, null, null, attrs, true, 
Collections.singletonList(body));
+    }
+
+    public TcpEventInfo(String groupId, String streamId, long dtMs, long 
auditId,
+            Map<String, String> attrs, byte[] body) throws ProxyEventException 
{
+        super(groupId, streamId, dtMs, auditId, null, attrs, true, 
Collections.singletonList(body));
+    }
+
+    public TcpEventInfo(String groupId, String streamId, long dtMs, String 
msgUUID,
+            Map<String, String> attrs, byte[] body) throws ProxyEventException 
{
+        super(groupId, streamId, dtMs, null, msgUUID, attrs, true, 
Collections.singletonList(body));
+    }
+
+    public TcpEventInfo(String groupId, String streamId, long dtMs, long 
auditId, String msgUUID,
+            Map<String, String> attrs, byte[] body) throws ProxyEventException 
{
+        super(groupId, streamId, dtMs, auditId, msgUUID, attrs, true, 
Collections.singletonList(body));
+    }
+
+    public TcpEventInfo(String groupId, String streamId,
+            long dtMs, Map<String, String> attrs, List<byte[]> bodyList) 
throws ProxyEventException {
+        super(groupId, streamId, dtMs, null, null, attrs, false, bodyList);
+    }
+
+    public TcpEventInfo(String groupId, String streamId, long dtMs,
+            long auditId, Map<String, String> attrs, List<byte[]> bodyList) 
throws ProxyEventException {
+        super(groupId, streamId, dtMs, auditId, null, attrs, false, bodyList);
+    }
+
+    public TcpEventInfo(String groupId, String streamId, long dtMs,
+            String msgUUID, Map<String, String> attrs, List<byte[]> bodyList) 
throws ProxyEventException {
+        super(groupId, streamId, dtMs, null, msgUUID, attrs, false, bodyList);
+    }
+
+    public TcpEventInfo(String groupId, String streamId, long dtMs,
+            long auditId, String msgUUID, Map<String, String> attrs, 
List<byte[]> bodyList) throws ProxyEventException {
+        super(groupId, streamId, dtMs, auditId, msgUUID, attrs, false, 
bodyList);
+    }
+
+    public List<byte[]> getBodyList() {
+        return bodyList;
+    }
+
+    public void setAttr(String key, String value) throws ProxyEventException {
+        if (StringUtils.isBlank(key)) {
+            throw new ProxyEventException("Key is blank!");
+        }
+        innSetAttr(key.trim(), value);
+    }
+
+    @Override
+    protected void setBodyList(boolean isSingle, List<byte[]> bodyList) throws 
ProxyEventException {
+        if (isSingle) {
+            if (bodyList.get(0) == null || bodyList.get(0).length == 0) {
+                throw new ProxyEventException("body is null or empty!");
+            }
+            this.bodyList.add(bodyList.get(0));
+            this.bodySize = bodyList.get(0).length;
+            this.msgCnt = 1;
+        } else {
+            if (bodyList == null || bodyList.isEmpty()) {
+                throw new ProxyEventException("bodyList is null or empty!");
+            }
+            for (byte[] body : bodyList) {
+                if (body == null || body.length == 0) {
+                    continue;
+                }
+                this.bodyList.add(body);
+                this.bodySize += body.length;
+                this.msgCnt++;
+            }
+            if (this.bodyList.isEmpty()) {
+                throw new ProxyEventException("bodyList no valid content!");
+            }
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
index edb14c62b2..3fac799a3c 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
@@ -27,7 +27,7 @@ public class LogCounter {
     private long control = 100000L;
     private long reset = 60 * 1000L;
 
-    private AtomicLong lastLogTime = new 
AtomicLong(System.currentTimeMillis());
+    private final AtomicLong lastLogTime = new 
AtomicLong(System.currentTimeMillis());
 
     public LogCounter(long start, long control, long reset) {
         this.start = start;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 0367e6e06a..61dbb263df 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -38,11 +38,20 @@ import java.util.Set;
 
 public class ProxyUtils {
 
+    public static final String KEY_FILE_STATUS_CHECK = "_file_status_check";
+    public static final String KEY_SECRET_ID = "_secretId";
+    public static final String KEY_SIGNATURE = "_signature";
+    public static final String KEY_TIME_STAMP = "_timeStamp";
+    public static final String KEY_NONCE = "_nonce";
+    public static final String KEY_USERNAME = "_userName";
+    public static final String KEY_CLIENT_IP = "_clientIP";
+    public static final String KEY_ENCY_VERSION = "_encyVersion";
+    public static final String KEY_ENCY_AES_KEY = "_encyAesKey";
     private static final Logger logger = 
LoggerFactory.getLogger(ProxyUtils.class);
     private static final LogCounter exceptCounter = new LogCounter(10, 200000, 
60 * 1000L);
 
     private static final int TIME_LENGTH = 13;
-    private static final Set<String> invalidAttr = new HashSet<>();
+    public static final Set<String> SdkReservedWords = new HashSet<>();
     public static final Set<MsgType> SdkAllowedMsgType = new HashSet<>();
     private static String localHost;
     private static String sdkVersion;
@@ -50,10 +59,25 @@ public class ProxyUtils {
     static {
         localHost = getLocalIp();
         getJarVersion();
-        Collections.addAll(invalidAttr, "groupId", "streamId", "dt", 
"msgUUID", "cp",
-                "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", 
"_file_status_check", "_secretId",
-                "_signature", "_timeStamp", "_nonce", "_userName", 
"_clientIP", "_encyVersion", "_encyAesKey",
-                "proxySend", "errMsg", "errCode", 
AttributeConstants.MSG_RPT_TIME);
+        Collections.addAll(SdkReservedWords,
+                AttributeConstants.GROUP_ID, AttributeConstants.STREAM_ID,
+                AttributeConstants.DATA_TIME, AttributeConstants.MSG_UUID,
+                AttributeConstants.COMPRESS_TYPE, 
AttributeConstants.MESSAGE_COUNT,
+                AttributeConstants.MESSAGE_TYPE, AttributeConstants.METHOD,
+                AttributeConstants.SEQUENCE_ID, AttributeConstants.TIME_STAMP,
+                AttributeConstants.NODE_IP, AttributeConstants.MESSAGE_ID,
+                AttributeConstants.MESSAGE_IS_ACK, 
AttributeConstants.MESSAGE_PROXY_SEND,
+                AttributeConstants.MESSAGE_PROCESS_ERRCODE, 
AttributeConstants.MESSAGE_PROCESS_ERRMSG,
+                AttributeConstants.MSG_RPT_TIME, 
AttributeConstants.AUDIT_VERSION,
+                AttributeConstants.PROXY_SDK_VERSION, KEY_FILE_STATUS_CHECK,
+                KEY_SECRET_ID, KEY_SIGNATURE, KEY_TIME_STAMP, KEY_NONCE, 
KEY_USERNAME,
+                KEY_CLIENT_IP, KEY_ENCY_VERSION, KEY_ENCY_AES_KEY);
+        /*
+         * Collections.addAll(SdkReservedWords, "groupId", "streamId", "dt", 
"msgUUID", "cp", "cnt", "mt", "m", "sid",
+         * "t", "NodeIP", "messageId", "isAck", "proxySend", "errCode", 
"errMsg", "rtms", "sdkVersion", "auditVersion",
+         * "_file_status_check", "_secretId", "_signature", "_timeStamp", 
"_nonce", "_userName", "_clientIP",
+         * "_encyVersion", "_encyAesKey");
+         */
 
         Collections.addAll(SdkAllowedMsgType,
                 MsgType.MSG_ACK_SERVICE, MsgType.MSG_MULTI_BODY, 
MsgType.MSG_BIN_MULTI_BODY);
@@ -106,7 +130,7 @@ public class ProxyUtils {
             return false;
         }
         for (String key : attrsMap.keySet()) {
-            if (invalidAttr.contains(key)) {
+            if (SdkReservedWords.contains(key)) {
                 logger.error("the attributes is invalid ,please check ! {}", 
key);
                 return false;
             }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java
 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java
new file mode 100644
index 0000000000..c2af1eacb4
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EventInfoTest {
+
+    @Test
+    public void testTcpEventInfo() throws Exception {
+        // case a1 normal data setting
+        String groupId = "groupId";
+        String streamId = "streamId";
+        long dtMs = System.currentTimeMillis();
+        Map<String, String> attrsA1 = null;
+        byte[] body = "test".getBytes(StandardCharsets.UTF_8);
+        TcpEventInfo eventInfoA1 =
+                new TcpEventInfo(groupId, streamId, dtMs, attrsA1, body);
+        Assert.assertEquals(groupId, eventInfoA1.getGroupId());
+        Assert.assertEquals(streamId, eventInfoA1.getStreamId());
+        Assert.assertEquals(dtMs, eventInfoA1.getDtMs());
+        Assert.assertNotNull(eventInfoA1.getAttrs());
+        Assert.assertEquals(0, eventInfoA1.getAttrs().size());
+        Assert.assertEquals(1, eventInfoA1.getMsgCnt());
+        Assert.assertEquals(body.length, eventInfoA1.getBodySize());
+        Assert.assertArrayEquals(body, eventInfoA1.getBodyList().get(0));
+        eventInfoA1.setAttr("a1key", "mmm");
+        Assert.assertEquals(1, eventInfoA1.getAttrs().size());
+        Assert.assertEquals(1, eventInfoA1.getMsgCnt());
+        // case a2 normal data setting
+        Map<String, String> attrsA2 = new HashMap<>();
+        List<byte[]> bodyListA2 = new ArrayList<>();
+        bodyListA2.add("test_msg_1".getBytes(StandardCharsets.UTF_8));
+        bodyListA2.add("test_msg_2".getBytes(StandardCharsets.UTF_8));
+        TcpEventInfo eventInfoA2 =
+                new TcpEventInfo(groupId, streamId, dtMs, attrsA2, bodyListA2);
+        Assert.assertEquals(groupId, eventInfoA2.getGroupId());
+        Assert.assertEquals(streamId, eventInfoA2.getStreamId());
+        Assert.assertEquals(dtMs, eventInfoA2.getDtMs());
+        Assert.assertEquals(attrsA2, eventInfoA2.getAttrs());
+        Assert.assertEquals(2, eventInfoA2.getMsgCnt());
+        int totalSize = 0;
+        List<byte[]> tgtListA2 = eventInfoA2.getBodyList();
+        for (byte[] b : bodyListA2) {
+            Assert.assertTrue(tgtListA2.contains(b));
+            totalSize += b.length;
+        }
+        Assert.assertEquals(totalSize, eventInfoA2.getBodySize());
+        eventInfoA2.setAttr("a2key", "ccc");
+        Assert.assertEquals(1, eventInfoA2.getAttrs().size());
+        // case A3 abnormal data setting
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(null, streamId, dtMs, attrsA1, body));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo("   ", streamId, dtMs, attrsA1, body));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, null, dtMs, attrsA1, body));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, "  ", dtMs, attrsA1, body));
+        byte[] bodyA301 = null;
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, 
bodyA301));
+        byte[] bodyA302 = "".getBytes(StandardCharsets.UTF_8);
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, 
bodyA302));
+        byte[] bodyA303 = "    ".getBytes(StandardCharsets.UTF_8);
+        TcpEventInfo eventInfoA303 =
+                new TcpEventInfo(groupId, streamId, dtMs, attrsA1, bodyA303);
+        Assert.assertEquals(1, eventInfoA303.getMsgCnt());
+        Assert.assertEquals(bodyA303.length, eventInfoA303.getBodySize());
+        List<byte[]> msgListA311 = null;
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, 
msgListA311));
+        List<byte[]> msgListA312 = new ArrayList<>();
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, 
msgListA312));
+        List<byte[]> msgListA313 = new ArrayList<>();
+        msgListA313.add(null);
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, 
msgListA313));
+        List<byte[]> msgListA314 = new ArrayList<>();
+        msgListA314.add("".getBytes(StandardCharsets.UTF_8));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, 
msgListA314));
+        List<byte[]> msgListA315 = new ArrayList<>();
+        msgListA315.add("".getBytes(StandardCharsets.UTF_8));
+        msgListA315.add("test".getBytes(StandardCharsets.UTF_8));
+        TcpEventInfo eventInfoA315 =
+                new TcpEventInfo(groupId, streamId, dtMs, attrsA1, 
msgListA315);
+        Assert.assertEquals(1, eventInfoA315.getMsgCnt());
+        Assert.assertEquals("test".getBytes(StandardCharsets.UTF_8).length, 
eventInfoA315.getBodySize());
+        // case A4 abnormal attributes setting
+        Map<String, String> attrsA41 = new HashMap<>();
+        attrsA41.put("aaa&mmm", "value");
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA41, 
body));
+        Map<String, String> attrsA42 = new HashMap<>();
+        attrsA42.put("aaa=mmm", "value");
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA42, 
body));
+        Map<String, String> attrsA43 = new HashMap<>();
+        attrsA43.put("groupId", "value");
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA43, 
body));
+        Map<String, String> attrsA44 = new HashMap<>();
+        attrsA44.put("testA44", "va&lue");
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA44, 
body));
+        Map<String, String> attrsA45 = new HashMap<>();
+        attrsA45.put("testA45", "va=lue");
+        Assert.assertThrows(ProxyEventException.class,
+                () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA45, 
body));
+        TcpEventInfo eventInfoA46 =
+                new TcpEventInfo(groupId, streamId, dtMs, attrsA1, body);
+        Assert.assertThrows(ProxyEventException.class,
+                () -> eventInfoA46.setAttr("aaa&mmm", "value"));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> eventInfoA46.setAttr("streamId", "value"));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> eventInfoA46.setAttr("kkk=mmm", "value"));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> eventInfoA46.setAttr("aaaa", "va=lue"));
+        Assert.assertThrows(ProxyEventException.class,
+                () -> eventInfoA46.setAttr("aaaa", "va&lue"));
+        // case 5, set uuid, auditVersion
+        String uuid = "uuid";
+        long auditVer = 32L;
+        TcpEventInfo eventInfoA51 =
+                new TcpEventInfo(groupId, streamId, dtMs, auditVer, attrsA1, 
body);
+        Assert.assertNotNull(eventInfoA51.getAttrs());
+        Assert.assertEquals(1, eventInfoA51.getAttrs().size());
+        TcpEventInfo eventInfoA52 =
+                new TcpEventInfo(groupId, streamId, dtMs, uuid, attrsA1, body);
+        Assert.assertNotNull(eventInfoA52.getAttrs());
+        Assert.assertEquals(1, eventInfoA52.getAttrs().size());
+        TcpEventInfo eventInfoA53 =
+                new TcpEventInfo(groupId, streamId, dtMs, auditVer, uuid, 
attrsA1, body);
+        Assert.assertNotNull(eventInfoA53.getAttrs());
+        Assert.assertEquals(2, eventInfoA53.getAttrs().size());
+    }
+
+    /**
+     * Exercises the {@code HttpEventInfo} constructors with a single body, a list of
+     * bodies and an audit version, then checks that blank or null group id, stream id
+     * and body values are rejected with {@code ProxyEventException}.
+     */
+    @Test
+    public void testHttpEventInfo() throws Exception {
+        final String groupId = "groupId";
+        final String streamId = "streamId";
+        final long dtMs = System.currentTimeMillis();
+        final String body = "test";
+
+        // case A1: a single string body, no extra attributes expected
+        HttpEventInfo singleEvent = new HttpEventInfo(groupId, streamId, dtMs, body);
+        Assert.assertEquals(groupId, singleEvent.getGroupId());
+        Assert.assertEquals(streamId, singleEvent.getStreamId());
+        Assert.assertEquals(dtMs, singleEvent.getDtMs());
+        Assert.assertNotNull(singleEvent.getAttrs());
+        Assert.assertTrue(singleEvent.getAttrs().isEmpty());
+        Assert.assertEquals(1, singleEvent.getMsgCnt());
+        Assert.assertEquals(body.length(), singleEvent.getBodySize());
+        Assert.assertEquals(body, singleEvent.getBodyList().get(0));
+
+        // case A2: a list of bodies; count and accumulated length must match
+        List<String> inputBodies = new ArrayList<>();
+        inputBodies.add("test_body_1");
+        inputBodies.add("test_body_2");
+        HttpEventInfo multiEvent = new HttpEventInfo(groupId, streamId, dtMs, inputBodies);
+        Assert.assertEquals(groupId, multiEvent.getGroupId());
+        Assert.assertEquals(streamId, multiEvent.getStreamId());
+        Assert.assertEquals(dtMs, multiEvent.getDtMs());
+        Assert.assertNotNull(multiEvent.getAttrs());
+        Assert.assertTrue(multiEvent.getAttrs().isEmpty());
+        Assert.assertEquals(2, multiEvent.getMsgCnt());
+        List<String> storedBodies = multiEvent.getBodyList();
+        // every stored body must come from the input list
+        Assert.assertTrue(inputBodies.containsAll(storedBodies));
+        int expectedSize = storedBodies.stream().mapToInt(String::length).sum();
+        Assert.assertEquals(expectedSize, multiEvent.getBodySize());
+
+        // case A3: passing an audit version yields exactly one attribute
+        long auditVer = 1000L;
+        HttpEventInfo auditedEvent = new HttpEventInfo(groupId, streamId, dtMs, auditVer, body);
+        Assert.assertNotNull(auditedEvent.getAttrs());
+        Assert.assertEquals(1, auditedEvent.getAttrs().size());
+
+        // case A4: null or blank group id, stream id and body must be rejected
+        for (String badGroupId : new String[]{null, "   "}) {
+            Assert.assertThrows(ProxyEventException.class,
+                    () -> new HttpEventInfo(badGroupId, streamId, dtMs, body));
+        }
+        for (String badStreamId : new String[]{null, "  "}) {
+            Assert.assertThrows(ProxyEventException.class,
+                    () -> new HttpEventInfo(groupId, badStreamId, dtMs, body));
+        }
+        // the loop variable is typed String so the single-body constructor is selected
+        for (String badBody : new String[]{null, "", "      "}) {
+            Assert.assertThrows(ProxyEventException.class,
+                    () -> new HttpEventInfo(groupId, streamId, dtMs, badBody));
+        }
+    }
+}


Reply via email to