This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0aa56b867e [INLONG-8990][Manager] Rename DataProxyMsgEncType to
Message WrapType (#8991)
0aa56b867e is described below
commit 0aa56b867e2e730da81f090ac1325bfd32de9ec6
Author: fuweng11 <[email protected]>
AuthorDate: Mon Sep 25 22:48:26 2023 +0800
[INLONG-8990][Manager] Rename DataProxyMsgEncType to Message WrapType
(#8991)
---
.../{DataProxyMsgEncType.java => MessageWrapType.java} | 18 +++++++++---------
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 4 ++--
.../dataproxy/sink/common/DefaultEventHandler.java | 6 +++---
.../dataproxy/source/httpMsg/HttpMessageHandler.java | 6 +++---
.../inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java | 6 +++---
.../apache/inlong/dataproxy/utils/MessageUtils.java | 4 ++--
.../inlong/manager/common/consts/InlongConstants.java | 4 ++--
.../manager/service/message/DeserializeOperator.java | 4 ++--
.../service/message/DeserializeOperatorFactory.java | 4 ++--
.../service/message/InlongMsgDeserializeOperator.java | 6 +++---
.../service/message/PbMsgDeserializeOperator.java | 6 +++---
.../service/message/RawMsgDeserializeOperator.java | 6 +++---
.../service/resource/queue/kafka/KafkaOperator.java | 6 +++---
.../service/resource/queue/pulsar/PulsarOperator.java | 6 +++---
.../service/resource/queue/tubemq/TubeMQOperator.java | 6 +++---
.../sdk/sort/impl/decode/MessageDeserializer.java | 10 +++++-----
16 files changed, 51 insertions(+), 51 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/MessageWrapType.java
similarity index 71%
rename from
inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
rename to
inlong-common/src/main/java/org/apache/inlong/common/enums/MessageWrapType.java
index 1335cfcf6a..159c081c56 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/MessageWrapType.java
@@ -20,14 +20,14 @@ package org.apache.inlong.common.enums;
/**
* Enumeration class of encoding format of data output from DataProxy to MQ
*/
-public enum DataProxyMsgEncType {
+public enum MessageWrapType {
- MSG_ENCODE_TYPE_RAW(0, "Raw", "Raw message without any InLong format"),
- MSG_ENCODE_TYPE_PB(1, "PB", "The PB MessagePack encode format"),
- MSG_ENCODE_TYPE_INLONGMSG(2, "InLongMsg", "The InLongMsg encode format"),
- MSG_ENCODE_TYPE_UNKNOWN(99, "Unknown", "Unknown encode format");
+ RAW(0, "RAW", "The message body wrapped with nothing"),
+ INLONG_MSG_V1(1, "INLONG_MSG_V1", "The message body wrapped with inlong
msg v1, which is the PB protocol"),
+ INLONG_MSG_V0(2, "INLONG_MSG_V0", "The message body wrapped with inlong
msg v0, which is a six segment protocol"),
+ UNKNOWN(99, "UNKNOWN", "Unknown message wrap type");
- DataProxyMsgEncType(int id, String name, String desc) {
+ MessageWrapType(int id, String name, String desc) {
this.id = id;
this.name = name;
this.desc = desc;
@@ -49,13 +49,13 @@ public enum DataProxyMsgEncType {
return desc;
}
- public static DataProxyMsgEncType valueOf(int value) {
- for (DataProxyMsgEncType msgEncType : DataProxyMsgEncType.values()) {
+ public static MessageWrapType valueOf(int value) {
+ for (MessageWrapType msgEncType : MessageWrapType.values()) {
if (msgEncType.getId() == value) {
return msgEncType;
}
}
- return MSG_ENCODE_TYPE_UNKNOWN;
+ return UNKNOWN;
}
private final int id;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 258bc24255..7bd27521c6 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -19,7 +19,7 @@ package org.apache.inlong.dataproxy.metrics.audit;
import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.util.AuditConfig;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -64,7 +64,7 @@ public class AuditUtils {
}
Map<String, String> headers = event.getHeaders();
String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
- if
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
{
+ if
(MessageWrapType.INLONG_MSG_V1.getStrId().equalsIgnoreCase(pkgVersion)) {
String inlongGroupId =
DataProxyMetricItem.getInlongGroupId(headers);
String inlongStreamId =
DataProxyMetricItem.getInlongStreamId(headers);
long logTime = getLogTime(headers);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
index 88430d2eba..64313adef5 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
@@ -17,7 +17,7 @@
package org.apache.inlong.dataproxy.sink.common;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
@@ -51,8 +51,8 @@ public class DefaultEventHandler implements EventHandler {
INLONG_COMPRESSED_TYPE compressType) {
Map<String, String> headers = new HashMap<>();
// version int32 protocol version, the value is 1
- headers.put(ConfigConstants.MSG_ENCODE_VER,
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
- headers.put(EventConstants.HEADER_KEY_VERSION,
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+ headers.put(ConfigConstants.MSG_ENCODE_VER,
MessageWrapType.INLONG_MSG_V1.getStrId());
+ headers.put(EventConstants.HEADER_KEY_VERSION,
MessageWrapType.INLONG_MSG_V1.getStrId());
// inlongGroupId string inlongGroupId
headers.put(EventConstants.INLONG_GROUP_ID,
profile.getInlongGroupId());
// inlongStreamId string inlongStreamId
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index 7d98d199f3..89c8e04857 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -18,7 +18,7 @@
package org.apache.inlong.dataproxy.source.httpMsg;
import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
@@ -360,9 +360,9 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
eventHeaders.put(ConfigConstants.DATAPROXY_IP_KEY,
source.getSrcHost());
eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
- DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ MessageWrapType.INLONG_MSG_V0.getStrId());
eventHeaders.put(EventConstants.HEADER_KEY_VERSION,
- DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ MessageWrapType.INLONG_MSG_V0.getStrId());
eventHeaders.put(AttributeConstants.RCV_TIME,
String.valueOf(msgRcvTime));
eventHeaders.put(ConfigConstants.PKG_TIME_KEY,
String.valueOf(pkgTime));
Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
index a4e3a33305..68e05d4dc2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
@@ -18,7 +18,7 @@
package org.apache.inlong.dataproxy.source.v0msg;
import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
@@ -240,9 +240,9 @@ public abstract class AbsV0MsgCodec {
headers.put(ConfigConstants.DATAPROXY_IP_KEY, source.getSrcHost());
headers.put(ConfigConstants.MSG_COUNTER_KEY, String.valueOf(msgCount));
headers.put(ConfigConstants.MSG_ENCODE_VER,
- DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ MessageWrapType.INLONG_MSG_V0.getStrId());
headers.put(EventConstants.HEADER_KEY_VERSION,
- DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ MessageWrapType.INLONG_MSG_V0.getStrId());
headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
headers.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(msgPkgTime));
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index 36b6a0d9ed..d8b2227f11 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -18,7 +18,7 @@
package org.apache.inlong.dataproxy.utils;
import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
@@ -293,7 +293,7 @@ public class MessageUtils {
// common attributes
Map<String, String> attrs = new HashMap<>();
attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion);
- if
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
{
+ if
(MessageWrapType.INLONG_MSG_V1.getStrId().equalsIgnoreCase(pkgVersion)) {
attrs.put("dataproxyip", NetworkUtils.getLocalIp());
attrs.put(Constants.INLONG_GROUP_ID,
headers.get(Constants.INLONG_GROUP_ID));
attrs.put(Constants.INLONG_STREAM_ID,
headers.get(Constants.INLONG_STREAM_ID));
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 7772ef4ba0..0f6c86056a 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.common.consts;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import com.google.common.collect.Sets;
@@ -202,7 +202,7 @@ public class InlongConstants {
/**
* Message compression type, 0: Raw message without any InLong format, 1:
InlongMsgPb, 2: InlongMsg
* <p/>
- * See more: {@link DataProxyMsgEncType}
+ * See more: {@link MessageWrapType}
*/
public static final String MSG_ENCODE_VER = "msgEnType";
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
index 290ac1ca15..ebb24b52d7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.message;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -42,7 +42,7 @@ public interface DeserializeOperator {
/**
* Determines whether the current instance matches the specified type.
*/
- boolean accept(DataProxyMsgEncType type);
+ boolean accept(MessageWrapType type);
/**
* List brief mq message info
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
index 6d7d43e82e..0c796dd810 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.message;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -38,7 +38,7 @@ public class DeserializeOperatorFactory {
/**
* Get a message queue resource operator instance via the given mqType
*/
- public DeserializeOperator getInstance(DataProxyMsgEncType type) {
+ public DeserializeOperator getInstance(MessageWrapType type) {
return operatorList.stream()
.filter(inst -> inst.accept(type))
.findFirst()
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index b5165eb8bf..0e1b2a0a56 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.message;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.util.StringUtil;
@@ -39,8 +39,8 @@ import java.util.Objects;
public class InlongMsgDeserializeOperator implements DeserializeOperator {
@Override
- public boolean accept(DataProxyMsgEncType type) {
- return DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.equals(type);
+ public boolean accept(MessageWrapType type) {
+ return MessageWrapType.INLONG_MSG_V0.equals(type);
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index 19742325f8..7b7727e8e7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.message;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.Utils;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -41,8 +41,8 @@ import java.util.Map;
public class PbMsgDeserializeOperator implements DeserializeOperator {
@Override
- public boolean accept(DataProxyMsgEncType type) {
- return DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.equals(type);
+ public boolean accept(MessageWrapType type) {
+ return MessageWrapType.INLONG_MSG_V1.equals(type);
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index cf334ebcc8..361afe46d6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.message;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -35,8 +35,8 @@ import java.util.Map;
public class RawMsgDeserializeOperator implements DeserializeOperator {
@Override
- public boolean accept(DataProxyMsgEncType type) {
- return DataProxyMsgEncType.MSG_ENCODE_TYPE_RAW.equals(type);
+ public boolean accept(MessageWrapType type) {
+ return MessageWrapType.RAW.equals(type);
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 82b2d7a03b..deb6601b63 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.resource.queue.kafka;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
@@ -157,9 +157,9 @@ public class KafkaOperator {
}
int wrapTypeId =
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(
- DataProxyMsgEncType.valueOf(wrapTypeId));
+ MessageWrapType.valueOf(wrapTypeId));
messageList.addAll(deserializeOperator.decodeMsg(streamInfo,
record.value(), headers, index));
if (messageList.size() >= messageCount) {
break;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index f3f5f36615..77feca618c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.resource.queue.pulsar;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -430,9 +430,9 @@ public class PulsarOperator {
pulsarAdmin.topics().examineMessage(topicPartition,
"latest", messagePosition);
Map<String, String> headers = pulsarMessage.getProperties();
int wrapTypeId =
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+ Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(
- DataProxyMsgEncType.valueOf(wrapTypeId));
+ MessageWrapType.valueOf(wrapTypeId));
briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo,
pulsarMessage.getData(),
headers, index));
} catch (Exception e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index 06193a49bd..52ab267dc1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.resource.queue.tubemq;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.HttpUtils;
@@ -303,10 +303,10 @@ public class TubeMQOperator {
}
int wrapTypeId =
Integer.parseInt(map.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
byte[] messageData =
Base64.getDecoder().decode(tubeDataInfo.getData());
DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(
- DataProxyMsgEncType.valueOf(wrapTypeId));
+ MessageWrapType.valueOf(wrapTypeId));
messageList.addAll(deserializeOperator.decodeMsg(streamInfo,
messageData, map, index));
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index ace88b88e3..25ff99a872 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sdk.sort.impl.decode;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.util.StringUtil;
import org.apache.inlong.common.util.Utils;
@@ -74,12 +74,12 @@ public class MessageDeserializer implements Deserializer {
// 1. version
int version =
Integer.parseInt(headers.getOrDefault(EventConstants.HEADER_KEY_VERSION,
-
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
- if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_RAW.getId()) {
+ Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
+ if (version == MessageWrapType.RAW.getId()) {
return decode(context, inLongTopic, data, headers);
- } else if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getId()) {
+ } else if (version == MessageWrapType.INLONG_MSG_V1.getId()) {
return decodePB(context, inLongTopic, data, headers);
- } else if (version ==
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId()) {
+ } else if (version == MessageWrapType.INLONG_MSG_V0.getId()) {
return decodeInlongMsg(context, inLongTopic, data, headers);
} else {
throw new IllegalArgumentException("Unknown version type:" +
version);