This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 585b9fb033 [INLONG-10141][Manager] Data preview supports returning header and specific field information for inlong msg v1 (#10142)
585b9fb033 is described below

commit 585b9fb033a485c3d17c7ee190b5b0df413f70d4
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 7 21:35:43 2024 +0800

    [INLONG-10141][Manager] Data preview supports returning header and specific field information for inlong msg v1 (#10142)
---
 .../service/message/PbMsgDeserializeOperator.java  | 41 +++++++++++++++++-----
 1 file changed, 32 insertions(+), 9 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index a5aec49cd5..79760cf737 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -17,13 +17,18 @@
 
 package org.apache.inlong.manager.service.message;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import 
org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
 import 
org.apache.inlong.common.pojo.sort.dataflow.deserialization.InlongMsgPbDeserialiationConfig;
 import org.apache.inlong.common.util.Utils;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
@@ -31,6 +36,7 @@ import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
 import org.apache.inlong.sort.configuration.Constants;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.nio.charset.Charset;
@@ -43,6 +49,9 @@ import java.util.Map;
 @Service
 public class PbMsgDeserializeOperator implements DeserializeOperator {
 
+    @Autowired
+    public DataTypeOperatorFactory dataTypeOperatorFactory;
+
     @Override
     public boolean accept(MessageWrapType type) {
         return MessageWrapType.INLONG_MSG_V1.equals(type);
@@ -81,15 +90,29 @@ public class PbMsgDeserializeOperator implements DeserializeOperator {
                 headers.put(mapFieldEntry.getKey(), mapFieldEntry.getValue());
             }
 
-            BriefMQMessage message = BriefMQMessage.builder()
-                    .id(index)
-                    .inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
-                    .inlongStreamId(headers.get(AttributeConstants.STREAM_ID))
-                    .dt(messageObj.getMsgTime())
-                    .clientIp(headers.get(CLIENT_IP))
-                    .body(new String(messageObj.getBody().toByteArray(), 
Charset.forName(streamInfo.getDataEncoding())))
-                    .build();
-            messageList.add(message);
+            try {
+                String body = new String(messageObj.getBody().toByteArray(),
+                        Charset.forName(streamInfo.getDataEncoding()));
+                DataTypeOperator dataTypeOperator =
+                        
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
+                List<FieldInfo> streamFieldList = 
dataTypeOperator.parseFields(body, streamInfo);
+                BriefMQMessage message = BriefMQMessage.builder()
+                        .id(index)
+                        
.inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
+                        
.inlongStreamId(headers.get(AttributeConstants.STREAM_ID))
+                        .dt(messageObj.getMsgTime())
+                        .clientIp(headers.get(CLIENT_IP))
+                        .body(body)
+                        .headers(headers)
+                        .fieldList(streamFieldList)
+                        .build();
+                messageList.add(message);
+            } catch (Exception e) {
+                String errMsg = String.format("decode msg failed for 
groupId=%s, streamId=%s",
+                        streamInfo.getInlongGroupId(), 
streamInfo.getInlongStreamId());
+                log.error(errMsg, e);
+                throw new BusinessException(errMsg);
+            }
         }
 
         return messageList;

Reply via email to