This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 17be87606d [INLONG-10413][Manager] Support for configuring built-in 
fields for mysql and kafka (#10414)
17be87606d is described below

commit 17be87606d8848a4fc458976a3977219dee547f5
Author: fuweng11 <[email protected]>
AuthorDate: Fri Jun 14 17:31:29 2024 +0800

    [INLONG-10413][Manager] Support for configuring built-in fields for mysql 
and kafka (#10414)
---
 .../pojo/sort/node/provider/KafkaProvider.java     | 23 +++++++++++++++++++++
 .../sort/node/provider/MySQLBinlogProvider.java    | 24 ++++++++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index e7ca76241e..18ec699107 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.consts.StreamType;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
@@ -27,6 +28,7 @@ import 
org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -44,8 +46,10 @@ import 
org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import com.google.common.collect.Lists;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The Provider for creating Kafka extract or load nodes.
@@ -180,4 +184,23 @@ public class KafkaProvider implements ExtractNodeProvider, 
LoadNodeProvider {
         }
         return format;
     }
+
+    @Override
+    public List<FieldInfo> getMetaFields() {
+        List<FieldInfo> fieldInfos = new ArrayList<>();
+        fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), 
MetaField.AUDIT_DATA_TIME));
+        return fieldInfos;
+    }
+
+    @Override
+    public List<StreamField> addStreamMetaFields(List<StreamField> 
streamFields) {
+        List<String> fieldNames = 
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+            streamFields.add(0,
+                    new StreamField(0, "long", 
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+                            MetaField.AUDIT_DATA_TIME.name()));
+        }
+        return streamFields;
+    }
+
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
index 2c444eaf92..d02373af3e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
@@ -17,13 +17,16 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.consts.SourceType;
 import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
 import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeStrategyFactory;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 
@@ -31,8 +34,10 @@ import com.google.common.base.Splitter;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The Provider for creating MySQLBinlog extract nodes.
@@ -96,4 +101,23 @@ public class MySQLBinlogProvider implements 
ExtractNodeProvider {
                 true,
                 serverTimeZone);
     }
+
+    @Override
+    public List<FieldInfo> getMetaFields() {
+        List<FieldInfo> fieldInfos = new ArrayList<>();
+        fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), 
MetaField.AUDIT_DATA_TIME));
+        return fieldInfos;
+    }
+
+    @Override
+    public List<StreamField> addStreamMetaFields(List<StreamField> 
streamFields) {
+        List<String> fieldNames = 
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+            streamFields.add(0,
+                    new StreamField(0, "long", 
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+                            MetaField.AUDIT_DATA_TIME.name()));
+        }
+        return streamFields;
+    }
+
 }
\ No newline at end of file

Reply via email to