This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 17be87606d [INLONG-10413][Manager] Support for configuring built-in
fields for mysql and kafka (#10414)
17be87606d is described below
commit 17be87606d8848a4fc458976a3977219dee547f5
Author: fuweng11 <[email protected]>
AuthorDate: Fri Jun 14 17:31:29 2024 +0800
[INLONG-10413][Manager] Support for configuring built-in fields for mysql
and kafka (#10414)
---
.../pojo/sort/node/provider/KafkaProvider.java | 23 +++++++++++++++++++++
.../sort/node/provider/MySQLBinlogProvider.java | 24 ++++++++++++++++++++++
2 files changed, 47 insertions(+)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index e7ca76241e..18ec699107 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.StreamType;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
@@ -27,6 +28,7 @@ import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -44,8 +46,10 @@ import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* The Provider for creating Kafka extract or load nodes.
@@ -180,4 +184,23 @@ public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider {
}
return format;
}
+
+ @Override
+ public List<FieldInfo> getMetaFields() {
+ List<FieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), MetaField.AUDIT_DATA_TIME));
+ return fieldInfos;
+ }
+
+ @Override
+ public List<StreamField> addStreamMetaFields(List<StreamField> streamFields) {
+ List<String> fieldNames = streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+ streamFields.add(0,
+ new StreamField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+ MetaField.AUDIT_DATA_TIME.name()));
+ }
+ return streamFields;
+ }
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
index 2c444eaf92..d02373af3e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
@@ -17,13 +17,16 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeStrategyFactory;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -31,8 +34,10 @@ import com.google.common.base.Splitter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* The Provider for creating MySQLBinlog extract nodes.
@@ -96,4 +101,23 @@ public class MySQLBinlogProvider implements ExtractNodeProvider {
true,
serverTimeZone);
}
+
+ @Override
+ public List<FieldInfo> getMetaFields() {
+ List<FieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), MetaField.AUDIT_DATA_TIME));
+ return fieldInfos;
+ }
+
+ @Override
+ public List<StreamField> addStreamMetaFields(List<StreamField> streamFields) {
+ List<String> fieldNames = streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+ streamFields.add(0,
+ new StreamField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+ MetaField.AUDIT_DATA_TIME.name()));
+ }
+ return streamFields;
+ }
+
}
\ No newline at end of file