This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 52d73f48c3 [INLONG-8951][Manager] Support for configuring built-in
fields for iceberg and starrocks (#8952)
52d73f48c3 is described below
commit 52d73f48c343954f784b51c8c15aaa1599986f52
Author: fuweng11 <[email protected]>
AuthorDate: Fri Sep 22 10:56:50 2023 +0800
[INLONG-8951][Manager] Support for configuring built-in fields for iceberg
and starrocks (#8952)
---
.../org/apache/inlong/common/enums/MetaField.java | 7 ++-
.../inlong/manager/pojo/sort/node/NodeFactory.java | 59 ++++++++++++++++++++++
.../pojo/sort/node/base/ExtractNodeProvider.java | 5 ++
.../pojo/sort/node/base/LoadNodeProvider.java | 4 ++
.../manager/pojo/sort/node/base/NodeProvider.java | 9 ++++
.../pojo/sort/node/provider/IcebergProvider.java | 25 +++++++++
.../pojo/sort/node/provider/StarRocksProvider.java | 26 ++++++++++
.../manager/pojo/sort/util/FieldInfoUtils.java | 12 +++++
.../resource/sort/DefaultSortConfigOperator.java | 27 ++++++++--
.../src/main/resources/application-dev.properties | 2 +
.../src/main/resources/application-prod.properties | 2 +
.../src/main/resources/application-test.properties | 2 +
.../org/apache/inlong/sort/protocol/Metadata.java | 1 +
.../protocol/node/extract/IcebergExtractNode.java | 33 +++++++++++-
.../org/apache/inlong/sort/base/Constants.java | 2 +-
.../sort/iceberg/IcebergReadableMetadata.java | 4 +-
.../starrocks/table/sink/utils/SchemaUtils.java | 21 ++++----
17 files changed, 221 insertions(+), 20 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index f3c1a388f5..e6d0bbc6cb 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -163,7 +163,12 @@ public enum MetaField {
/**
* Timestamp of the Kafka record, it is only used for Kafka.
*/
- TIMESTAMP;
+ TIMESTAMP,
+
+ /**
+ * Inlong data time for audit.
+ */
+ AUDIT_DATA_TIME;
public static MetaField forName(String name) {
for (MetaField metaField : values()) {
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
index cc6a545490..74f2efae00 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -18,14 +18,23 @@
package org.apache.inlong.manager.pojo.sort.node;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.transform.TransformResponse;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -33,6 +42,7 @@ import java.util.stream.Collectors;
/**
* The node factory
*/
+@Slf4j
public class NodeFactory {
/**
@@ -61,4 +71,53 @@ public class NodeFactory {
return
LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(v,
constantFieldMap);
}).collect(Collectors.toList());
}
+
+ /**
+ * Create extract node from the given source.
+ */
+ public static ExtractNode createExtractNode(StreamSource sourceInfo) {
+ if (sourceInfo == null) {
+ return null;
+ }
+ String sourceType = sourceInfo.getSourceType();
+ return
ExtractNodeProviderFactory.getExtractNodeProvider(sourceType).createExtractNode(sourceInfo);
+ }
+
+ /**
+ * Create load node from the given sink.
+ */
+ public static LoadNode createLoadNode(StreamSink sinkInfo, Map<String,
StreamField> constantFieldMap) {
+ if (sinkInfo == null) {
+ return null;
+ }
+ String sinkType = sinkInfo.getSinkType();
+ return
LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(sinkInfo,
constantFieldMap);
+ }
+
+ /**
+ * Add built-in field for extract node and load node
+ */
+ public static List<Node> addBuiltInField(StreamSource sourceInfo,
StreamSink sinkInfo,
+ List<TransformResponse> transformResponses, Map<String,
StreamField> constantFieldMap) {
+ ExtractNodeProvider extractNodeProvider =
ExtractNodeProviderFactory.getExtractNodeProvider(
+ sourceInfo.getSourceType());
+ LoadNodeProvider loadNodeProvider =
LoadNodeProviderFactory.getLoadNodeProvider(sinkInfo.getSinkType());
+
+ if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(),
loadNodeProvider.getMetaFields())) {
+ extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList());
+ transformResponses.forEach(v ->
extractNodeProvider.addStreamMetaFields(v.getFieldList()));
+ loadNodeProvider.addSinkMetaFields(sinkInfo.getSinkFieldList());
+ }
+
+ ExtractNode extractNode =
extractNodeProvider.createExtractNode(sourceInfo);
+ List<TransformNode> transformNodes =
+ TransformNodeUtils.createTransformNodes(transformResponses,
constantFieldMap);
+ LoadNode loadNode = loadNodeProvider.createLoadNode(sinkInfo,
constantFieldMap);
+
+ List<Node> nodes = new ArrayList<>();
+ nodes.add(extractNode);
+ nodes.addAll(transformNodes);
+ nodes.add(loadNode);
+ return nodes;
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index 3642cc7a3e..a8622a7be8 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -135,4 +135,9 @@ public interface ExtractNodeProvider extends NodeProvider {
}
return format;
}
+
+ default List<StreamField> addStreamMetaFields(List<StreamField>
streamFields) {
+ return streamFields;
+ }
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 2df89d9710..4af5e532d4 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -141,4 +141,8 @@ public interface LoadNodeProvider extends NodeProvider {
}
return format;
}
+
+ default List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
+ return sinkFields;
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
index a40bd1106d..f17d38bde0 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
@@ -17,6 +17,10 @@
package org.apache.inlong.manager.pojo.sort.node.base;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -45,4 +49,9 @@ public interface NodeProvider {
.filter(v -> Objects.nonNull(v.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
}
+
+ default List<FieldInfo> getMetaFields() {
+ return new ArrayList<>();
+ }
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 416a409ac5..06cd989198 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.StreamType;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
@@ -25,6 +26,7 @@ import
org.apache.inlong.manager.pojo.source.iceberg.IcebergSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.constant.IcebergConstant;
import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -33,12 +35,17 @@ import
org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* The Provider for creating Iceberg load nodes.
*/
+@Slf4j
public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
@Override
@@ -91,4 +98,22 @@ public class IcebergProvider implements ExtractNodeProvider,
LoadNodeProvider {
icebergSink.getCatalogUri(),
icebergSink.getWarehouse());
}
+
+ @Override
+ public List<StreamField> addStreamMetaFields(List<StreamField>
streamFields) {
+ List<String> fieldNames =
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+ streamFields.add(0,
+ new StreamField(0, "long",
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+ MetaField.AUDIT_DATA_TIME.name()));
+ }
+ return streamFields;
+ }
+
+ @Override
+ public List<FieldInfo> getMetaFields() {
+ List<FieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(),
MetaField.AUDIT_DATA_TIME));
+ return fieldInfos;
+ }
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
index acae2efeb3..5a86455a0f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
@@ -17,23 +17,31 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* The Provider for creating StarRocks load nodes.
*/
+@Slf4j
public class StarRocksProvider implements LoadNodeProvider {
@Override
@@ -71,4 +79,22 @@ public class StarRocksProvider implements LoadNodeProvider {
starRocksSink.getDatabasePattern(),
starRocksSink.getTablePattern());
}
+
+ @Override
+ public List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
+ List<String> fieldNames =
sinkFields.stream().map(SinkField::getFieldName).collect(Collectors.toList());
+ if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+ sinkFields.add(0, new SinkField(0, "long",
MetaField.AUDIT_DATA_TIME.name(), "long",
+ MetaField.AUDIT_DATA_TIME.name()));
+ }
+ return sinkFields;
+ }
+
+ @Override
+ public List<FieldInfo> getMetaFields() {
+ List<FieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new
LongFormatInfo()));
+ return fieldInfos;
+ }
+
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
index 5f4d7f274d..43d48a6e76 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
@@ -391,4 +391,16 @@ public class FieldInfoUtils {
return sortFormat;
}
+ public static boolean compareFields(List<FieldInfo> sourceFields,
List<FieldInfo> targetFields) {
+ if (sourceFields.size() != targetFields.size()) {
+ return false;
+ }
+ for (int i = 0; i < sourceFields.size(); i++) {
+ if (!Objects.equals(sourceFields.get(i).getName(),
targetFields.get(i).getName())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 52ff3bb5e8..74f0819613 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -35,6 +35,7 @@ import
org.apache.inlong.manager.service.transform.StreamTransformService;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.apache.commons.collections.CollectionUtils;
@@ -42,6 +43,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@@ -51,6 +53,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -63,6 +66,8 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultSortConfigOperator.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ @Value("${metrics.audit.proxy.hosts:127.0.0.1}")
+ private String auditHost;
@Autowired
private StreamSourceService sourceService;
@Autowired
@@ -127,12 +132,11 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
for (StreamSink sink : sinks) {
Map<String, Object> properties = sink.getProperties();
- properties.putIfAbsent("metrics.audit.key",
auditService.getAuditId(sink.getSinkType(), true));
+ addAuditId(sink.getProperties(), sink.getSinkType(), true);
}
for (StreamSource source : sources) {
source.setFieldList(inlongStream.getFieldList());
- Map<String, Object> properties = source.getProperties();
- properties.putIfAbsent("metrics.audit.key",
auditService.getAuditId(source.getSourceType(), false));
+ addAuditId(source.getProperties(), source.getSourceType(),
false);
}
List<NodeRelation> relations;
@@ -222,8 +226,13 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
private List<Node> createNodes(List<StreamSource> sources,
List<TransformResponse> transformResponses,
List<StreamSink> sinks, Map<String, StreamField> constantFieldMap)
{
List<Node> nodes = new ArrayList<>();
+ if (Objects.equals(sources.size(), sinks.size()) &&
Objects.equals(sources.size(), 1)) {
+ return NodeFactory.addBuiltInField(sources.get(0), sinks.get(0),
transformResponses, constantFieldMap);
+ }
+ List<TransformNode> transformNodes =
+ TransformNodeUtils.createTransformNodes(transformResponses,
constantFieldMap);
nodes.addAll(NodeFactory.createExtractNodes(sources));
-
nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses,
constantFieldMap));
+ nodes.addAll(transformNodes);
nodes.addAll(NodeFactory.createLoadNodes(sinks, constantFieldMap));
return nodes;
}
@@ -264,4 +273,14 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
groupInfo.getExtList().add(extInfo);
}
+ private void addAuditId(Map<String, Object> properties, String type,
boolean isSent) {
+ try {
+ String auditId = auditService.getAuditId(type, isSent);
+ properties.putIfAbsent("metrics.audit.key", auditId);
+ properties.putIfAbsent("metrics.audit.proxy.hosts", auditHost);
+ } catch (Exception e) {
+ LOGGER.error("Current type ={} is not set auditId", type);
+ }
+
+ }
}
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index ff248053a4..376d9b9f73 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -107,3 +107,5 @@ group.deleted.latest.hours=10
group.deleted.batchSize=100
# If turned on, the groups could be deleted periodically.
group.deleted.enabled=false
+
+metrics.audit.proxy.hosts=127.0.0.1:10081
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 69122ed272..c47fc92334 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -106,3 +106,5 @@ group.deleted.latest.hours=10
group.deleted.batchSize=100
# If turned on, the groups could be deleted periodically.
group.deleted.enabled=false
+
+metrics.audit.proxy.hosts=127.0.0.1:10081
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index ff248053a4..376d9b9f73 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -107,3 +107,5 @@ group.deleted.latest.hours=10
group.deleted.batchSize=100
# If turned on, the groups could be deleted periodically.
group.deleted.enabled=false
+
+metrics.audit.proxy.hosts=127.0.0.1:10081
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
index 8dcf069165..11c4ec6737 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
@@ -114,6 +114,7 @@ public interface Metadata {
case BATCH_ID:
case PARTITION:
case OFFSET:
+ case AUDIT_DATA_TIME:
metadataType = "BIGINT";
break;
case UPDATE_BEFORE:
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
index c48899b887..e87c111743 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
@@ -17,7 +17,10 @@
package org.apache.inlong.sort.protocol.node.extract;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.constant.IcebergConstant;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -32,8 +35,10 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Iceberg extract node for extract data from iceberg
@@ -42,7 +47,7 @@ import java.util.Map;
@JsonTypeName("icebergExtract")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
-public class IcebergExtractNode extends ExtractNode implements Serializable {
+public class IcebergExtractNode extends ExtractNode implements InlongMetric,
Metadata, Serializable {
@JsonProperty("tableName")
@Nonnull
@@ -113,7 +118,7 @@ public class IcebergExtractNode extends ExtractNode
implements Serializable {
// support streaming only
options.put(IcebergConstant.STREAMING, "true");
options.put(IcebergConstant.STARTING_STRATEGY_KEY,
-
IcebergConstant.StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT.name());
+
IcebergConstant.StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL.name());
if (null != uri) {
options.put(IcebergConstant.URI_KEY, uri);
}
@@ -138,4 +143,28 @@ public class IcebergExtractNode extends ExtractNode
implements Serializable {
return super.getPartitionFields();
}
+ @Override
+ public String getMetadataKey(MetaField metaField) {
+ String metadataKey;
+ switch (metaField) {
+ case AUDIT_DATA_TIME:
+ metadataKey = "audit_data_time";
+ break;
+ default:
+ throw new
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+ this.getClass().getSimpleName(), metaField));
+ }
+ return metadataKey;
+ }
+
+ @Override
+ public boolean isVirtual(MetaField metaField) {
+ return true;
+ }
+
+ @Override
+ public Set<MetaField> supportedMetaFields() {
+ return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+ }
+
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 72ce9d89b8..5065c78c1f 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -167,7 +167,7 @@ public final class Constants {
public static final String META_INCREMENTAL = "incremental_inlong";
- public static final String META_INLONG_DATA_TIME = "inlong_data_time";
+ public static final String META_AUDIT_DATA_TIME = "audit_data_time";
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
index e003555a45..11f7c51d60 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
@@ -30,8 +30,8 @@ import java.io.Serializable;
*/
public enum IcebergReadableMetadata {
- INLONG_DATA_TIME(
- Constants.META_INLONG_DATA_TIME,
+ AUDIT_DATA_TIME(
+ Constants.META_AUDIT_DATA_TIME,
DataTypes.BIGINT().notNull(),
r -> System.currentTimeMillis());
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
index 178196d466..76e91e6cf3 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
@@ -31,7 +31,7 @@ public class SchemaUtils implements Serializable {
private static final long serialVersionUID = 1L;
- private final String INLONG_DATA_TIME = "inlong_data_time";
+ private final String AUDIT_DATA_TIME = "audit_data_time";
private final int DATA_TIME_ABSENT_INDEX = -1;
private final int dataTimeFieldIndex;
@@ -41,16 +41,16 @@ public class SchemaUtils implements Serializable {
public long getDataTime(Object[] data) {
if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
- // if INLONG_DATA_TIME field is absent, return local time
+ // if AUDIT_DATA_TIME field is absent, return local time
return System.currentTimeMillis();
}
return (Long) data[dataTimeFieldIndex];
}
/**
- * filter out INLONG_DATA_TIME field
+ * filter out AUDIT_DATA_TIME field
* @param data
- * @return data without INLONG_DATA_TIME
+ * @return data without AUDIT_DATA_TIME
*/
public Object[] filterOutTimeField(Object[] data) {
if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
@@ -66,24 +66,25 @@ public class SchemaUtils implements Serializable {
}
/**
- * INLONG_DATA_TIME should not occur in actual data schema fields
+ * AUDIT_DATA_TIME should not occur in actual data schema fields
+ *
* @param schema
- * @return fieldNames without INLONG_DATA_TIME
+ * @return fieldNames without AUDIT_DATA_TIME
*/
public String[] filterOutTimeField(TableSchema schema) {
return Arrays.stream(schema.getFieldNames())
- .filter(field -> !INLONG_DATA_TIME.equals(field))
+ .filter(field -> !AUDIT_DATA_TIME.equals(field))
.toArray(String[]::new);
}
/**
- * get the index of INLONG_DATA_TIME in fieldNames
+ * get the index of AUDIT_DATA_TIME in fieldNames
* @param fieldNames
- * @return index of INLONG_DATA_TIME in fieldNames, or
DATA_TIME_ABSENT_INDEX if absent
+ * @return index of AUDIT_DATA_TIME in fieldNames, or
DATA_TIME_ABSENT_INDEX if absent
*/
private int getDataTimeIndex(String[] fieldNames) {
for (int i = 0; i < fieldNames.length; i++) {
- if (INLONG_DATA_TIME.equals(fieldNames[i])) {
+ if (AUDIT_DATA_TIME.equals(fieldNames[i])) {
return i;
}
}