This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0aa56b867e [INLONG-8990][Manager] Rename DataProxyMsgEncType to 
MessageWrapType (#8991)
0aa56b867e is described below

commit 0aa56b867e2e730da81f090ac1325bfd32de9ec6
Author: fuweng11 <[email protected]>
AuthorDate: Mon Sep 25 22:48:26 2023 +0800

    [INLONG-8990][Manager] Rename DataProxyMsgEncType to MessageWrapType 
(#8991)
---
 .../{DataProxyMsgEncType.java => MessageWrapType.java} | 18 +++++++++---------
 .../inlong/dataproxy/metrics/audit/AuditUtils.java     |  4 ++--
 .../dataproxy/sink/common/DefaultEventHandler.java     |  6 +++---
 .../dataproxy/source/httpMsg/HttpMessageHandler.java   |  6 +++---
 .../inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java   |  6 +++---
 .../apache/inlong/dataproxy/utils/MessageUtils.java    |  4 ++--
 .../inlong/manager/common/consts/InlongConstants.java  |  4 ++--
 .../manager/service/message/DeserializeOperator.java   |  4 ++--
 .../service/message/DeserializeOperatorFactory.java    |  4 ++--
 .../service/message/InlongMsgDeserializeOperator.java  |  6 +++---
 .../service/message/PbMsgDeserializeOperator.java      |  6 +++---
 .../service/message/RawMsgDeserializeOperator.java     |  6 +++---
 .../service/resource/queue/kafka/KafkaOperator.java    |  6 +++---
 .../service/resource/queue/pulsar/PulsarOperator.java  |  6 +++---
 .../service/resource/queue/tubemq/TubeMQOperator.java  |  6 +++---
 .../sdk/sort/impl/decode/MessageDeserializer.java      | 10 +++++-----
 16 files changed, 51 insertions(+), 51 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/MessageWrapType.java
similarity index 71%
rename from 
inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
rename to 
inlong-common/src/main/java/org/apache/inlong/common/enums/MessageWrapType.java
index 1335cfcf6a..159c081c56 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/MessageWrapType.java
@@ -20,14 +20,14 @@ package org.apache.inlong.common.enums;
 /**
  * Enumeration class of encoding format of data output from DataProxy to MQ
  */
-public enum DataProxyMsgEncType {
+public enum MessageWrapType {
 
-    MSG_ENCODE_TYPE_RAW(0, "Raw", "Raw message without any InLong format"),
-    MSG_ENCODE_TYPE_PB(1, "PB", "The PB MessagePack encode format"),
-    MSG_ENCODE_TYPE_INLONGMSG(2, "InLongMsg", "The InLongMsg encode format"),
-    MSG_ENCODE_TYPE_UNKNOWN(99, "Unknown", "Unknown encode format");
+    RAW(0, "RAW", "The message body wrapped with nothing"),
+    INLONG_MSG_V1(1, "INLONG_MSG_V1", "The message body wrapped with inlong 
msg v1, which is the PB protocol"),
+    INLONG_MSG_V0(2, "INLONG_MSG_V0", "The message body wrapped with inlong 
msg v0, which is a six segment protocol"),
+    UNKNOWN(99, "UNKNOWN", "Unknown message wrap type");
 
-    DataProxyMsgEncType(int id, String name, String desc) {
+    MessageWrapType(int id, String name, String desc) {
         this.id = id;
         this.name = name;
         this.desc = desc;
@@ -49,13 +49,13 @@ public enum DataProxyMsgEncType {
         return desc;
     }
 
-    public static DataProxyMsgEncType valueOf(int value) {
-        for (DataProxyMsgEncType msgEncType : DataProxyMsgEncType.values()) {
+    public static MessageWrapType valueOf(int value) {
+        for (MessageWrapType msgEncType : MessageWrapType.values()) {
             if (msgEncType.getId() == value) {
                 return msgEncType;
             }
         }
-        return MSG_ENCODE_TYPE_UNKNOWN;
+        return UNKNOWN;
     }
 
     private final int id;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 258bc24255..7bd27521c6 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -19,7 +19,7 @@ package org.apache.inlong.dataproxy.metrics.audit;
 
 import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -64,7 +64,7 @@ public class AuditUtils {
         }
         Map<String, String> headers = event.getHeaders();
         String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
-        if 
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
 {
+        if 
(MessageWrapType.INLONG_MSG_V1.getStrId().equalsIgnoreCase(pkgVersion)) {
             String inlongGroupId = 
DataProxyMetricItem.getInlongGroupId(headers);
             String inlongStreamId = 
DataProxyMetricItem.getInlongStreamId(headers);
             long logTime = getLogTime(headers);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
index 88430d2eba..64313adef5 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.dataproxy.sink.common;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
@@ -51,8 +51,8 @@ public class DefaultEventHandler implements EventHandler {
             INLONG_COMPRESSED_TYPE compressType) {
         Map<String, String> headers = new HashMap<>();
         // version int32 protocol version, the value is 1
-        headers.put(ConfigConstants.MSG_ENCODE_VER, 
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
-        headers.put(EventConstants.HEADER_KEY_VERSION, 
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+        headers.put(ConfigConstants.MSG_ENCODE_VER, 
MessageWrapType.INLONG_MSG_V1.getStrId());
+        headers.put(EventConstants.HEADER_KEY_VERSION, 
MessageWrapType.INLONG_MSG_V1.getStrId());
         // inlongGroupId string inlongGroupId
         headers.put(EventConstants.INLONG_GROUP_ID, 
profile.getInlongGroupId());
         // inlongStreamId string inlongStreamId
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index 7d98d199f3..89c8e04857 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.dataproxy.source.httpMsg;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
@@ -360,9 +360,9 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
         eventHeaders.put(ConfigConstants.DATAPROXY_IP_KEY, 
source.getSrcHost());
         eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
         eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
-                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+                MessageWrapType.INLONG_MSG_V0.getStrId());
         eventHeaders.put(EventConstants.HEADER_KEY_VERSION,
-                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+                MessageWrapType.INLONG_MSG_V0.getStrId());
         eventHeaders.put(AttributeConstants.RCV_TIME, 
String.valueOf(msgRcvTime));
         eventHeaders.put(ConfigConstants.PKG_TIME_KEY, 
String.valueOf(pkgTime));
         Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
index a4e3a33305..68e05d4dc2 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.dataproxy.source.v0msg;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.consts.StatConstants;
@@ -240,9 +240,9 @@ public abstract class AbsV0MsgCodec {
         headers.put(ConfigConstants.DATAPROXY_IP_KEY, source.getSrcHost());
         headers.put(ConfigConstants.MSG_COUNTER_KEY, String.valueOf(msgCount));
         headers.put(ConfigConstants.MSG_ENCODE_VER,
-                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+                MessageWrapType.INLONG_MSG_V0.getStrId());
         headers.put(EventConstants.HEADER_KEY_VERSION,
-                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+                MessageWrapType.INLONG_MSG_V0.getStrId());
         headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
         headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
         headers.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(msgPkgTime));
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index 36b6a0d9ed..d8b2227f11 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.dataproxy.utils;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.MsgType;
@@ -293,7 +293,7 @@ public class MessageUtils {
         // common attributes
         Map<String, String> attrs = new HashMap<>();
         attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion);
-        if 
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
 {
+        if 
(MessageWrapType.INLONG_MSG_V1.getStrId().equalsIgnoreCase(pkgVersion)) {
             attrs.put("dataproxyip", NetworkUtils.getLocalIp());
             attrs.put(Constants.INLONG_GROUP_ID, 
headers.get(Constants.INLONG_GROUP_ID));
             attrs.put(Constants.INLONG_STREAM_ID, 
headers.get(Constants.INLONG_STREAM_ID));
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 7772ef4ba0..0f6c86056a 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.common.consts;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 
 import com.google.common.collect.Sets;
 
@@ -202,7 +202,7 @@ public class InlongConstants {
     /**
      * Message compression type, 0: Raw message without any InLong format, 1: 
InlongMsgPb, 2: InlongMsg
      * <p/>
-     * See more: {@link DataProxyMsgEncType}
+     * See more: {@link MessageWrapType}
      */
     public static final String MSG_ENCODE_VER = "msgEnType";
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
index 290ac1ca15..ebb24b52d7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.message;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 
@@ -42,7 +42,7 @@ public interface DeserializeOperator {
     /**
      * Determines whether the current instance matches the specified type.
      */
-    boolean accept(DataProxyMsgEncType type);
+    boolean accept(MessageWrapType type);
 
     /**
      * List brief mq message info
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
index 6d7d43e82e..0c796dd810 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperatorFactory.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.message;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
@@ -38,7 +38,7 @@ public class DeserializeOperatorFactory {
     /**
      * Get a message queue resource operator instance via the given mqType
      */
-    public DeserializeOperator getInstance(DataProxyMsgEncType type) {
+    public DeserializeOperator getInstance(MessageWrapType type) {
         return operatorList.stream()
                 .filter(inst -> inst.accept(type))
                 .findFirst()
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index b5165eb8bf..0e1b2a0a56 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.message;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.util.StringUtil;
@@ -39,8 +39,8 @@ import java.util.Objects;
 public class InlongMsgDeserializeOperator implements DeserializeOperator {
 
     @Override
-    public boolean accept(DataProxyMsgEncType type) {
-        return DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.equals(type);
+    public boolean accept(MessageWrapType type) {
+        return MessageWrapType.INLONG_MSG_V0.equals(type);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index 19742325f8..7b7727e8e7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.message;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.util.Utils;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -41,8 +41,8 @@ import java.util.Map;
 public class PbMsgDeserializeOperator implements DeserializeOperator {
 
     @Override
-    public boolean accept(DataProxyMsgEncType type) {
-        return DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.equals(type);
+    public boolean accept(MessageWrapType type) {
+        return MessageWrapType.INLONG_MSG_V1.equals(type);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index cf334ebcc8..361afe46d6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.message;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -35,8 +35,8 @@ import java.util.Map;
 public class RawMsgDeserializeOperator implements DeserializeOperator {
 
     @Override
-    public boolean accept(DataProxyMsgEncType type) {
-        return DataProxyMsgEncType.MSG_ENCODE_TYPE_RAW.equals(type);
+    public boolean accept(MessageWrapType type) {
+        return MessageWrapType.RAW.equals(type);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 82b2d7a03b..deb6601b63 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
@@ -157,9 +157,9 @@ public class KafkaOperator {
                 }
 
                 int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                        
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+                        
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
                 DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                        DataProxyMsgEncType.valueOf(wrapTypeId));
+                        MessageWrapType.valueOf(wrapTypeId));
                 messageList.addAll(deserializeOperator.decodeMsg(streamInfo, 
record.value(), headers, index));
                 if (messageList.size() >= messageCount) {
                     break;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index f3f5f36615..77feca618c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.resource.queue.pulsar;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.conversion.ConversionHandle;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -430,9 +430,9 @@ public class PulsarOperator {
                     pulsarAdmin.topics().examineMessage(topicPartition, 
"latest", messagePosition);
             Map<String, String> headers = pulsarMessage.getProperties();
             int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                    
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+                    Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
             DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                    DataProxyMsgEncType.valueOf(wrapTypeId));
+                    MessageWrapType.valueOf(wrapTypeId));
             briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, 
pulsarMessage.getData(),
                     headers, index));
         } catch (Exception e) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index 06193a49bd..52ab267dc1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.resource.queue.tubemq;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.HttpUtils;
@@ -303,10 +303,10 @@ public class TubeMQOperator {
                 }
 
                 int wrapTypeId = 
Integer.parseInt(map.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                        
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+                        
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
                 byte[] messageData = 
Base64.getDecoder().decode(tubeDataInfo.getData());
                 DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                        DataProxyMsgEncType.valueOf(wrapTypeId));
+                        MessageWrapType.valueOf(wrapTypeId));
                 messageList.addAll(deserializeOperator.decodeMsg(streamInfo, 
messageData, map, index));
             }
 
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index ace88b88e3..25ff99a872 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sdk.sort.impl.decode;
 
-import org.apache.inlong.common.enums.DataProxyMsgEncType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.util.StringUtil;
 import org.apache.inlong.common.util.Utils;
@@ -74,12 +74,12 @@ public class MessageDeserializer implements Deserializer {
 
         // 1. version
         int version = 
Integer.parseInt(headers.getOrDefault(EventConstants.HEADER_KEY_VERSION,
-                
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
-        if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_RAW.getId()) {
+                Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
+        if (version == MessageWrapType.RAW.getId()) {
             return decode(context, inLongTopic, data, headers);
-        } else if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getId()) {
+        } else if (version == MessageWrapType.INLONG_MSG_V1.getId()) {
             return decodePB(context, inLongTopic, data, headers);
-        } else if (version == 
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId()) {
+        } else if (version == MessageWrapType.INLONG_MSG_V0.getId()) {
             return decodeInlongMsg(context, inLongTopic, data, headers);
         } else {
             throw new IllegalArgumentException("Unknown version type:" + 
version);

Reply via email to