This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6260dd7def [INLONG-11926][SDK] If SortSDK fails to retrieve GroupId
and StreamId from the InLongMsgV0 protocol, it defaults to obtaining them from
the unified metadata (#11927)
6260dd7def is described below
commit 6260dd7defd2c6be9eedaaf360dd663b5e603e56
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 7 19:10:06 2025 +0800
[INLONG-11926][SDK] If SortSDK fails to retrieve GroupId and StreamId from
the InLongMsgV0 protocol, it defaults to obtaining them from the unified
metadata (#11927)
---
.../apache/inlong/sdk/sort/entity/InLongTopic.java | 51 ++++++++++++++++++++++
.../sdk/sort/impl/decode/MessageDeserializer.java | 6 ++-
2 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
index 3b636c912b..cffbd39a07 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
@@ -30,6 +30,9 @@ public class InLongTopic {
private String startConsumeTime;
private String stopConsumeTime;
private Map<String, Object> properties;
+ private String groupId;
+ private String streamId;
+ private String dataFlowId;
public void setStopConsumeTime(String stopConsumeTime) {
this.stopConsumeTime = stopConsumeTime;
@@ -87,6 +90,54 @@ public class InLongTopic {
this.properties = properties;
}
+ /**
+ * get groupId
+ * @return the groupId
+ */
+ public String getGroupId() {
+ return groupId;
+ }
+
+ /**
+ * set groupId
+ * @param groupId the groupId to set
+ */
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ /**
+ * get streamId
+ * @return the streamId
+ */
+ public String getStreamId() {
+ return streamId;
+ }
+
+ /**
+ * set streamId
+ * @param streamId the streamId to set
+ */
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
+ }
+
+ /**
+ * get dataFlowId
+ * @return the dataFlowId
+ */
+ public String getDataFlowId() {
+ return dataFlowId;
+ }
+
+ /**
+ * set dataFlowId
+ * @param dataFlowId the dataFlowId to set
+ */
+ public void setDataFlowId(String dataFlowId) {
+ this.dataFlowId = dataFlowId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index 25ff99a872..042f76a6cc 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -173,11 +173,13 @@ public class MessageDeserializer implements Deserializer {
Map<String, String> attributes = StringUtil.splitKv(attr,
INLONGMSG_ATTR_ENTRY_DELIMITER,
INLONGMSG_ATTR_KV_DELIMITER, null, null);
- String groupId =
Optional.ofNullable(attributes.get(INLONGMSG_ATTR_GROUP_ID))
+ String groupId =
Optional.ofNullable(attributes.getOrDefault(INLONGMSG_ATTR_GROUP_ID,
+ inLongTopic.getGroupId()))
.orElseThrow(() -> new
IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
INLONGMSG_ATTR_GROUP_ID)));
- String streamId =
Optional.ofNullable(attributes.get(INLONGMSG_ATTR_STREAM_ID))
+ String streamId =
Optional.ofNullable(attributes.getOrDefault(INLONGMSG_ATTR_STREAM_ID,
+ inLongTopic.getStreamId()))
.orElseThrow(() -> new
IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
INLONGMSG_ATTR_STREAM_ID)));