This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 685c210444 [INLONG-11722][Manager]Pulsar source supports Inlong
properties field (#11723)
685c210444 is described below
commit 685c2104440c599502c122075fb72eed994b3048
Author: fuweng11 <[email protected]>
AuthorDate: Fri Feb 7 18:04:39 2025 +0800
[INLONG-11722][Manager]Pulsar source supports Inlong properties field
(#11723)
---
.../inlong/manager/pojo/sort/node/NodeFactory.java | 17 ++++++++++++++++-
.../pojo/sort/node/base/LoadNodeProvider.java | 4 ++++
.../manager/pojo/sort/node/base/NodeProvider.java | 10 ++++++++++
.../pojo/sort/node/provider/PulsarProvider.java | 22 ++++++++++++++++++++++
.../pojo/sort/node/provider/TubeMqProvider.java | 22 ++++++++++++++++++++++
5 files changed, 74 insertions(+), 1 deletion(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
index 3cda291dbe..fecd2d96bd 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -116,9 +116,24 @@ public class NodeFactory {
sinkInfo.setSinkFieldList(loadNodeProvider.addSinkFieldsForSinkMultiple(sinkInfo.getSinkFieldList()));
}
if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(),
loadNodeProvider.getMetaFields())) {
+ if (loadNodeProvider.needInlongPropertiesField(sinkInfo)) {
+
loadNodeProvider.addInlongPropertiesFieldForStream(sourceInfo.getFieldList());
+
loadNodeProvider.addInlongPropertiesFieldForSink(sinkInfo.getSinkFieldList());
+ }
+ if (extractNodeProvider.needInlongPropertiesField(sourceInfo)) {
+
extractNodeProvider.addInlongPropertiesFieldForStream(sourceInfo.getFieldList());
+ }
extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList());
if (CollectionUtils.isNotEmpty(transformResponses)) {
- transformResponses.forEach(v ->
extractNodeProvider.addStreamMetaFields(v.getFieldList()));
+ transformResponses.forEach(v -> {
+ if (loadNodeProvider.needInlongPropertiesField(sinkInfo)) {
+
loadNodeProvider.addInlongPropertiesFieldForStream(v.getFieldList());
+ }
+ if
(extractNodeProvider.needInlongPropertiesField(sourceInfo)) {
+
extractNodeProvider.addInlongPropertiesFieldForStream(v.getFieldList());
+ }
+ extractNodeProvider.addStreamMetaFields(v.getFieldList());
+ });
}
loadNodeProvider.addSinkMetaFields(sinkInfo.getSinkFieldList());
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index e243226b7c..d32b12e141 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -158,4 +158,8 @@ public interface LoadNodeProvider extends NodeProvider {
default List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField>
sinkFields) {
return new ArrayList<>();
}
+
+ default List<SinkField> addInlongPropertiesFieldForSink(List<SinkField>
sinkFields) {
+ return sinkFields;
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
index f17d38bde0..94987f6e76 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.pojo.sort.node.base;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import java.util.ArrayList;
@@ -54,4 +56,12 @@ public interface NodeProvider {
return new ArrayList<>();
}
+ default boolean needInlongPropertiesField(StreamNode streamNode) {
+ return false;
+ }
+
+ default List<StreamField>
addInlongPropertiesFieldForStream(List<StreamField> streamFields) {
+ return streamFields;
+ }
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 1ece6c0184..50254ddadc 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
import org.apache.inlong.manager.common.consts.SourceType;
@@ -36,6 +37,7 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -106,4 +108,24 @@ public class PulsarProvider implements ExtractNodeProvider
{
fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new
LongFormatInfo()));
return fieldInfos;
}
+
+ @Override
+ public boolean needInlongPropertiesField(StreamNode streamNode) {
+ if (streamNode instanceof PulsarSource) {
+ PulsarSource pulsarSource = (PulsarSource) streamNode;
+ return !Objects.equals(pulsarSource.getWrapType(),
MessageWrapType.RAW.getName());
+ }
+ return true;
+ }
+
+ @Override
+ public List<StreamField>
addInlongPropertiesFieldForStream(List<StreamField> streamFields) {
+ List<String> fieldNames =
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.INLONG_PROPERTIES.name())) {
+ streamFields.add(0,
+ new StreamField(0, "map",
MetaField.INLONG_PROPERTIES.name(), "inlong properties", null, 1,
+ MetaField.INLONG_PROPERTIES.name()));
+ }
+ return streamFields;
+ }
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index d2553a76ab..455c627d1d 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
import org.apache.inlong.manager.common.consts.SourceType;
@@ -34,6 +35,7 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -92,4 +94,24 @@ public class TubeMqProvider implements ExtractNodeProvider {
return fieldInfos;
}
+ @Override
+ public boolean needInlongPropertiesField(StreamNode streamNode) {
+ if (streamNode instanceof TubeMQSource) {
+ TubeMQSource tubeMQSource = (TubeMQSource) streamNode;
+ return !Objects.equals(tubeMQSource.getWrapType(),
MessageWrapType.RAW.getName());
+ }
+ return true;
+ }
+
+ @Override
+ public List<StreamField>
addInlongPropertiesFieldForStream(List<StreamField> streamFields) {
+ List<String> fieldNames =
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.INLONG_PROPERTIES.name())) {
+ streamFields.add(0,
+ new StreamField(0, "map",
MetaField.INLONG_PROPERTIES.name(), "inlong properties", null, 1,
+ MetaField.INLONG_PROPERTIES.name()));
+ }
+ return streamFields;
+ }
+
}
\ No newline at end of file