This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 21cbf7234c [INLONG-11689][SDK] Optimize user reporting information
management (#11690)
21cbf7234c is described below
commit 21cbf7234c472500506305cdf19926d969d9150b
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Jan 20 14:10:29 2025 +0800
[INLONG-11689][SDK] Optimize user reporting information management (#11690)
* [INLONG-11689][SDK] Optimize user reporting information management
* [INLONG-11689][SDK] Optimize user reporting information management
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/common/EventInfo.java | 148 +++++++++++++
.../dataproxy/exception/ProxyEventException.java | 42 ++++
.../sdk/dataproxy/sender/http/HttpEventInfo.java | 88 ++++++++
.../sdk/dataproxy/sender/tcp/TcpEventInfo.java | 113 ++++++++++
.../inlong/sdk/dataproxy/utils/LogCounter.java | 2 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 36 +++-
.../apache/inlong/sdk/dataproxy/EventInfoTest.java | 229 +++++++++++++++++++++
7 files changed, 651 insertions(+), 7 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
new file mode 100644
index 0000000000..3d63c10039
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.common;
+
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Report Event information class
+ *
+ * Used to encapsulate the data information reported by the caller, including
+ * grouId, streamId, dt, attributes, and body, auditVerison, msgUUID, etc.
+ * This class performs field value validity checks on the reported data and
+ * throws ProxyEventException for data that does not meet the reporting
requirements, including
+ * mandatory fields that are empty, attribute sets that contain reserved
words, attribute delimiters,
+ * empty message bodies, etc.
+ * Since the TCP and HTTP reports supported by the SDK differ only in the
+ * message body type, this class uses a Generics definition
+ */
+public abstract class EventInfo<T> {
+
+ protected static final Logger logger =
LoggerFactory.getLogger(EventInfo.class);
+ protected static final LogCounter exceptCnt = new LogCounter(10, 100000,
60 * 1000L);
+
+ private final String groupId;
+ private final String streamId;
+ private final long dtMs;
+ private final Map<String, String> attrs = new HashMap<>();
+ protected int msgCnt = 0;
+ protected int bodySize = 0;
+ protected final List<T> bodyList = new ArrayList<>();
+
+ protected EventInfo(String groupId, String streamId, long dtMs, Long
auditId, String msgUUID,
+ Map<String, String> attrs, boolean isSingle, List<T> bodyList)
throws ProxyEventException {
+ // groupId
+ if (StringUtils.isBlank(groupId)) {
+ throw new ProxyEventException("groupId is blank!");
+ }
+ this.groupId = groupId.trim();
+ // streamId
+ if (StringUtils.isBlank(streamId)) {
+ throw new ProxyEventException("streamId is blank!");
+ }
+ this.streamId = streamId.trim();
+ // dtMs
+ this.dtMs = dtMs <= 0L ? System.currentTimeMillis() : dtMs;
+ // attrs
+ if (attrs != null && !attrs.isEmpty()) {
+ for (Map.Entry<String, String> entry : attrs.entrySet()) {
+ if (StringUtils.isBlank(entry.getKey())) {
+ continue;
+ }
+ innSetAttr(entry.getKey().trim(), entry.getValue());
+ }
+ }
+ if (auditId != null && auditId != -1L) {
+ this.attrs.put(AttributeConstants.AUDIT_VERSION,
String.valueOf(auditId));
+ }
+ if (StringUtils.isNotBlank(msgUUID)) {
+ this.attrs.put(AttributeConstants.MSG_UUID, msgUUID.trim());
+ }
+ // body
+ setBodyList(isSingle, bodyList);
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public long getDtMs() {
+ return dtMs;
+ }
+
+ public Map<String, String> getAttrs() {
+ return attrs;
+ }
+
+ public int getMsgCnt() {
+ return msgCnt;
+ }
+
+ public int getBodySize() {
+ return bodySize;
+ }
+
+ protected abstract void setBodyList(boolean isSingle, List<T> bodyList)
throws ProxyEventException;
+
+ protected void innSetAttr(String key, String value) throws
ProxyEventException {
+ if (ProxyUtils.SdkReservedWords.contains(key)) {
+ throw new ProxyEventException("Attribute key(" + key + ") is
reserved word!");
+ }
+ if (key.contains(AttributeConstants.SEPARATOR)
+ || key.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+ if (exceptCnt.shouldPrint()) {
+ logger.warn(String.format("Attribute key(%s) include reserved
word(%s or %s)",
+ key, AttributeConstants.KEY_VALUE_SEPARATOR,
AttributeConstants.KEY_VALUE_SEPARATOR));
+ }
+ throw new ProxyEventException("Attribute key(" + key + ") include
reserved word("
+ + AttributeConstants.KEY_VALUE_SEPARATOR + " or "
+ + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
+ }
+ String valValue = value;
+ if (valValue != null) {
+ valValue = valValue.trim();
+ if (valValue.contains(AttributeConstants.SEPARATOR)
+ ||
valValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+ if (exceptCnt.shouldPrint()) {
+ logger.warn(String.format("Attribute value(%s) include
reserved word(%s or %s)",
+ valValue, AttributeConstants.KEY_VALUE_SEPARATOR,
AttributeConstants.KEY_VALUE_SEPARATOR));
+ }
+ throw new ProxyEventException("Attribute value(" + valValue +
") include reserved word("
+ + AttributeConstants.KEY_VALUE_SEPARATOR + " or "
+ + AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
+ }
+ }
+ this.attrs.put(key, valValue);
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java
new file mode 100644
index 0000000000..7392631d61
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.exception;
+
+/**
+ * Proxy Event Exception
+ *
+ * This exception is used specifically when an unacceptable situation when
constructing an event.
+ * If this exception is thrown, the caller needs to solve the specified
problem or discard the illegal message.
+ */
+public class ProxyEventException extends Exception {
+
+ public ProxyEventException() {
+ }
+
+ public ProxyEventException(String message) {
+ super(message);
+ }
+
+ public ProxyEventException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ProxyEventException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java
new file mode 100644
index 0000000000..f8ae172bba
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.http;
+
+import org.apache.inlong.sdk.dataproxy.common.EventInfo;
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * HTTP Event Information class
+ *
+ * Used to encapsulate the data information reported by HTTP
+ */
+public class HttpEventInfo extends EventInfo<String> {
+
+ public HttpEventInfo(String groupId, String streamId,
+ long dtMs, String body) throws ProxyEventException {
+ super(groupId, streamId, dtMs, null, null, null, true,
Collections.singletonList(body));
+ }
+
+ public HttpEventInfo(String groupId, String streamId,
+ long dtMs, long auditId, String body) throws ProxyEventException {
+ super(groupId, streamId, dtMs, auditId, null, null, true,
Collections.singletonList(body));
+ }
+
+ public HttpEventInfo(String groupId, String streamId,
+ long dtMs, List<String> bodyList) throws ProxyEventException {
+ super(groupId, streamId, dtMs, null, null, null, false, bodyList);
+ }
+
+ public HttpEventInfo(String groupId, String streamId,
+ long dtMs, long auditId, List<String> bodyList) throws
ProxyEventException {
+ super(groupId, streamId, dtMs, auditId, null, null, false, bodyList);
+ }
+
+ public List<String> getBodyList() {
+ return bodyList;
+ }
+
+ @Override
+ protected void setBodyList(boolean isSingle, List<String> bodyList) throws
ProxyEventException {
+ String tmpValue;
+ if (isSingle) {
+ if (StringUtils.isBlank(bodyList.get(0))) {
+ throw new ProxyEventException("body is null or empty!");
+ }
+ tmpValue = bodyList.get(0).trim();
+ this.bodyList.add(tmpValue);
+ this.bodySize = tmpValue.length();
+ this.msgCnt = 1;
+ } else {
+ if (bodyList == null || bodyList.isEmpty()) {
+ throw new ProxyEventException("bodyList is null or empty!");
+ }
+ for (String body : bodyList) {
+ if (StringUtils.isBlank(body)) {
+ continue;
+ }
+ tmpValue = body.trim();
+ this.bodyList.add(tmpValue.trim());
+ this.bodySize += tmpValue.length();
+ this.msgCnt++;
+ }
+ if (this.bodyList.isEmpty()) {
+ throw new ProxyEventException("bodyList no valid content!");
+ }
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
new file mode 100644
index 0000000000..dfa255aab6
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.EventInfo;
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HTTP Event Information class
+ *
+ * Used to encapsulate the data information reported by TCP
+ */
+public class TcpEventInfo extends EventInfo<byte[]> {
+
+ public TcpEventInfo(String groupId, String streamId, long dtMs,
+ Map<String, String> attrs, byte[] body) throws ProxyEventException
{
+ super(groupId, streamId, dtMs, null, null, attrs, true,
Collections.singletonList(body));
+ }
+
+ public TcpEventInfo(String groupId, String streamId, long dtMs, long
auditId,
+ Map<String, String> attrs, byte[] body) throws ProxyEventException
{
+ super(groupId, streamId, dtMs, auditId, null, attrs, true,
Collections.singletonList(body));
+ }
+
+ public TcpEventInfo(String groupId, String streamId, long dtMs, String
msgUUID,
+ Map<String, String> attrs, byte[] body) throws ProxyEventException
{
+ super(groupId, streamId, dtMs, null, msgUUID, attrs, true,
Collections.singletonList(body));
+ }
+
+ public TcpEventInfo(String groupId, String streamId, long dtMs, long
auditId, String msgUUID,
+ Map<String, String> attrs, byte[] body) throws ProxyEventException
{
+ super(groupId, streamId, dtMs, auditId, msgUUID, attrs, true,
Collections.singletonList(body));
+ }
+
+ public TcpEventInfo(String groupId, String streamId,
+ long dtMs, Map<String, String> attrs, List<byte[]> bodyList)
throws ProxyEventException {
+ super(groupId, streamId, dtMs, null, null, attrs, false, bodyList);
+ }
+
+ public TcpEventInfo(String groupId, String streamId, long dtMs,
+ long auditId, Map<String, String> attrs, List<byte[]> bodyList)
throws ProxyEventException {
+ super(groupId, streamId, dtMs, auditId, null, attrs, false, bodyList);
+ }
+
+ public TcpEventInfo(String groupId, String streamId, long dtMs,
+ String msgUUID, Map<String, String> attrs, List<byte[]> bodyList)
throws ProxyEventException {
+ super(groupId, streamId, dtMs, null, msgUUID, attrs, false, bodyList);
+ }
+
+ public TcpEventInfo(String groupId, String streamId, long dtMs,
+ long auditId, String msgUUID, Map<String, String> attrs,
List<byte[]> bodyList) throws ProxyEventException {
+ super(groupId, streamId, dtMs, auditId, msgUUID, attrs, false,
bodyList);
+ }
+
+ public List<byte[]> getBodyList() {
+ return bodyList;
+ }
+
+ public void setAttr(String key, String value) throws ProxyEventException {
+ if (StringUtils.isBlank(key)) {
+ throw new ProxyEventException("Key is blank!");
+ }
+ innSetAttr(key.trim(), value);
+ }
+
+ @Override
+ protected void setBodyList(boolean isSingle, List<byte[]> bodyList) throws
ProxyEventException {
+ if (isSingle) {
+ if (bodyList.get(0) == null || bodyList.get(0).length == 0) {
+ throw new ProxyEventException("body is null or empty!");
+ }
+ this.bodyList.add(bodyList.get(0));
+ this.bodySize = bodyList.get(0).length;
+ this.msgCnt = 1;
+ } else {
+ if (bodyList == null || bodyList.isEmpty()) {
+ throw new ProxyEventException("bodyList is null or empty!");
+ }
+ for (byte[] body : bodyList) {
+ if (body == null || body.length == 0) {
+ continue;
+ }
+ this.bodyList.add(body);
+ this.bodySize += body.length;
+ this.msgCnt++;
+ }
+ if (this.bodyList.isEmpty()) {
+ throw new ProxyEventException("bodyList no valid content!");
+ }
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
index edb14c62b2..3fac799a3c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
@@ -27,7 +27,7 @@ public class LogCounter {
private long control = 100000L;
private long reset = 60 * 1000L;
- private AtomicLong lastLogTime = new
AtomicLong(System.currentTimeMillis());
+ private final AtomicLong lastLogTime = new
AtomicLong(System.currentTimeMillis());
public LogCounter(long start, long control, long reset) {
this.start = start;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 0367e6e06a..61dbb263df 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -38,11 +38,20 @@ import java.util.Set;
public class ProxyUtils {
+ public static final String KEY_FILE_STATUS_CHECK = "_file_status_check";
+ public static final String KEY_SECRET_ID = "_secretId";
+ public static final String KEY_SIGNATURE = "_signature";
+ public static final String KEY_TIME_STAMP = "_timeStamp";
+ public static final String KEY_NONCE = "_nonce";
+ public static final String KEY_USERNAME = "_userName";
+ public static final String KEY_CLIENT_IP = "_clientIP";
+ public static final String KEY_ENCY_VERSION = "_encyVersion";
+ public static final String KEY_ENCY_AES_KEY = "_encyAesKey";
private static final Logger logger =
LoggerFactory.getLogger(ProxyUtils.class);
private static final LogCounter exceptCounter = new LogCounter(10, 200000,
60 * 1000L);
private static final int TIME_LENGTH = 13;
- private static final Set<String> invalidAttr = new HashSet<>();
+ public static final Set<String> SdkReservedWords = new HashSet<>();
public static final Set<MsgType> SdkAllowedMsgType = new HashSet<>();
private static String localHost;
private static String sdkVersion;
@@ -50,10 +59,25 @@ public class ProxyUtils {
static {
localHost = getLocalIp();
getJarVersion();
- Collections.addAll(invalidAttr, "groupId", "streamId", "dt",
"msgUUID", "cp",
- "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId",
"_file_status_check", "_secretId",
- "_signature", "_timeStamp", "_nonce", "_userName",
"_clientIP", "_encyVersion", "_encyAesKey",
- "proxySend", "errMsg", "errCode",
AttributeConstants.MSG_RPT_TIME);
+ Collections.addAll(SdkReservedWords,
+ AttributeConstants.GROUP_ID, AttributeConstants.STREAM_ID,
+ AttributeConstants.DATA_TIME, AttributeConstants.MSG_UUID,
+ AttributeConstants.COMPRESS_TYPE,
AttributeConstants.MESSAGE_COUNT,
+ AttributeConstants.MESSAGE_TYPE, AttributeConstants.METHOD,
+ AttributeConstants.SEQUENCE_ID, AttributeConstants.TIME_STAMP,
+ AttributeConstants.NODE_IP, AttributeConstants.MESSAGE_ID,
+ AttributeConstants.MESSAGE_IS_ACK,
AttributeConstants.MESSAGE_PROXY_SEND,
+ AttributeConstants.MESSAGE_PROCESS_ERRCODE,
AttributeConstants.MESSAGE_PROCESS_ERRMSG,
+ AttributeConstants.MSG_RPT_TIME,
AttributeConstants.AUDIT_VERSION,
+ AttributeConstants.PROXY_SDK_VERSION, KEY_FILE_STATUS_CHECK,
+ KEY_SECRET_ID, KEY_SIGNATURE, KEY_TIME_STAMP, KEY_NONCE,
KEY_USERNAME,
+ KEY_CLIENT_IP, KEY_ENCY_VERSION, KEY_ENCY_AES_KEY);
+ /*
+ * Collections.addAll(SdkReservedWords, "groupId", "streamId", "dt",
"msgUUID", "cp", "cnt", "mt", "m", "sid",
+ * "t", "NodeIP", "messageId", "isAck", "proxySend", "errCode",
"errMsg", "rtms", "sdkVersion", "auditVersion",
+ * "_file_status_check", "_secretId", "_signature", "_timeStamp",
"_nonce", "_userName", "_clientIP",
+ * "_encyVersion", "_encyAesKey");
+ */
Collections.addAll(SdkAllowedMsgType,
MsgType.MSG_ACK_SERVICE, MsgType.MSG_MULTI_BODY,
MsgType.MSG_BIN_MULTI_BODY);
@@ -106,7 +130,7 @@ public class ProxyUtils {
return false;
}
for (String key : attrsMap.keySet()) {
- if (invalidAttr.contains(key)) {
+ if (SdkReservedWords.contains(key)) {
logger.error("the attributes is invalid ,please check ! {}",
key);
return false;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java
new file mode 100644
index 0000000000..c2af1eacb4
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EventInfoTest {
+
+ @Test
+ public void testTcpEventInfo() throws Exception {
+ // case a1 normal data setting
+ String groupId = "groupId";
+ String streamId = "streamId";
+ long dtMs = System.currentTimeMillis();
+ Map<String, String> attrsA1 = null;
+ byte[] body = "test".getBytes(StandardCharsets.UTF_8);
+ TcpEventInfo eventInfoA1 =
+ new TcpEventInfo(groupId, streamId, dtMs, attrsA1, body);
+ Assert.assertEquals(groupId, eventInfoA1.getGroupId());
+ Assert.assertEquals(streamId, eventInfoA1.getStreamId());
+ Assert.assertEquals(dtMs, eventInfoA1.getDtMs());
+ Assert.assertNotNull(eventInfoA1.getAttrs());
+ Assert.assertEquals(0, eventInfoA1.getAttrs().size());
+ Assert.assertEquals(1, eventInfoA1.getMsgCnt());
+ Assert.assertEquals(body.length, eventInfoA1.getBodySize());
+ Assert.assertArrayEquals(body, eventInfoA1.getBodyList().get(0));
+ eventInfoA1.setAttr("a1key", "mmm");
+ Assert.assertEquals(1, eventInfoA1.getAttrs().size());
+ Assert.assertEquals(1, eventInfoA1.getMsgCnt());
+ // case a2 normal data setting
+ Map<String, String> attrsA2 = new HashMap<>();
+ List<byte[]> bodyListA2 = new ArrayList<>();
+ bodyListA2.add("test_msg_1".getBytes(StandardCharsets.UTF_8));
+ bodyListA2.add("test_msg_2".getBytes(StandardCharsets.UTF_8));
+ TcpEventInfo eventInfoA2 =
+ new TcpEventInfo(groupId, streamId, dtMs, attrsA2, bodyListA2);
+ Assert.assertEquals(groupId, eventInfoA2.getGroupId());
+ Assert.assertEquals(streamId, eventInfoA2.getStreamId());
+ Assert.assertEquals(dtMs, eventInfoA2.getDtMs());
+ Assert.assertEquals(attrsA2, eventInfoA2.getAttrs());
+ Assert.assertEquals(2, eventInfoA2.getMsgCnt());
+ int totalSize = 0;
+ List<byte[]> tgtListA2 = eventInfoA2.getBodyList();
+ for (byte[] b : bodyListA2) {
+ Assert.assertTrue(tgtListA2.contains(b));
+ totalSize += b.length;
+ }
+ Assert.assertEquals(totalSize, eventInfoA2.getBodySize());
+ eventInfoA2.setAttr("a2key", "ccc");
+ Assert.assertEquals(1, eventInfoA2.getAttrs().size());
+ // case A3 abnormal data setting
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(null, streamId, dtMs, attrsA1, body));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(" ", streamId, dtMs, attrsA1, body));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, null, dtMs, attrsA1, body));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, " ", dtMs, attrsA1, body));
+ byte[] bodyA301 = null;
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1,
bodyA301));
+ byte[] bodyA302 = "".getBytes(StandardCharsets.UTF_8);
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1,
bodyA302));
+ byte[] bodyA303 = " ".getBytes(StandardCharsets.UTF_8);
+ TcpEventInfo eventInfoA303 =
+ new TcpEventInfo(groupId, streamId, dtMs, attrsA1, bodyA303);
+ Assert.assertEquals(1, eventInfoA303.getMsgCnt());
+ Assert.assertEquals(bodyA303.length, eventInfoA303.getBodySize());
+ List<byte[]> msgListA311 = null;
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1,
msgListA311));
+ List<byte[]> msgListA312 = new ArrayList<>();
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1,
msgListA312));
+ List<byte[]> msgListA313 = new ArrayList<>();
+ msgListA313.add(null);
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1,
msgListA313));
+ List<byte[]> msgListA314 = new ArrayList<>();
+ msgListA314.add("".getBytes(StandardCharsets.UTF_8));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1,
msgListA314));
+ List<byte[]> msgListA315 = new ArrayList<>();
+ msgListA315.add("".getBytes(StandardCharsets.UTF_8));
+ msgListA315.add("test".getBytes(StandardCharsets.UTF_8));
+ TcpEventInfo eventInfoA315 =
+ new TcpEventInfo(groupId, streamId, dtMs, attrsA1,
msgListA315);
+ Assert.assertEquals(1, eventInfoA315.getMsgCnt());
+ Assert.assertEquals("test".getBytes(StandardCharsets.UTF_8).length,
eventInfoA315.getBodySize());
+ // case A4 normal attributes setting
+ Map<String, String> attrsA41 = new HashMap<>();
+ attrsA41.put("aaa&mmm", "value");
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA41,
body));
+ Map<String, String> attrsA42 = new HashMap<>();
+ attrsA42.put("aaa=mmm", "value");
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA42,
body));
+ Map<String, String> attrsA43 = new HashMap<>();
+ attrsA43.put("groupId", "value");
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA43,
body));
+ Map<String, String> attrsA44 = new HashMap<>();
+ attrsA44.put("testA44", "va&lue");
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA44,
body));
+ Map<String, String> attrsA45 = new HashMap<>();
+ attrsA45.put("testA45", "va=lue");
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA45,
body));
+ TcpEventInfo eventInfoA46 =
+ new TcpEventInfo(groupId, streamId, dtMs, attrsA1, body);
+ Assert.assertThrows(ProxyEventException.class,
+ () -> eventInfoA46.setAttr("aaa&mmm", "value"));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> eventInfoA46.setAttr("streamId", "value"));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> eventInfoA46.setAttr("kkk=mmm", "value"));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> eventInfoA46.setAttr("aaaa", "va=lue"));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> eventInfoA46.setAttr("aaaa", "va&lue"));
+ // case 5, set uuid, auditVersion
+ String uuid = "uuid";
+ long auditVer = 32L;
+ TcpEventInfo eventInfoA51 =
+ new TcpEventInfo(groupId, streamId, dtMs, auditVer, attrsA1,
body);
+ Assert.assertNotNull(eventInfoA51.getAttrs());
+ Assert.assertEquals(1, eventInfoA51.getAttrs().size());
+ TcpEventInfo eventInfoA52 =
+ new TcpEventInfo(groupId, streamId, dtMs, uuid, attrsA1, body);
+ Assert.assertNotNull(eventInfoA52.getAttrs());
+ Assert.assertEquals(1, eventInfoA52.getAttrs().size());
+ TcpEventInfo eventInfoA53 =
+ new TcpEventInfo(groupId, streamId, dtMs, auditVer, uuid,
attrsA1, body);
+ Assert.assertNotNull(eventInfoA53.getAttrs());
+ Assert.assertEquals(2, eventInfoA53.getAttrs().size());
+ }
+
+ @Test
+ public void testHttpEventInfo() throws Exception {
+ // case a1 normal data setting
+ String groupId = "groupId";
+ String streamId = "streamId";
+ long dtMs = System.currentTimeMillis();
+ String body = "test";
+ HttpEventInfo eventInfoA1 =
+ new HttpEventInfo(groupId, streamId, dtMs, body);
+ Assert.assertEquals(groupId, eventInfoA1.getGroupId());
+ Assert.assertEquals(streamId, eventInfoA1.getStreamId());
+ Assert.assertEquals(dtMs, eventInfoA1.getDtMs());
+ Assert.assertNotNull(eventInfoA1.getAttrs());
+ Assert.assertTrue(eventInfoA1.getAttrs().isEmpty());
+ Assert.assertEquals(1, eventInfoA1.getMsgCnt());
+ Assert.assertEquals(body.length(), eventInfoA1.getBodySize());
+ Assert.assertEquals(body, eventInfoA1.getBodyList().get(0));
+ // case A2 normal setting
+ List<String> bodyListA2 = new ArrayList<>();
+ bodyListA2.add("test_body_1");
+ bodyListA2.add("test_body_2");
+ HttpEventInfo eventInfoA2 =
+ new HttpEventInfo(groupId, streamId, dtMs, bodyListA2);
+ Assert.assertEquals(groupId, eventInfoA2.getGroupId());
+ Assert.assertEquals(streamId, eventInfoA2.getStreamId());
+ Assert.assertEquals(dtMs, eventInfoA2.getDtMs());
+ Assert.assertNotNull(eventInfoA2.getAttrs());
+ Assert.assertTrue(eventInfoA2.getAttrs().isEmpty());
+ Assert.assertEquals(2, eventInfoA2.getMsgCnt());
+ int totalSize = 0;
+ List<String> tgtA2 = eventInfoA2.getBodyList();
+ for (String item : tgtA2) {
+ Assert.assertTrue(bodyListA2.contains(item));
+ totalSize += item.length();
+ }
+ Assert.assertEquals(totalSize, eventInfoA2.getBodySize());
+ // case A3 auditVer
+ long auditVer = 1000L;
+ HttpEventInfo eventInfoA3 =
+ new HttpEventInfo(groupId, streamId, dtMs, auditVer, body);
+ Assert.assertNotNull(eventInfoA3.getAttrs());
+ Assert.assertEquals(1, eventInfoA3.getAttrs().size());
+ // case A4 abnormal setting
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new HttpEventInfo(null, streamId, dtMs, body));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new HttpEventInfo(" ", streamId, dtMs, body));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new HttpEventInfo(groupId, null, dtMs, body));
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new HttpEventInfo(groupId, " ", dtMs, body));
+ String bodyA401 = null;
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new HttpEventInfo(groupId, streamId, dtMs, bodyA401));
+ String bodyA402 = "";
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new HttpEventInfo(groupId, streamId, dtMs, bodyA402));
+ String bodyA403 = " ";
+ Assert.assertThrows(ProxyEventException.class,
+ () -> new HttpEventInfo(groupId, streamId, dtMs, bodyA403));
+ }
+}