This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8c7543559b [INLONG-9248][Manager] Supports configuring builtIn fields
for tube source and pulsar source (#9249)
8c7543559b is described below
commit 8c7543559b769832fb4534906f3f24225b1f143d
Author: fuweng11 <[email protected]>
AuthorDate: Sat Nov 11 14:39:16 2023 +0800
[INLONG-9248][Manager] Supports configuring builtIn fields for tube source
and pulsar source (#9249)
---
.../pojo/sort/node/provider/IcebergProvider.java | 11 ++++++++++
.../pojo/sort/node/provider/PulsarProvider.java | 24 +++++++++++++++++++++-
.../pojo/sort/node/provider/TubeMqProvider.java | 24 ++++++++++++++++++++++
.../inlong/manager/pojo/source/StreamSource.java | 3 ++-
4 files changed, 60 insertions(+), 2 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 06cd989198..4af912f347 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.provider;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -110,6 +111,16 @@ public class IcebergProvider implements
ExtractNodeProvider, LoadNodeProvider {
return streamFields;
}
+ @Override
+ public List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
+ List<String> fieldNames =
sinkFields.stream().map(SinkField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+ sinkFields.add(0, new SinkField(0, "long",
MetaField.AUDIT_DATA_TIME.name(), "iceberg meta field",
+ MetaField.AUDIT_DATA_TIME.name(), "long", 1,
MetaField.AUDIT_DATA_TIME.name(), null));
+ }
+ return sinkFields;
+ }
+
@Override
public List<FieldInfo> getMetaFields() {
List<FieldInfo> fieldInfos = new ArrayList<>();
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index b0bcd0c1c8..994ce8839a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -17,10 +17,13 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -29,8 +32,10 @@ import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* The Provider for creating Pulsar extract nodes.
@@ -50,7 +55,6 @@ public class PulsarProvider implements ExtractNodeProvider {
String fullTopicName =
pulsarSource.getPulsarTenant() + "/" +
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
-
Format format = parsingFormat(pulsarSource.getSerializationType(),
pulsarSource.getWrapType(),
pulsarSource.getDataSeparator(),
@@ -78,4 +82,22 @@ public class PulsarProvider implements ExtractNodeProvider {
pulsarSource.getSubscription(),
scanStartupSubStartOffset);
}
+
+ @Override
+ public List<StreamField> addStreamMetaFields(List<StreamField>
streamFields) {
+ List<String> fieldNames =
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+ streamFields.add(0,
+ new StreamField(0, "long",
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+ MetaField.AUDIT_DATA_TIME.name()));
+ }
+ return streamFields;
+ }
+
+ @Override
+ public List<FieldInfo> getMetaFields() {
+ List<FieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new
LongFormatInfo()));
+ return fieldInfos;
+ }
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index a1996ddc82..2942741a08 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -17,17 +17,22 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* The Provider for creating TubeMQ extract nodes.
@@ -63,4 +68,23 @@ public class TubeMqProvider implements ExtractNodeProvider {
source.getSessionKey(),
source.getStreamId());
}
+
+ @Override
+ public List<StreamField> addStreamMetaFields(List<StreamField>
streamFields) {
+ List<String> fieldNames =
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+ streamFields.add(0,
+ new StreamField(0, "long",
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+ MetaField.AUDIT_DATA_TIME.name()));
+ }
+ return streamFields;
+ }
+
+ @Override
+ public List<FieldInfo> getMetaFields() {
+ List<FieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new
LongFormatInfo()));
+ return fieldInfos;
+ }
+
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 0e5594a5d8..42b829de2f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -114,8 +114,9 @@ public abstract class StreamSource extends StreamNode {
@ApiModelProperty("Sub source information of existing agents")
private List<SubSourceDTO> subSourceList;
+ @Builder.Default
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value, true as default")
- private boolean ignoreParseError;
+ private boolean ignoreParseError = true;
public SourceRequest genSourceRequest() {
return null;