This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 221a1ddc5f [INLONG-8284][DataProxy] Unify the message encoding 
definition of DataProxy (#8285)
221a1ddc5f is described below

commit 221a1ddc5fabeada93cb5b1339b8d4a6f559921e
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Jun 19 22:14:25 2023 +0800

    [INLONG-8284][DataProxy] Unify the message encoding definition of DataProxy 
(#8285)
---
 .../inlong/common/enums/DataProxyMsgEncType.java   | 29 ++++++++++++++--------
 .../dataproxy/http/SimpleMessageHandler.java       |  8 ++++--
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |  4 +--
 .../dataproxy/sink/common/DefaultEventHandler.java |  8 +++---
 .../inlong/dataproxy/sink/common/TubeUtils.java    |  4 +--
 .../inlong/dataproxy/sink/mq/BatchPackManager.java | 24 ++++++++++++------
 .../dataproxy/sink/mq/MessageQueueZoneSink.java    |  8 +++---
 .../sink/mq/MessageQueueZoneSinkContext.java       | 12 ++++-----
 .../dataproxy/sink/mq/SimplePackProfile.java       | 19 ++++++++++++++
 .../dataproxy/sink/mq/kafka/KafkaHandler.java      |  5 ++--
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java    |  5 ++--
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 26 +++++++++++--------
 .../dataproxy/source/ServerMessageHandler.java     |  8 ++++--
 .../dataproxy/source/SimpleMessageHandler.java     |  9 ++++---
 .../source2/httpMsg/InLongHttpMsgHandler.java      |  8 ++++--
 .../dataproxy/source2/v0msg/AbsV0MsgCodec.java     | 14 +++++------
 .../inlong/dataproxy/utils/MessageUtils.java       |  3 ++-
 .../sdk/commons/protocol/EventConstants.java       |  2 ++
 .../sdk/sort/impl/decode/MessageDeserializer.java  | 29 +++++++++-------------
 19 files changed, 137 insertions(+), 88 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
similarity index 58%
rename from 
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
rename to 
inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
index 9dcac24f21..1335cfcf6a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
@@ -15,14 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.dataproxy.utils;
+package org.apache.inlong.common.enums;
 
-public enum InLongMsgVer {
+/**
+ * Enumeration class of encoding format of data output from DataProxy to MQ
+ */
+public enum DataProxyMsgEncType {
 
-    INLONG_V0(0, "V0", "The inlong-msg V0 format"),
-    INLONG_V1(1, "V1", "The inlong-msg V1 format");
+    MSG_ENCODE_TYPE_RAW(0, "Raw", "Raw message without any InLong format"),
+    MSG_ENCODE_TYPE_PB(1, "PB", "The PB MessagePack encode format"),
+    MSG_ENCODE_TYPE_INLONGMSG(2, "InLongMsg", "The InLongMsg encode format"),
+    MSG_ENCODE_TYPE_UNKNOWN(99, "Unknown", "Unknown encode format");
 
-    InLongMsgVer(int id, String name, String desc) {
+    DataProxyMsgEncType(int id, String name, String desc) {
         this.id = id;
         this.name = name;
         this.desc = desc;
@@ -32,6 +37,10 @@ public enum InLongMsgVer {
         return id;
     }
 
+    public String getStrId() {
+        return String.valueOf(id);
+    }
+
     public String getName() {
         return name;
     }
@@ -40,13 +49,13 @@ public enum InLongMsgVer {
         return desc;
     }
 
-    public static InLongMsgVer valueOf(int value) {
-        for (InLongMsgVer inLongMsgVer : InLongMsgVer.values()) {
-            if (inLongMsgVer.getId() == value) {
-                return inLongMsgVer;
+    public static DataProxyMsgEncType valueOf(int value) {
+        for (DataProxyMsgEncType msgEncType : DataProxyMsgEncType.values()) {
+            if (msgEncType.getId() == value) {
+                return msgEncType;
             }
         }
-        return INLONG_V0;
+        return MSG_ENCODE_TYPE_UNKNOWN;
     }
 
     private final int id;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index e2eba723a3..2a37012d0d 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.http;
 
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.AttributeConstants;
@@ -30,8 +31,8 @@ import 
org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.ServiceDecoder;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
@@ -156,7 +157,10 @@ public class SimpleMessageHandler implements 
MessageHandler {
         headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
         headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
         headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
-        headers.put(ConfigConstants.MSG_ENCODE_VER, 
InLongMsgVer.INLONG_V0.getName());
+        headers.put(ConfigConstants.MSG_ENCODE_VER,
+                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+        headers.put(EventConstants.HEADER_KEY_VERSION,
+                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
         byte[] data = inLongMsg.buildArray();
         headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
         Event event = EventBuilder.withBody(data, headers);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 5fb13fb7a3..258bc24255 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -19,12 +19,12 @@ package org.apache.inlong.dataproxy.metrics.audit;
 
 import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Event;
@@ -64,7 +64,7 @@ public class AuditUtils {
         }
         Map<String, String> headers = event.getHeaders();
         String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
-        if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+        if 
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
 {
             String inlongGroupId = 
DataProxyMetricItem.getInlongGroupId(headers);
             String inlongStreamId = 
DataProxyMetricItem.getInlongStreamId(headers);
             long logTime = getLogTime(headers);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
index c077ebd9a8..88430d2eba 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.dataproxy.sink.common;
 
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
 import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
@@ -35,9 +37,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
 /**
  * DefaultEventHandler
  * 
@@ -52,7 +51,8 @@ public class DefaultEventHandler implements EventHandler {
             INLONG_COMPRESSED_TYPE compressType) {
         Map<String, String> headers = new HashMap<>();
         // version int32 protocol version, the value is 1
-        headers.put(HEADER_KEY_VERSION, HEADER_CACHE_VERSION_1);
+        headers.put(ConfigConstants.MSG_ENCODE_VER, 
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+        headers.put(EventConstants.HEADER_KEY_VERSION, 
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
         // inlongGroupId string inlongGroupId
         headers.put(EventConstants.INLONG_GROUP_ID, 
profile.getInlongGroupId());
         // inlongStreamId string inlongStreamId
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index ca4bcceed4..9bd682429b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.dataproxy.sink.common;
 
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.utils.Constants;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.corebase.Message;
@@ -62,7 +62,7 @@ public class TubeUtils {
         Map<String, String> headers = event.getHeaders();
         Message message = new Message(topicName, event.getBody());
         String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
-        if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+        if 
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
 {
             long dataTimeL = 
Long.parseLong(headers.get(ConfigConstants.PKG_TIME_KEY));
             message.putSystemHeader(headers.get(Constants.INLONG_STREAM_ID),
                     DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 9daebf5c90..f1b4f54473 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -54,6 +54,7 @@ public class BatchPackManager {
     private final long dispatchTimeout;
     private final long maxPackCount;
     private final long maxPackSize;
+    private final String sinkName;
     private final BufferQueue<PackProfile> dispatchQueue;
     private final ConcurrentHashMap<String, PackProfile> profileCache = new 
ConcurrentHashMap<>();
     // flag that manager need to output overtime data.
@@ -63,11 +64,13 @@ public class BatchPackManager {
 
     /**
      * Constructor
-     * 
+     *
+     * @param sinkName the sink name
      * @param context the process context
      * @param dispatchQueue  the batch queue
      */
-    public BatchPackManager(Context context, BufferQueue<PackProfile> 
dispatchQueue) {
+    public BatchPackManager(String sinkName, Context context, 
BufferQueue<PackProfile> dispatchQueue) {
+        this.sinkName = sinkName;
         this.dispatchQueue = dispatchQueue;
         this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, 
DEFAULT_DISPATCH_TIMEOUT);
         this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, 
DEFAULT_DISPATCH_MAX_PACKCOUNT);
@@ -167,8 +170,8 @@ public class BatchPackManager {
         if (!needOutputOvertimeData.getAndSet(false)) {
             return;
         }
-        LOG.debug("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
-                profileCache.size(), dispatchQueue.size());
+        int profileSize = profileCache.size();
+        int dispatchSize = dispatchQueue.size();
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
         List<String> removeKeys = new ArrayList<>();
@@ -190,10 +193,15 @@ public class BatchPackManager {
                 outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
-        LOG.debug("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
-                + "inCounter:{},outCounter:{}",
-                profileCache.size(), dispatchQueue.size(), eventCount,
-                inCounter.getAndSet(0), outCounter.getAndSet(0));
+        long hisInCnt = inCounter.getAndSet(0);
+        long hisOutCnt = outCounter.getAndSet(0);
+        if (!removeKeys.isEmpty()) {
+            LOG.info("{} output overtime data, profileCacheSize: before={}, 
after={},"
+                    + " dispatchQueueSize: before={}, after={}, eventCount: 
{},"
+                    + " inCounter: {}, outCounter: {}",
+                    sinkName, profileSize, profileCache.size(), dispatchSize, 
dispatchQueue.size(),
+                    eventCount, hisInCnt, hisOutCnt);
+        }
     }
 
     /**
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 543a454ba6..0a91655ce4 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -53,7 +53,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
 
     private Context parentContext;
     private MessageQueueZoneSinkContext context;
-    private List<MessageQueueZoneWorker> workers = new ArrayList<>();
+    private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
     // message group
     private BatchPackManager dispatchManager;
     private BufferQueue<PackProfile> dispatchQueue;
@@ -74,7 +74,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
     /**
      * configure
      * 
-     * @param context
+     * @param context the sink context
      */
     @Override
     public void configure(Context context) {
@@ -96,7 +96,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
                 LOG.error(getName() + "'s channel is null");
             }
             this.context.start();
-            this.dispatchManager = new BatchPackManager(parentContext, 
dispatchQueue);
+            this.dispatchManager = new BatchPackManager(getName(), 
parentContext, dispatchQueue);
             this.scheduledPool = Executors.newScheduledThreadPool(2);
             // dispatch
             this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
@@ -157,7 +157,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
     /**
      * process
      * 
-     * @return                        Status
+     * @return  Status
      * @throws EventDeliveryException
      */
     @Override
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 15f0879ad0..9b60cffa3d 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -284,12 +284,12 @@ public class MessageQueueZoneSinkContext extends 
SinkContext {
         if (packProfile instanceof SimplePackProfile) {
             SimplePackProfile simpleProfile = (SimplePackProfile) packProfile;
             StringBuilder statsKey = new StringBuilder(512)
-                    .append(sinkName).append(AttrConstants.SEPARATOR)
-                    
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEPARATOR)
-                    
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEPARATOR)
-                    .append(topic).append(AttrConstants.SEPARATOR)
-                    
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
-                    .append(remoteId).append(AttrConstants.SEPARATOR)
+                    .append(sinkName).append(AttrConstants.SEP_HASHTAG)
+                    
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEP_HASHTAG)
+                    
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEP_HASHTAG)
+                    .append(topic).append(AttrConstants.SEP_HASHTAG)
+                    
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEP_HASHTAG)
+                    .append(remoteId).append(AttrConstants.SEP_HASHTAG)
                     
.append(simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
             monitorIndex.addSuccStats(statsKey.toString(), NumberUtils.toInt(
                     
simpleProfile.getProperties().get(ConfigConstants.MSG_COUNTER_KEY), 1),
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index 99d6ac63c9..c816feb14e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -21,8 +21,12 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.dataproxy.base.SinkRspEvent;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.source2.InLongMessageHandler;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 
 import io.netty.buffer.ByteBuf;
@@ -33,6 +37,7 @@ import org.apache.flume.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -142,6 +147,20 @@ public class SimplePackProfile extends PackProfile {
         return event.getHeaders();
     }
 
+    /**
+     * get required properties sent to MQ
+     *
+     * @return the properties
+     */
+    public Map<String, String> getPropsToMQ() {
+        Map<String, String> result = new HashMap<>();
+        result.put(Constants.HEADER_KEY_SOURCE_TIME, 
event.getHeaders().get(AttributeConstants.RCV_TIME));
+        result.put(ConfigConstants.MSG_ENCODE_VER, 
event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER));
+        result.put(EventConstants.HEADER_KEY_VERSION, 
event.getHeaders().get(EventConstants.HEADER_KEY_VERSION));
+        result.put(ConfigConstants.DATAPROXY_IP_KEY, 
NetworkUtils.getLocalIp());
+        return result;
+    }
+
     /**
      *  Return response to client in source
      */
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index 40a2b4aa7a..e91f41f4b4 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -158,8 +158,7 @@ public class KafkaHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception ex) {
-            
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
-                    profile.getUid() + "." + topic);
+            
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
 topic);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
             if (logCounter.shouldPrint()) {
@@ -225,7 +224,7 @@ public class KafkaHandler implements MessageQueueHandler {
     private void sendSimplePackProfile(SimplePackProfile simpleProfile, 
IdTopicConfig idConfig,
             String topic) throws Exception {
         // headers
-        Map<String, String> headers = simpleProfile.getProperties();
+        Map<String, String> headers = simpleProfile.getPropsToMQ();
         // body
         byte[] bodyBytes = simpleProfile.getEvent().getBody();
         // metric
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index e0a57e5e94..3933045d74 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -266,8 +266,7 @@ public class PulsarHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception ex) {
-            
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
-                    profile.getUid() + "." + producerTopic);
+            
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
 producerTopic);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
             if (logCounter.shouldPrint()) {
@@ -346,7 +345,7 @@ public class PulsarHandler implements MessageQueueHandler {
             Producer<byte[]> producer,
             String producerTopic) throws Exception {
         // headers
-        Map<String, String> headers = simpleProfile.getProperties();
+        Map<String, String> headers = simpleProfile.getPropsToMQ();
         // body
         byte[] bodyBytes = simpleProfile.getEvent().getBody();
         // metric
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index d78e282d78..e8054f8863 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -26,12 +26,13 @@ import 
org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.sink.common.EventHandler;
-import org.apache.inlong.dataproxy.sink.common.TubeUtils;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
 import org.apache.inlong.dataproxy.sink.mq.PackProfile;
 import org.apache.inlong.dataproxy.sink.mq.SimplePackProfile;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
@@ -228,8 +229,7 @@ public class TubeHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Throwable ex) {
-            sinkContext.fileMetricIncWithDetailStats(
-                    StatConstants.EVENT_SINK_SEND_EXCEPTION, profile.getUid() 
+ "." + topic);
+            
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
 topic);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
             if (logCounter.shouldPrint()) {
@@ -249,19 +249,18 @@ public class TubeHandler implements MessageQueueHandler {
             handler = this.sinkContext.createEventHandler();
             handlerLocal.set(handler);
         }
-        // headers
+        // get headers to mq
         Map<String, String> headers = handler.parseHeader(idConfig, 
batchProfile, sinkContext.getNodeId(),
                 sinkContext.getCompressType());
         // compress
         byte[] bodyBytes = handler.parseBody(idConfig, batchProfile, 
sinkContext.getCompressType());
-        // metric
-        sinkContext.addSendMetric(batchProfile, clusterName, topic, 
bodyBytes.length);
-        // sendAsync
         Message message = new Message(topic, bodyBytes);
         // add headers
-        headers.forEach((key, value) -> {
-            message.setAttrKeyVal(key, value);
-        });
+        long dataTimeL = 
Long.parseLong(headers.get(EventConstants.HEADER_KEY_PACK_TIME));
+        message.putSystemHeader(batchProfile.getInlongStreamId(), 
DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+        headers.forEach(message::setAttrKeyVal);
+        // metric
+        sinkContext.addSendMetric(batchProfile, clusterName, topic, 
bodyBytes.length);
         // callback
         long sendTime = System.currentTimeMillis();
         MessageSentCallback callback = new MessageSentCallback() {
@@ -302,7 +301,12 @@ public class TubeHandler implements MessageQueueHandler {
     private void sendSimplePackProfile(SimplePackProfile simpleProfile, 
IdTopicConfig idConfig,
             String topic) throws Exception {
         // build message
-        Message message = TubeUtils.buildMessage(topic, 
simpleProfile.getEvent());
+        Message message = new Message(topic, 
simpleProfile.getEvent().getBody());
+        message.putSystemHeader(simpleProfile.getInlongStreamId(),
+                
simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
+        // add headers
+        Map<String, String> headers = simpleProfile.getPropsToMQ();
+        headers.forEach(message::setAttrKeyVal);
         // metric
         sinkContext.addSendMetric(simpleProfile, clusterName, topic, 
simpleProfile.getEvent().getBody().length);
         // callback
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 5669e8de08..bfb5a3fa55 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.source;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.AttributeConstants;
@@ -34,8 +35,8 @@ import 
org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.AddressUtils;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -473,7 +474,10 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                 headers.put(ConfigConstants.REMOTE_IDC_KEY, 
DEFAULT_REMOTE_IDC_VALUE);
                 headers.put(ConfigConstants.MSG_COUNTER_KEY,
                         commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
-                headers.put(ConfigConstants.MSG_ENCODE_VER, 
InLongMsgVer.INLONG_V0.getName());
+                headers.put(ConfigConstants.MSG_ENCODE_VER,
+                        
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+                headers.put(EventConstants.HEADER_KEY_VERSION,
+                        
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
                 headers.put(AttributeConstants.RCV_TIME,
                         commonAttrMap.get(AttributeConstants.RCV_TIME));
                 headers.put(ConfigConstants.DECODER_ATTRS,
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 5db66d1d5a..2d0c8ebbd4 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.msg.MsgType;
@@ -30,7 +31,7 @@ import 
org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.AddressUtils;
 import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -426,9 +427,9 @@ public class SimpleMessageHandler extends 
ChannelInboundHandlerAdapter {
                 commonHeaders.get(AttributeConstants.DATA_TIME));
         headers.put(Constants.HEADER_KEY_SOURCE_IP,
                 commonHeaders.get(AttributeConstants.NODE_IP));
-        headers.put(ConfigConstants.MSG_ENCODE_VER, 
InLongMsgVer.INLONG_V1.getName());
-        Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
-        return event;
+        headers.put(ConfigConstants.MSG_ENCODE_VER, 
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+        headers.put(EventConstants.HEADER_KEY_VERSION, 
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+        return EventBuilder.withBody(proxyMessage.getData(), headers);
     }
 
     private void responsePackage(Map<String, String> commonAttrMap,
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index 2244207f06..e17b35228b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.source2.httpMsg;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
@@ -29,7 +30,7 @@ import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.source2.BaseSource;
 import org.apache.inlong.dataproxy.utils.AddressUtils;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -343,7 +344,10 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         eventHeaders.put(AttributeConstants.DATA_TIME, 
String.valueOf(dataTime));
         eventHeaders.put(ConfigConstants.REMOTE_IP_KEY, clientIp);
         eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
-        eventHeaders.put(ConfigConstants.MSG_ENCODE_VER, 
InLongMsgVer.INLONG_V0.getName());
+        eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
+                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+        eventHeaders.put(EventConstants.HEADER_KEY_VERSION,
+                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
         eventHeaders.put(AttributeConstants.RCV_TIME, 
String.valueOf(msgRcvTime));
         eventHeaders.put(ConfigConstants.PKG_TIME_KEY, 
DateTimeUtils.ms2yyyyMMddHHmm(pkgTime));
         Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
index 69fee83565..83d82da94e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
@@ -18,12 +18,13 @@
 package org.apache.inlong.dataproxy.source2.v0msg;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.source2.BaseSource;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -222,15 +223,14 @@ public abstract class AbsV0MsgCodec {
         headers.put(AttributeConstants.DATA_TIME, String.valueOf(dataTimeMs));
         headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
         headers.put(ConfigConstants.MSG_COUNTER_KEY, String.valueOf(msgCount));
-        headers.put(ConfigConstants.MSG_ENCODE_VER, 
InLongMsgVer.INLONG_V0.getName());
+        headers.put(ConfigConstants.MSG_ENCODE_VER,
+                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+        headers.put(EventConstants.HEADER_KEY_VERSION,
+                DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
         headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
         headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
+        headers.put(ConfigConstants.PKG_TIME_KEY, 
DateTimeUtils.ms2yyyyMMddHHmm(pkgTime));
         // add extra key-value information
-        String pkgTimeStr = attrMap.get(ConfigConstants.PKG_TIME_KEY);
-        if (StringUtils.isBlank(pkgTimeStr)) {
-            pkgTimeStr = DateTimeUtils.ms2yyyyMMddHHmm(pkgTime);
-        }
-        headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
         if (!needResp) {
             headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
         }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index d72a8fe542..7b8c3f305a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.utils;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.MsgType;
@@ -387,7 +388,7 @@ public class MessageUtils {
         // common attributes
         Map<String, String> attrs = new HashMap<>();
         attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion);
-        if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+        if 
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
 {
             attrs.put("dataproxyip", NetworkUtils.getLocalIp());
             attrs.put(Constants.INLONG_GROUP_ID, 
headers.get(Constants.INLONG_GROUP_ID));
             attrs.put(Constants.INLONG_STREAM_ID, 
headers.get(Constants.INLONG_STREAM_ID));
diff --git 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
index ba3221b874..cad0a4895d 100644
--- 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
+++ 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
@@ -23,8 +23,10 @@ package org.apache.inlong.sdk.commons.protocol;
  */
 public interface EventConstants {
 
+    @Deprecated // replace by ConfigConstants.MSG_ENCODE_VER
     String HEADER_KEY_VERSION = "version";
     String HEADER_SDK_VERSION_1 = "1";
+    @Deprecated // replace by DataProxyMsgEncType.MSG_ENCODE_VER_PB
     String HEADER_CACHE_VERSION_1 = "1";
     // sdk
     String INLONG_GROUP_ID = "inlongGroupId";
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index e2f07ea374..99894456ca 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sdk.sort.impl.decode;
 
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
 import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -40,13 +42,9 @@ import java.util.Optional;
 
 public class MessageDeserializer implements Deserializer {
 
-    private static final int MESSAGE_VERSION_NONE = 0;
-    private static final int MESSAGE_VERSION_PB = 1;
-    private static final int MESSAGE_VERSION_INLONG_MSG = 2;
     private static final int COMPRESS_TYPE_NONE = 0;
     private static final int COMPRESS_TYPE_GZIP = 1;
     private static final int COMPRESS_TYPE_SNAPPY = 2;
-    private static final String VERSION_KEY = "version";
     private static final String COMPRESS_TYPE_KEY = "compressType";
     private static final String MSG_TIME_KEY = "msgTime";
     private static final String SOURCE_IP_KEY = "sourceIp";
@@ -75,19 +73,16 @@ public class MessageDeserializer implements Deserializer {
             byte[] data) throws Exception {
 
         // 1. version
-        int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, 
Integer.toString(MESSAGE_VERSION_INLONG_MSG)));
-        switch (version) {
-            case MESSAGE_VERSION_NONE: {
-                return decode(context, inLongTopic, data, headers);
-            }
-            case MESSAGE_VERSION_PB: {
-                return decodePB(context, inLongTopic, data, headers);
-            }
-            case MESSAGE_VERSION_INLONG_MSG: {
-                return decodeInlongMsg(context, inLongTopic, data, headers);
-            }
-            default:
-                throw new IllegalArgumentException("Unknown version type:" + 
version);
+        int version = 
Integer.parseInt(headers.getOrDefault(EventConstants.HEADER_KEY_VERSION,
+                
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+        if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_RAW.getId()) {
+            return decode(context, inLongTopic, data, headers);
+        } else if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getId()) {
+            return decodePB(context, inLongTopic, data, headers);
+        } else if (version == 
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId()) {
+            return decodeInlongMsg(context, inLongTopic, data, headers);
+        } else {
+            throw new IllegalArgumentException("Unknown version type:" + 
version);
         }
     }
 


Reply via email to