This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 685c210444 [INLONG-11722][Manager]Pulsar source supports Inlong properties field (#11723)
685c210444 is described below

commit 685c2104440c599502c122075fb72eed994b3048
Author: fuweng11 <[email protected]>
AuthorDate: Fri Feb 7 18:04:39 2025 +0800

    [INLONG-11722][Manager]Pulsar source supports Inlong properties field (#11723)
---
 .../inlong/manager/pojo/sort/node/NodeFactory.java | 17 ++++++++++++++++-
 .../pojo/sort/node/base/LoadNodeProvider.java      |  4 ++++
 .../manager/pojo/sort/node/base/NodeProvider.java  | 10 ++++++++++
 .../pojo/sort/node/provider/PulsarProvider.java    | 22 ++++++++++++++++++++++
 .../pojo/sort/node/provider/TubeMqProvider.java    | 22 ++++++++++++++++++++++
 5 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
index 3cda291dbe..fecd2d96bd 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -116,9 +116,24 @@ public class NodeFactory {
             
sinkInfo.setSinkFieldList(loadNodeProvider.addSinkFieldsForSinkMultiple(sinkInfo.getSinkFieldList()));
         }
         if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(), 
loadNodeProvider.getMetaFields())) {
+            if (loadNodeProvider.needInlongPropertiesField(sinkInfo)) {
+                
loadNodeProvider.addInlongPropertiesFieldForStream(sourceInfo.getFieldList());
+                
loadNodeProvider.addInlongPropertiesFieldForSink(sinkInfo.getSinkFieldList());
+            }
+            if (extractNodeProvider.needInlongPropertiesField(sourceInfo)) {
+                
extractNodeProvider.addInlongPropertiesFieldForStream(sourceInfo.getFieldList());
+            }
             extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList());
             if (CollectionUtils.isNotEmpty(transformResponses)) {
-                transformResponses.forEach(v -> 
extractNodeProvider.addStreamMetaFields(v.getFieldList()));
+                transformResponses.forEach(v -> {
+                    if (loadNodeProvider.needInlongPropertiesField(sinkInfo)) {
+                        
loadNodeProvider.addInlongPropertiesFieldForStream(v.getFieldList());
+                    }
+                    if 
(extractNodeProvider.needInlongPropertiesField(sourceInfo)) {
+                        
extractNodeProvider.addInlongPropertiesFieldForStream(v.getFieldList());
+                    }
+                    extractNodeProvider.addStreamMetaFields(v.getFieldList());
+                });
             }
             loadNodeProvider.addSinkMetaFields(sinkInfo.getSinkFieldList());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index e243226b7c..d32b12e141 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -158,4 +158,8 @@ public interface LoadNodeProvider extends NodeProvider {
     default List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField> 
sinkFields) {
         return new ArrayList<>();
     }
+
+    default List<SinkField> addInlongPropertiesFieldForSink(List<SinkField> 
sinkFields) {
+        return sinkFields;
+    }
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
index f17d38bde0..94987f6e76 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.pojo.sort.node.base;
 
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
 
 import java.util.ArrayList;
@@ -54,4 +56,12 @@ public interface NodeProvider {
         return new ArrayList<>();
     }
 
+    default boolean needInlongPropertiesField(StreamNode streamNode) {
+        return false;
+    }
+
+    default List<StreamField> 
addInlongPropertiesFieldForStream(List<StreamField> streamFields) {
+        return streamFields;
+    }
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 1ece6c0184..50254ddadc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
 import org.apache.inlong.manager.common.consts.SourceType;
@@ -36,6 +37,7 @@ import org.springframework.stereotype.Service;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -106,4 +108,24 @@ public class PulsarProvider implements ExtractNodeProvider 
{
         fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new 
LongFormatInfo()));
         return fieldInfos;
     }
+
+    @Override
+    public boolean needInlongPropertiesField(StreamNode streamNode) {
+        if (streamNode instanceof PulsarSource) {
+            PulsarSource pulsarSource = (PulsarSource) streamNode;
+            return !Objects.equals(pulsarSource.getWrapType(), 
MessageWrapType.RAW.getName());
+        }
+        return true;
+    }
+
+    @Override
+    public List<StreamField> 
addInlongPropertiesFieldForStream(List<StreamField> streamFields) {
+        List<String> fieldNames = 
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.INLONG_PROPERTIES.name())) {
+            streamFields.add(0,
+                    new StreamField(0, "map", 
MetaField.INLONG_PROPERTIES.name(), "inlong properties", null, 1,
+                            MetaField.INLONG_PROPERTIES.name()));
+        }
+        return streamFields;
+    }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index d2553a76ab..455c627d1d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
 import org.apache.inlong.manager.common.consts.SourceType;
@@ -34,6 +35,7 @@ import org.springframework.stereotype.Service;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -92,4 +94,24 @@ public class TubeMqProvider implements ExtractNodeProvider {
         return fieldInfos;
     }
 
+    @Override
+    public boolean needInlongPropertiesField(StreamNode streamNode) {
+        if (streamNode instanceof TubeMQSource) {
+            TubeMQSource tubeMQSource = (TubeMQSource) streamNode;
+            return !Objects.equals(tubeMQSource.getWrapType(), 
MessageWrapType.RAW.getName());
+        }
+        return true;
+    }
+
+    @Override
+    public List<StreamField> 
addInlongPropertiesFieldForStream(List<StreamField> streamFields) {
+        List<String> fieldNames = 
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.INLONG_PROPERTIES.name())) {
+            streamFields.add(0,
+                    new StreamField(0, "map", 
MetaField.INLONG_PROPERTIES.name(), "inlong properties", null, 1,
+                            MetaField.INLONG_PROPERTIES.name()));
+        }
+        return streamFields;
+    }
+
 }
\ No newline at end of file

Reply via email to