This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 58b2fc963c [INLONG-9818][Manager] Decode Msg based on the manager's configuration (#9819)
58b2fc963c is described below
commit 58b2fc963cad0715ab0d4a82de50a9c95fd37159
Author: fuweng11 <[email protected]>
AuthorDate: Thu Mar 14 16:45:53 2024 +0800
[INLONG-9818][Manager] Decode Msg based on the manager's configuration (#9819)
---
.../manager/service/resource/queue/pulsar/PulsarOperator.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 99653c05c1..ccabb716f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -453,10 +453,12 @@ public class PulsarOperator {
messagePosition);
PulsarMessageInfo messageInfo =
PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
Map<String, String> headers = messageInfo.getProperties();
- int wrapTypeId =
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
- Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
- DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(
- MessageWrapType.valueOf(wrapTypeId));
+ MessageWrapType messageWrapType =
MessageWrapType.forType(streamInfo.getWrapType());
+ if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
+ messageWrapType =
+
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
+ }
+ DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(messageWrapType);
briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo,
messageInfo.getBody(),
headers, index));
} catch (Exception e) {