This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6260dd7def [INLONG-11926][SDK] If SortSDK fails to retrieve GroupId 
and StreamId from the InLongMsgV0 protocol, it defaults to obtaining them from 
the unified metadata (#11927)
6260dd7def is described below

commit 6260dd7defd2c6be9eedaaf360dd663b5e603e56
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 7 19:10:06 2025 +0800

    [INLONG-11926][SDK] If SortSDK fails to retrieve GroupId and StreamId from 
the InLongMsgV0 protocol, it defaults to obtaining them from the unified 
metadata (#11927)
---
 .../apache/inlong/sdk/sort/entity/InLongTopic.java | 51 ++++++++++++++++++++++
 .../sdk/sort/impl/decode/MessageDeserializer.java  |  6 ++-
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
index 3b636c912b..cffbd39a07 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
@@ -30,6 +30,9 @@ public class InLongTopic {
     private String startConsumeTime;
     private String stopConsumeTime;
     private Map<String, Object> properties;
+    private String groupId;
+    private String streamId;
+    private String dataFlowId;
 
     public void setStopConsumeTime(String stopConsumeTime) {
         this.stopConsumeTime = stopConsumeTime;
@@ -87,6 +90,54 @@ public class InLongTopic {
         this.properties = properties;
     }
 
+    /**
+     * get groupId
+     * @return the groupId
+     */
+    public String getGroupId() {
+        return groupId;
+    }
+
+    /**
+     * set groupId
+     * @param groupId the groupId to set
+     */
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * get streamId
+     * @return the streamId
+     */
+    public String getStreamId() {
+        return streamId;
+    }
+
+    /**
+     * set streamId
+     * @param streamId the streamId to set
+     */
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    /**
+     * get dataFlowId
+     * @return the dataFlowId
+     */
+    public String getDataFlowId() {
+        return dataFlowId;
+    }
+
+    /**
+     * set dataFlowId
+     * @param dataFlowId the dataFlowId to set
+     */
+    public void setDataFlowId(String dataFlowId) {
+        this.dataFlowId = dataFlowId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index 25ff99a872..042f76a6cc 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -173,11 +173,13 @@ public class MessageDeserializer implements Deserializer {
             Map<String, String> attributes = StringUtil.splitKv(attr, 
INLONGMSG_ATTR_ENTRY_DELIMITER,
                     INLONGMSG_ATTR_KV_DELIMITER, null, null);
 
-            String groupId = 
Optional.ofNullable(attributes.get(INLONGMSG_ATTR_GROUP_ID))
+            String groupId = 
Optional.ofNullable(attributes.getOrDefault(INLONGMSG_ATTR_GROUP_ID,
+                    inLongTopic.getGroupId()))
                     .orElseThrow(() -> new 
IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
                             INLONGMSG_ATTR_GROUP_ID)));
 
-            String streamId = 
Optional.ofNullable(attributes.get(INLONGMSG_ATTR_STREAM_ID))
+            String streamId = 
Optional.ofNullable(attributes.getOrDefault(INLONGMSG_ATTR_STREAM_ID,
+                    inLongTopic.getStreamId()))
                     .orElseThrow(() -> new 
IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
                             INLONGMSG_ATTR_STREAM_ID)));
 

Reply via email to