This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 585b9fb033 [INLONG-10141][Manager] Data preview supports returning
header and specific field information for inlong msg v1 (#10142)
585b9fb033 is described below
commit 585b9fb033a485c3d17c7ee190b5b0df413f70d4
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 7 21:35:43 2024 +0800
[INLONG-10141][Manager] Data preview supports returning header and specific
field information for inlong msg v1 (#10142)
---
.../service/message/PbMsgDeserializeOperator.java | 41 +++++++++++++++++-----
1 file changed, 32 insertions(+), 9 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index a5aec49cd5..79760cf737 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -17,13 +17,18 @@
package org.apache.inlong.manager.service.message;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
import
org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import
org.apache.inlong.common.pojo.sort.dataflow.deserialization.InlongMsgPbDeserialiationConfig;
import org.apache.inlong.common.util.Utils;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
@@ -31,6 +36,7 @@ import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
import org.apache.inlong.sort.configuration.Constants;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.Charset;
@@ -43,6 +49,9 @@ import java.util.Map;
@Service
public class PbMsgDeserializeOperator implements DeserializeOperator {
+ @Autowired
+ public DataTypeOperatorFactory dataTypeOperatorFactory;
+
@Override
public boolean accept(MessageWrapType type) {
return MessageWrapType.INLONG_MSG_V1.equals(type);
@@ -81,15 +90,29 @@ public class PbMsgDeserializeOperator implements DeserializeOperator {
headers.put(mapFieldEntry.getKey(), mapFieldEntry.getValue());
}
- BriefMQMessage message = BriefMQMessage.builder()
- .id(index)
- .inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
- .inlongStreamId(headers.get(AttributeConstants.STREAM_ID))
- .dt(messageObj.getMsgTime())
- .clientIp(headers.get(CLIENT_IP))
- .body(new String(messageObj.getBody().toByteArray(),
Charset.forName(streamInfo.getDataEncoding())))
- .build();
- messageList.add(message);
+ try {
+ String body = new String(messageObj.getBody().toByteArray(),
+ Charset.forName(streamInfo.getDataEncoding()));
+ DataTypeOperator dataTypeOperator =
+
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
+ List<FieldInfo> streamFieldList =
dataTypeOperator.parseFields(body, streamInfo);
+ BriefMQMessage message = BriefMQMessage.builder()
+ .id(index)
+
.inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
+
.inlongStreamId(headers.get(AttributeConstants.STREAM_ID))
+ .dt(messageObj.getMsgTime())
+ .clientIp(headers.get(CLIENT_IP))
+ .body(body)
+ .headers(headers)
+ .fieldList(streamFieldList)
+ .build();
+ messageList.add(message);
+ } catch (Exception e) {
+ String errMsg = String.format("decode msg failed for
groupId=%s, streamId=%s",
+ streamInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());
+ log.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
}
return messageList;