This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5fd224816e [INLONG-11674][Sort] Pulsar Source supports InlongMsg metadata (#11691)
5fd224816e is described below
commit 5fd224816e0af2c4dca2109cc4d8a34f80b2ae60
Author: vernedeng <[email protected]>
AuthorDate: Wed Jan 22 15:27:09 2025 +0800
[INLONG-11674][Sort] Pulsar Source supports InlongMsg metadata (#11691)
---
.../src/main/java/org/apache/inlong/common/enums/MetaField.java | 7 ++++++-
.../java/org/apache/inlong/sort/protocol/node/ExtractNode.java | 2 ++
.../inlong/sort/protocol/node/extract/PulsarExtractNode.java | 5 ++++-
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index e6d0bbc6cb..e80d006433 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -168,7 +168,12 @@ public enum MetaField {
/**
* Inlong data time for audit.
*/
- AUDIT_DATA_TIME;
+ AUDIT_DATA_TIME,
+
+ /**
+ * Inlong properties in InlongMsg.
+ */
+ INLONG_PROPERTIES;
public static MetaField forName(String name) {
for (MetaField metaField : values()) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index fb0f9dcd80..c61b12ab32 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -78,6 +78,8 @@ public abstract class ExtractNode implements Node {
public static final String INLONG_MSG_AUDIT_TIME = "value.data-time";
+ public static final String INLONG_MSG_PROPERTIES =
"value.inlong-msg-properties";
+
public static final String CONSUME_AUDIT_TIME = "consume_time";
@JsonProperty("id")
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index ab90ba7d19..1887aca145 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -195,6 +195,9 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
metadataKey = CONSUME_AUDIT_TIME;
}
break;
+ case INLONG_PROPERTIES:
+ metadataKey = INLONG_MSG_PROPERTIES;
+ break;
default:
throw new
UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
this.getClass().getSimpleName(), metaField));
@@ -209,7 +212,7 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
@Override
public Set<MetaField> supportedMetaFields() {
- return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+ return EnumSet.of(MetaField.AUDIT_DATA_TIME,
MetaField.INLONG_PROPERTIES);
}
@Override