This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 699d16552a [Feature][Transform-v2] Add metadata transform (#7899)
699d16552a is described below
commit 699d16552a4d6ca900503fe98a1bc184718574f2
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Nov 11 20:48:39 2024 +0800
[Feature][Transform-v2] Add metadata transform (#7899)
---
docs/en/transform-v2/dynamic-compile.md | 4 +-
docs/en/transform-v2/metadata.md | 85 +++++++++++
docs/zh/transform-v2/dynamic-compile.md | 4 +-
docs/zh/transform-v2/metadata.md | 85 +++++++++++
plugin-mapping.properties | 1 +
.../seatunnel/api/table/type/CommonOptions.java | 36 ++++-
.../seatunnel/api/table/type/MetadataUtil.java | 93 ++++++++++++
.../seatunnel/api/table/type/SeaTunnelRow.java | 4 +-
.../api/table/type}/SeaTunnelRowAccessor.java | 11 +-
.../cdc/base/utils/SourceRecordUtils.java | 7 +-
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 16 ++-
.../MongoDBConnectorDeserializationSchema.java | 17 ++-
.../connector-cdc-mongodb-e2e/pom.xml | 14 ++
.../src/test/java/mongodb/MongodbCDCIT.java | 45 +++++-
.../test/resources/mongodbcdc_metadata_trans.conf | 104 ++++++++++++++
.../connector-cdc-mysql-e2e/pom.xml | 14 ++
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 39 +++++
.../test/resources/mysqlcdc_to_metadata_trans.conf | 103 +++++++++++++
.../connector-cdc-opengauss-e2e/pom.xml | 14 ++
.../seatunnel/cdc/postgres/OpengaussCDCIT.java | 42 ++++++
.../resources/opengausscdc_to_meatadata_trans.conf | 105 ++++++++++++++
.../connector-cdc-oracle-e2e/pom.xml | 15 ++
.../seatunnel/cdc/oracle/OracleCDCIT.java | 42 ++++++
.../resources/oraclecdc_to_metadata_trans.conf | 119 +++++++++++++++
.../connector-cdc-postgres-e2e/pom.xml | 14 ++
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 43 ++++++
.../resources/postgrescdc_to_metadata_trans.conf | 105 ++++++++++++++
.../connector-cdc-sqlserver-e2e/pom.xml | 15 ++
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 39 +++++
.../resources/sqlservercdc_to_metadata_trans.conf | 110 ++++++++++++++
.../e2e/common/container/TestContainer.java | 4 +
.../container/seatunnel/SeaTunnelContainer.java | 24 +++-
.../src/test/resources/seatunnel.yaml | 4 +-
...ixed_dynamic_groovy_java_compile_transform.conf | 6 +-
.../multiple_dynamic_groovy_compile_transform.conf | 6 +-
.../multiple_dynamic_java_compile_transform.conf | 6 +-
.../single_dynamic_groovy_compile_transform.conf | 4 +-
.../single_dynamic_http_compile_transform.conf | 4 +-
.../single_dynamic_java_compile_transform.conf | 4 +-
.../dynamic_compile/source_file/GroovyFile | 4 +-
.../resources/dynamic_compile/source_file/JavaFile | 2 +-
.../common/MultipleFieldOutputTransform.java | 1 +
.../transform/common/SeaTunnelRowAccessor.java | 1 +
.../common/SingleFieldOutputTransform.java | 1 +
.../transform/copy/CopyFieldTransform.java | 2 +-
.../dynamiccompile/DynamicCompileTransform.java | 2 +-
.../transform/exception/TransformCommonError.java | 16 +++
.../exception/TransformCommonErrorCode.java | 8 +-
.../transform/jsonpath/JsonPathTransform.java | 2 +-
.../transform/metadata/MetadataTransform.java | 159 +++++++++++++++++++++
.../MetadataTransformConfig.java} | 38 ++---
.../metadata/MetadataTransformFactory.java | 44 ++++++
.../nlpmodel/embadding/EmbeddingTransform.java | 2 +-
.../transform/nlpmodel/llm/LLMTransform.java | 2 +-
.../transform/replace/ReplaceTransform.java | 2 +-
.../rowkind/RowKindExtractorTransform.java | 2 +-
.../seatunnel/transform/split/SplitTransform.java | 2 +-
.../transform/metadata/MetadataTransformTest.java | 132 +++++++++++++++++
58 files changed, 1752 insertions(+), 77 deletions(-)
diff --git a/docs/en/transform-v2/dynamic-compile.md
b/docs/en/transform-v2/dynamic-compile.md
index fb5500880a..66b7ba1f83 100644
--- a/docs/en/transform-v2/dynamic-compile.md
+++ b/docs/en/transform-v2/dynamic-compile.md
@@ -88,7 +88,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -146,7 +146,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
new file mode 100644
index 0000000000..23f593ee0a
--- /dev/null
+++ b/docs/en/transform-v2/metadata.md
@@ -0,0 +1,85 @@
+# Metadata
+
+> Metadata transform plugin
+
+## Description
+Metadata transform plugin for adding metadata fields to data
+
+## Available Metadata
+
+| Key | DataType | Description
|
+|:---------:|:--------:|:---------------------------------------------------------------------------------------------------|
+| Database | string | Name of the database that contains the row.
|
+| Table | string | Name of the table that contains the row.
|
+| RowKind | string | The type of operation
|
+| EventTime | Long | The time at which the connector processed the event.
|
+| Delay | Long | The difference between data extraction time and
database change time |
+| Partition | string | The partition fields of the table that contains the row;
multiple values are joined with `,` |
+
+### note
+ `Delay` and `Partition` currently only work with CDC connectors, except
TiDB-CDC.
+
+## Options
+
+| name | type | required | default value | Description
|
+|:---------------:|------|----------|---------------|---------------------------------------------------------------------------|
+| metadata_fields | map | yes | | A mapping between metadata input
fields and their corresponding output fields. |
+
+### metadata_fields [map]
+
+A mapping between metadata fields and their respective output fields.
+
+```hocon
+metadata_fields {
+ Database = c_database
+ Table = c_table
+ RowKind = c_rowKind
+ EventTime = c_ts_ms
+ Delay = c_delay
+}
+```
+
+## Examples
+
+```hocon
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second = 7000000
+ read_limit.rows_per_second = 400
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["source.user"]
+ base-url = "jdbc:mysql://localhost:3306/source"
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "trans_result"
+ }
+}
+
+```
+
diff --git a/docs/zh/transform-v2/dynamic-compile.md
b/docs/zh/transform-v2/dynamic-compile.md
index c5af808d4e..4db9b86d73 100644
--- a/docs/zh/transform-v2/dynamic-compile.md
+++ b/docs/zh/transform-v2/dynamic-compile.md
@@ -85,7 +85,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -143,7 +143,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
new file mode 100644
index 0000000000..09a743adf0
--- /dev/null
+++ b/docs/zh/transform-v2/metadata.md
@@ -0,0 +1,85 @@
+# Metadata
+
+> Metadata transform plugin
+
+## Description
+元数据转换插件,用于将元数据字段添加到数据中
+
+## 支持的元数据
+
+| Key | DataType | Description |
+|:---------:|:--------:|:-----------------------:|
+| Database | string | 包含该行的数据库名 |
+| Table | string | 包含该行的数据表名 |
+| RowKind | string | 行类型 |
+| EventTime | Long | 连接器处理该事件的时间 |
+| Delay | Long | 数据抽取时间与数据库变更时间的差 |
+| Partition | string | 包含该行对应数据表的分区字段,多个使用`,`连接 |
+
+### 注意事项
+ `Delay` 和 `Partition` 目前只适用于CDC系列连接器,TiDB-CDC除外
+
+## 配置选项
+
+| name | type | required | default value | Description |
+|:---------------:|------|:--------:|:-------------:|-------------------|
+| metadata_fields | map | 是 | - | 元数据字段与输入字段相应的映射关系 |
+
+### metadata_fields [map]
+
+元数据字段和相应的输出字段之间的映射关系
+
+```hocon
+metadata_fields {
+ Database = c_database
+ Table = c_table
+ RowKind = c_rowKind
+ EventTime = c_ts_ms
+ Delay = c_delay
+}
+```
+
+## 示例
+
+```hocon
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second = 7000000
+ read_limit.rows_per_second = 400
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["source.user"]
+ base-url = "jdbc:mysql://localhost:3306/source"
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "trans_result"
+ }
+}
+
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 82c941b70f..c494686161 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -154,3 +154,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
+seatunnel.transform.Metadata = seatunnel-transforms-v2
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
index 839d611132..8b5b36682a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
@@ -30,18 +30,44 @@ public enum CommonOptions {
/**
* The key of {@link Column#getOptions()} to specify the column value is a
json format string.
*/
- JSON("Json"),
+ JSON("Json", false),
/** The key of {@link Column#getOptions()} to specify the column value is
a metadata field. */
- METADATA("Metadata"),
+ METADATA("Metadata", false),
/**
* The key of {@link SeaTunnelRow#getOptions()} to store the partition
value of the row value.
*/
- PARTITION("Partition"),
- ;
+ PARTITION("Partition", true),
+ /**
+ * The key of {@link SeaTunnelRow#getOptions()} to store the DATABASE
value of the row value.
+ */
+ DATABASE("Database", true),
+ /** The key of {@link SeaTunnelRow#getOptions()} to store the TABLE value
of the row value. */
+ TABLE("Table", true),
+ /**
+ * The key of {@link SeaTunnelRow#getOptions()} to store the ROW_KIND
value of the row value.
+ */
+ ROW_KIND("RowKind", true),
+ /**
+ * The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME
value of the row value.
+ */
+ EVENT_TIME("EventTime", true),
+ /** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value
of the row value. */
+ DELAY("Delay", true);
private final String name;
+ private final boolean supportMetadataTrans;
- CommonOptions(String name) {
+ CommonOptions(String name, boolean supportMetadataTrans) {
this.name = name;
+ this.supportMetadataTrans = supportMetadataTrans;
+ }
+
+ public static CommonOptions fromName(String name) {
+ for (CommonOptions option : CommonOptions.values()) {
+ if (option.getName().equals(name)) {
+ return option;
+ }
+ }
+ throw new IllegalArgumentException("Unknown option name: " + name);
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
new file mode 100644
index 0000000000..42ab203576
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
+import static org.apache.seatunnel.api.table.type.CommonOptions.PARTITION;
+
+public class MetadataUtil {
+
+ public static final List<String> METADATA_FIELDS;
+
+ static {
+ METADATA_FIELDS = new ArrayList<>();
+ Stream.of(CommonOptions.values())
+ .filter(CommonOptions::isSupportMetadataTrans)
+ .map(CommonOptions::getName)
+ .forEach(METADATA_FIELDS::add);
+ }
+
+ public static void setDelay(SeaTunnelRow row, Long delay) {
+ row.getOptions().put(DELAY.getName(), delay);
+ }
+
+ public static void setPartition(SeaTunnelRow row, String[] partition) {
+ row.getOptions().put(PARTITION.getName(), partition);
+ }
+
+ public static void setEventTime(SeaTunnelRow row, Long delay) {
+ row.getOptions().put(EVENT_TIME.getName(), delay);
+ }
+
+ public static Long getDelay(SeaTunnelRowAccessor row) {
+ return (Long) row.getOptions().get(DELAY.getName());
+ }
+
+ public static String getDatabase(SeaTunnelRowAccessor row) {
+ if (row.getTableId() == null) {
+ return null;
+ }
+ return TablePath.of(row.getTableId()).getDatabaseName();
+ }
+
+ public static String getTable(SeaTunnelRowAccessor row) {
+ if (row.getTableId() == null) {
+ return null;
+ }
+ return TablePath.of(row.getTableId()).getTableName();
+ }
+
+ public static String getRowKind(SeaTunnelRowAccessor row) {
+ return row.getRowKind().shortString();
+ }
+
+ public static String getPartitionStr(SeaTunnelRowAccessor row) {
+ Object partition = row.getOptions().get(PARTITION.getName());
+ return Objects.nonNull(partition) ? String.join(",", (String[])
partition) : null;
+ }
+
+ public static String[] getPartition(SeaTunnelRowAccessor row) {
+ return (String[]) row.getOptions().get(PARTITION.getName());
+ }
+
+ public static Long getEventTime(SeaTunnelRowAccessor row) {
+ return (Long) row.getOptions().get(EVENT_TIME.getName());
+ }
+
+ public static boolean isMetadataField(String fieldName) {
+ return METADATA_FIELDS.contains(fieldName);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index b6da4eea7b..84e172f2df 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -34,10 +34,10 @@ public final class SeaTunnelRow implements Serializable {
/** The array to store the actual internal format values. */
private final Object[] fields;
- private volatile int size;
-
private Map<String, Object> options;
+ private volatile int size;
+
public SeaTunnelRow(int arity) {
this.fields = new Object[arity];
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java
similarity index 89%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java
index 5b97f34168..6bbca49cd5 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.common;
-
-import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+package org.apache.seatunnel.api.table.type;
import lombok.AllArgsConstructor;
+import java.util.Map;
+
@AllArgsConstructor
public class SeaTunnelRowAccessor {
private final SeaTunnelRow row;
@@ -45,4 +44,8 @@ public class SeaTunnelRowAccessor {
public Object[] getFields() {
return row.getFields();
}
+
+ public Map<String, Object> getOptions() {
+ return row.getOptions();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
index f7e9577ddb..1df22ee428 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
@@ -67,10 +67,9 @@ public class SourceRecordUtils {
}
/**
- * Return the timestamp when the change event is produced in MySQL.
- *
- * <p>The field `source.ts_ms` in {@link SourceRecord} data struct is the
time when the change
- * event is operated in MySQL.
+ * In the source object, ts_ms indicates the time that the change was made
in the database. By
+ * comparing the value for payload.source.ts_ms with the value for
payload.ts_ms, you can
+ * determine the lag between the source database update and Debezium.
*/
public static Long getMessageTimestamp(SourceRecord record) {
Schema schema = record.valueSchema();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 948e872d48..117e84325e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import
org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -181,26 +182,39 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
} else {
converters = tableRowConverters.get(DEFAULT_TABLE_NAME_KEY);
}
-
+ Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
+ Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
+ long delay = -1L;
+ if (fetchTimestamp != null && messageTimestamp != null) {
+ delay = fetchTimestamp - messageTimestamp;
+ }
if (operation == Envelope.Operation.CREATE || operation ==
Envelope.Operation.READ) {
SeaTunnelRow insert = extractAfterRow(converters, record,
messageStruct, valueSchema);
insert.setRowKind(RowKind.INSERT);
insert.setTableId(tableId);
+ MetadataUtil.setDelay(insert, delay);
+ MetadataUtil.setEventTime(insert, fetchTimestamp);
collector.collect(insert);
} else if (operation == Envelope.Operation.DELETE) {
SeaTunnelRow delete = extractBeforeRow(converters, record,
messageStruct, valueSchema);
delete.setRowKind(RowKind.DELETE);
delete.setTableId(tableId);
+ MetadataUtil.setDelay(delete, delay);
+ MetadataUtil.setEventTime(delete, fetchTimestamp);
collector.collect(delete);
} else if (operation == Envelope.Operation.UPDATE) {
SeaTunnelRow before = extractBeforeRow(converters, record,
messageStruct, valueSchema);
before.setRowKind(RowKind.UPDATE_BEFORE);
before.setTableId(tableId);
+ MetadataUtil.setDelay(before, delay);
+ MetadataUtil.setEventTime(before, fetchTimestamp);
collector.collect(before);
SeaTunnelRow after = extractAfterRow(converters, record,
messageStruct, valueSchema);
after.setRowKind(RowKind.UPDATE_AFTER);
after.setTableId(tableId);
+ MetadataUtil.setDelay(after, delay);
+ MetadataUtil.setEventTime(after, fetchTimestamp);
collector.collect(after);
} else {
log.warn("Received {} operation, skip", operation);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 4811217cf4..8b76d202d7 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -24,12 +24,14 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
@@ -111,18 +113,27 @@ public class MongoDBConnectorDeserializationSchema
log.debug("Ignore newly added table {}", tableId);
return;
}
-
+ Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
+ Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
+ long delay = -1L;
+ if (fetchTimestamp != null && messageTimestamp != null) {
+ delay = fetchTimestamp - messageTimestamp;
+ }
switch (op) {
case INSERT:
SeaTunnelRow insert = extractRowData(tableRowConverter,
fullDocument);
insert.setRowKind(RowKind.INSERT);
insert.setTableId(tableId);
+ MetadataUtil.setDelay(insert, delay);
+ MetadataUtil.setEventTime(insert, fetchTimestamp);
emit(record, insert, out);
break;
case DELETE:
SeaTunnelRow delete = extractRowData(tableRowConverter,
documentKey);
delete.setRowKind(RowKind.DELETE);
delete.setTableId(tableId);
+ MetadataUtil.setDelay(delete, delay);
+ MetadataUtil.setEventTime(delete, fetchTimestamp);
emit(record, delete, out);
break;
case UPDATE:
@@ -132,12 +143,16 @@ public class MongoDBConnectorDeserializationSchema
SeaTunnelRow updateAfter = extractRowData(tableRowConverter,
fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
updateAfter.setTableId(tableId);
+ MetadataUtil.setDelay(updateAfter, delay);
+ MetadataUtil.setEventTime(updateAfter, fetchTimestamp);
emit(record, updateAfter, out);
break;
case REPLACE:
SeaTunnelRow replaceAfter = extractRowData(tableRowConverter,
fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
replaceAfter.setTableId(tableId);
+ MetadataUtil.setDelay(replaceAfter, delay);
+ MetadataUtil.setEventTime(replaceAfter, fetchTimestamp);
emit(record, replaceAfter, out);
break;
case INVALIDATE:
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
index a8814c11ee..e985193498 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
@@ -67,5 +67,19 @@
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index 3789731354..7cf18c8032 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
@@ -45,6 +46,7 @@ import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Sorts;
import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@@ -148,7 +150,9 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
}
@TestTemplate
- public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) {
+ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container)
+ throws InterruptedException {
+ cleanSourceTable();
CompletableFuture.supplyAsync(
() -> {
try {
@@ -225,6 +229,45 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
});
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "This case requires obtaining the task health status and
manually canceling the canceled task, which is currently only supported by the
zeta engine.")
+ public void testMongodbCdcMetadataTrans(TestContainer container) throws
InterruptedException {
+ cleanSourceTable();
+ Long jobId = JobIdGenerator.newJobId();
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/mongodbcdc_metadata_trans.conf",
String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException();
+ }
+ return null;
+ });
+ TimeUnit.SECONDS.sleep(10);
+ // insert update delete
+ upsertDeleteSourceTable();
+ TimeUnit.SECONDS.sleep(20);
+ await().atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ String jobStatus =
container.getJobStatus(String.valueOf(jobId));
+ Assertions.assertEquals("RUNNING", jobStatus);
+ });
+
+ try {
+ Container.ExecResult cancelJobResult =
container.cancelJob(String.valueOf(jobId));
+ Assertions.assertEquals(0, cancelJobResult.getExitCode(),
cancelJobResult.getStderr());
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
new file mode 100644
index 0000000000..bc6475359c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo0:27017"
+ database = ["inventory"]
+ collection = ["inventory.products"]
+ username = superuser
+ password = superpw
+ schema = {
+ fields {
+ "_id": string,
+ "name": string,
+ "description": string,
+ "weight": string
+ }
+ }
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ field_rules = [
+ {
+ field_name = database
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = table
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = rowKind
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = ts_ms
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = delay
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
index 539fce1890..8cb38aaffa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
@@ -54,6 +54,20 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 2891907431..6f6467257f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -180,6 +180,45 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
});
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "This case requires obtaining the task health status and
manually canceling the canceled task, which is currently only supported by the
zeta engine.")
+ public void testMysqlCdcMetadataTrans(TestContainer container) throws
InterruptedException {
+ // Clear related content to ensure that multiple operations are not
affected
+ clearTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+ clearTable(MYSQL_DATABASE, SINK_TABLE);
+ Long jobId = JobIdGenerator.newJobId();
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/mysqlcdc_to_metadata_trans.conf",
String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ TimeUnit.SECONDS.sleep(10);
+ // insert update delete
+ upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+ TimeUnit.SECONDS.sleep(10);
+ await().atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ String jobStatus =
container.getJobStatus(String.valueOf(jobId));
+ Assertions.assertEquals("RUNNING", jobStatus);
+ });
+ try {
+ Container.ExecResult cancelJobResult =
container.cancelJob(String.valueOf(jobId));
+ Assertions.assertEquals(0, cancelJobResult.getExitCode(),
cancelJobResult.getStderr());
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@TestTemplate
public void testMysqlCdcCheckDataWithDisableExactlyonce(TestContainer
container) {
// Clear related content to ensure that multiple operations are not
affected
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..8787c8987d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second = 7000000
+ read_limit.rows_per_second = 400
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ server-id = 5652
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ field_rules = [
+ {
+ field_name = database
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = table
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = rowKind
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = ts_ms
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = delay
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
index f95e5cdb1a..b855c0d6d5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
@@ -57,6 +57,20 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
index ed3fdd74b4..35be6c0d12 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -199,6 +200,47 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
}
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "This case requires obtaining the task health status and
manually canceling the canceled task, which is currently only supported by the
zeta engine.")
+ public void testOpengaussCdcMeatadataTrans(TestContainer container)
+ throws InterruptedException, IOException {
+ try {
+ Long jobId = JobIdGenerator.newJobId();
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/opengausscdc_to_meatadata_trans.conf",
String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" +
e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ TimeUnit.SECONDS.sleep(10);
+ // insert update delete
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+
+ TimeUnit.SECONDS.sleep(20);
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ String jobStatus =
container.getJobStatus(String.valueOf(jobId));
+ Assertions.assertEquals("RUNNING", jobStatus);
+ });
+ Container.ExecResult cancelJobResult =
container.cancelJob(String.valueOf(jobId));
+ Assertions.assertEquals(0, cancelJobResult.getExitCode(),
cancelJobResult.getStderr());
+ } finally {
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1);
+ }
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf
new file mode 100644
index 0000000000..9de18e661d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Opengauss-CDC {
+ result_table_name = "customers_opengauss_cdc"
+ username = "gaussdb"
+ password = "openGauss@123"
+ database-names = ["opengauss_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["opengauss_cdc.inventory.opengauss_cdc_table_1"]
+ base-url =
"jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc?loggerLevel=OFF"
+ decoding.plugin.name = "pgoutput"
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ field_rules = [
+ {
+ field_name = database
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = table
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = rowKind
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = ts_ms
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = delay
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
index 01c9b2c756..0757d1128f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
@@ -56,6 +56,21 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index d09b502d8f..3376ed74bc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -253,6 +253,48 @@ public class OracleCDCIT extends AbstractOracleCDCIT
implements TestResource {
});
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "This case requires obtaining the task health status and
manually canceling the canceled task, which is currently only supported by the
zeta engine.")
+ public void testOracleCdcMetadataTrans(TestContainer container) throws
Exception {
+
+ clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+ clearTable(DATABASE, SINK_TABLE1);
+
+ insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+ Long jobId = JobIdGenerator.newJobId();
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/oraclecdc_to_metadata_trans.conf",
String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ TimeUnit.SECONDS.sleep(10);
+ // insert update delete
+ updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+ TimeUnit.SECONDS.sleep(20);
+ await().atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ String jobStatus =
container.getJobStatus(String.valueOf(jobId));
+ Assertions.assertEquals("RUNNING", jobStatus);
+ });
+ try {
+ Container.ExecResult cancelJobResult =
container.cancelJob(String.valueOf(jobId));
+ Assertions.assertEquals(0, cancelJobResult.getExitCode(),
cancelJobResult.getStderr());
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..6a24214f8c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ Oracle-CDC {
+ result_table_name = "customers"
+ username = "system"
+ password = "top_secret"
+ database-names = ["ORCLCDB"]
+ schema-names = ["DEBEZIUM"]
+ base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+ source.reader.close.timeout = 120000
+ connection.pool.size = 1
+ debezium {
+ # log.mining.strategy = "online_catalog"
+ # log.mining.continuous.mine = true
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+
+ table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"]
+ table-names-config = [
+ {
+ table = "ORCLCDB.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"
+ primaryKeys = ["ID"]
+ }
+ ]
+
+ exactly_once = true
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ field_rules = [
+ {
+ field_name = database
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = table
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = rowKind
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = ts_ms
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = delay
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
index 0e78978295..bb152c2795 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
@@ -53,6 +53,20 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 3abca057fb..acb9a2a41c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -184,6 +184,49 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
}
}
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.")
+    public void testMPostgresCdcMetadataTrans(TestContainer container) throws InterruptedException {
+
+        Long jobId = JobIdGenerator.newJobId();
+        // Submit the job that routes Postgres-CDC through the Metadata transform into the Assert
+        // sink. The plain postgres-to-postgres config would not exercise the transform at all.
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/postgrescdc_to_metadata_trans.conf", String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+        TimeUnit.SECONDS.sleep(10);
+        // insert update delete
+        upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_1);
+
+        TimeUnit.SECONDS.sleep(20);
+        // The streaming job must still be RUNNING after processing the change events.
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = container.getJobStatus(String.valueOf(jobId));
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
+
+        try {
+            Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId));
+            Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr());
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // Clear related content to ensure that multiple operations are not affected
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_1);
+            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_1);
+        }
+    }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..d0337069b0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Postgres-CDC {
+ result_table_name = "customers_postgres_cdc"
+ username = "postgres"
+ password = "postgres"
+ database-names = ["postgres_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["postgres_cdc.inventory.postgres_cdc_table_1"]
+ base-url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ decoding.plugin.name = "decoderbufs"
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ field_rules = [
+ {
+ field_name = database
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = table
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = rowKind
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = ts_ms
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = delay
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
index 0c5a0fa8e0..59a673fe86 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
@@ -51,6 +51,21 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index eb891be771..1b699d5805 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
@@ -307,6 +308,44 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
});
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "This case requires obtaining the task health status and
manually canceling the canceled task, which is currently only supported by the
zeta engine.")
+ public void testSqlServerCDCMetadataTrans(TestContainer container) throws
InterruptedException {
+ initializeSqlServerTable("column_type_test");
+
+ Long jobId = JobIdGenerator.newJobId();
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/sqlservercdc_to_metadata_trans.conf",
String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ TimeUnit.SECONDS.sleep(10);
+ // insert update delete
+ updateSourceTable(SOURCE_TABLE_CUSTOM_PRIMARY_KEY);
+ TimeUnit.SECONDS.sleep(20);
+ await().atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ String jobStatus =
container.getJobStatus(String.valueOf(jobId));
+ Assertions.assertEquals("RUNNING", jobStatus);
+ });
+ try {
+ Container.ExecResult cancelJobResult =
container.cancelJob(String.valueOf(jobId));
+ Assertions.assertEquals(0, cancelJobResult.getExitCode(),
cancelJobResult.getStderr());
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Executes a JDBC statement using the default jdbc config without
autocommitting the
* connection.
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..49272fc5cf
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ SqlServer-CDC {
+ result_table_name = "customers"
+ username = "sa"
+ password = "Password!"
+ database-names = ["column_type_test"]
+ table-names = ["column_type_test.dbo.full_types_custom_primary_key"]
+ base-url =
"jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+
+ exactly_once = true
+ table-names-config = [
+ {
+ table = "column_type_test.dbo.full_types_custom_primary_key"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+
+transform {
+ Metadata {
+ metadata_fields {
+ Database = database
+ Table = table
+ RowKind = rowKind
+ EventTime = ts_ms
+ Delay = delay
+ }
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ field_rules = [
+ {
+ field_name = database
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = table
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = rowKind
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = ts_ms
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = delay
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index 72584158f6..b55cbae6c7 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -68,6 +68,10 @@ public interface TestContainer extends TestResource {
throw new UnsupportedOperationException("Not implemented");
}
+    /**
+     * Returns the current status of the given job as reported by the engine.
+     *
+     * <p>The default implementation does not support this query; containers that can look up
+     * job status override it.
+     *
+     * @param jobId id of the job to look up
+     * @return the job status
+     * @throws UnsupportedOperationException always, in this default implementation
+     */
+    default String getJobStatus(String jobId) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
String getServerLogs();
void copyFileToContainer(String path, String targetPath);
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index e33d89cc0a..03a0f87be8 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -17,7 +17,10 @@
package org.apache.seatunnel.e2e.common.container.seatunnel;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -25,6 +28,7 @@ import
org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.commons.compress.utils.Lists;
+import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -96,7 +100,7 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
"seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
.waitingFor(Wait.forLogMessage(".*received new worker
register:.*", 1));
copySeaTunnelStarterToContainer(server);
- server.setPortBindings(Collections.singletonList("5801:5801"));
+ server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
@@ -490,6 +494,24 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
return cancelJob(server, jobId);
}
+ @Override
+ public String getJobStatus(String jobId) {
+ HttpGet get = new HttpGet("http://" + server.getHost() +
":8080/job-info/" + jobId);
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ CloseableHttpResponse response = client.execute(get);
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String jobStatus = EntityUtils.toString(response.getEntity());
+ ObjectNode jsonNodes = JsonUtils.parseObject(jobStatus);
+ if (jsonNodes.has("jobStatus")) {
+ return jsonNodes.get("jobStatus").asText();
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
@Override
public String getServerLogs() {
return server.getLogs();
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 80b928fcdc..2c3d39d0be 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -33,10 +33,10 @@ seatunnel:
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot/
http:
- enable-http: false
+ enable-http: true
port: 8080
telemetry:
metric:
enabled: false
logs:
- scheduled-deletion-enable: true
\ No newline at end of file
+ scheduled-deletion-enable: true
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
index e91765fbf3..960df97405 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
@@ -46,7 +46,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
@@ -84,7 +84,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -154,4 +154,4 @@ sink {
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
index 8689404a17..5d226764c1 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
@@ -43,7 +43,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -77,7 +77,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -154,4 +154,4 @@ sink {
]
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
index 9e59a5e535..64272fef83 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
@@ -46,7 +46,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
@@ -84,7 +84,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
@@ -156,4 +156,4 @@ sink {
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
index 7958b88076..661f5562ee 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
@@ -43,7 +43,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -108,4 +108,4 @@ sink {
]
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
index 904066d69b..6aa16b64b9 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
@@ -44,7 +44,7 @@ transform {
source_code="""
import cn.hutool.http.HttpUtil;
import org.apache.seatunnel.api.table.catalog.Column
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -112,4 +112,4 @@ Assert {
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
index b65877d465..1f732bb306 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
@@ -43,7 +43,7 @@ DynamicCompile {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
- import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
@@ -112,4 +112,4 @@ sink {
]
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
index 9bb6a8fcdf..079e78d677 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
@@ -15,7 +15,7 @@
* limitations under the License.
*/
import org.apache.seatunnel.api.table.catalog.Column
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
import org.apache.seatunnel.api.table.catalog.CatalogTable
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.*;
@@ -39,4 +39,4 @@ class demo {
fieldValues[0]="AA"
return fieldValues;
}
-};
\ No newline at end of file
+};
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
index 7d1947c077..0fe36b01c0 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
@@ -18,7 +18,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import java.util.ArrayList;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 51e1172412..7f6ac1fba7 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
index 5b97f34168..39dd951748 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import lombok.AllArgsConstructor;
@AllArgsConstructor
+@Deprecated
public class SeaTunnelRowAccessor {
private final SeaTunnelRow row;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index 13d25989ac..394242a41b 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
index 1718030db6..75712b5fc8 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
@@ -24,10 +24,10 @@ import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.exception.TransformCommonError;
import java.lang.reflect.Array;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
index ea55569420..bfae2b8d2a 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
@@ -20,10 +20,10 @@ package org.apache.seatunnel.transform.dynamiccompile;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.dynamiccompile.parse.AbstractParse;
import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse;
import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
index b35df6a448..51e4b8265f 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -25,6 +25,8 @@ import java.util.Map;
import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
+import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_FIELDS_NOT_FOUND;
+import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_MAPPING_FIELD_EXISTS;
/** The common error of SeaTunnel transform. Please refer {@link CommonError}
*/
public class TransformCommonError {
@@ -43,4 +45,18 @@ public class TransformCommonError {
params.put("transform", transform);
return new TransformException(INPUT_FIELDS_NOT_FOUND, params);
}
+
+    /**
+     * Creates the {@code METADATA_FIELDS_NOT_FOUND} error for a metadata field that could not be
+     * found.
+     *
+     * @param transform name of the transform reporting the error
+     * @param field the metadata field name that was not found
+     * @return the exception (it is returned, not thrown)
+     */
+    public static TransformException cannotFindMetadataFieldError(String transform, String field) {
+        final Map<String, String> errorParams = new HashMap<>();
+        errorParams.put("field", field);
+        errorParams.put("transform", transform);
+        return new TransformException(METADATA_FIELDS_NOT_FOUND, errorParams);
+    }
+
+    /**
+     * Creates the {@code METADATA_MAPPING_FIELD_EXISTS} error for a metadata output column whose
+     * name already exists in the upstream schema.
+     *
+     * @param transform name of the transform reporting the error
+     * @param field the clashing output field name
+     * @return the exception (it is returned, not thrown)
+     */
+    public static TransformException metadataMappingFieldExists(String transform, String field) {
+        final Map<String, String> errorParams = new HashMap<>();
+        errorParams.put("field", field);
+        errorParams.put("transform", transform);
+        return new TransformException(METADATA_MAPPING_FIELD_EXISTS, errorParams);
+    }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index dc5008ec04..b0d72d7cf1 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -25,7 +25,13 @@ public enum TransformCommonErrorCode implements
SeaTunnelErrorCode {
"The input field '<field>' of '<transform>' transform not found in
upstream schema"),
INPUT_FIELDS_NOT_FOUND(
"TRANSFORM_COMMON-02",
- "The input fields '<fields>' of '<transform>' transform not found
in upstream schema");
+ "The input fields '<fields>' of '<transform>' transform not found
in upstream schema"),
+ METADATA_FIELDS_NOT_FOUND(
+ "TRANSFORM_COMMON-03",
+ "The metadata fields '<field>' of '<transform>' transform not
found "),
+ METADATA_MAPPING_FIELD_EXISTS(
+ "TRANSFORM_COMMON-04",
+ "The metadata mapping field '<field>' of '<transform>' transform
already exists in upstream schema");
private final String code;
private final String description;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 3e14e8488d..7978aa2260 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -23,12 +23,12 @@ import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
import org.apache.seatunnel.transform.exception.TransformCommonError;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
new file mode 100644
index 0000000000..2a1679b500
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.metadata;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.NonNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.seatunnel.api.table.type.MetadataUtil.isMetadataField;
+
+/**
+ * Appends row metadata as extra output columns.
+ *
+ * <p>The {@code metadata_fields} option maps a metadata field name (one accepted by {@link
+ * MetadataUtil#isMetadataField}) to the name of the output column that receives its value.
+ * DATABASE, TABLE, ROW_KIND and PARTITION become nullable STRING columns; DELAY and EVENT_TIME
+ * become nullable LONG columns.
+ */
+public class MetadataTransform extends MultipleFieldOutputTransform {
+
+    /** Configured metadata field names; position {@code i} corresponds to output column i. */
+    private List<String> fieldNames;
+
+    /** Metadata field name -> output column name, as read from {@code metadata_fields}. */
+    private Map<String, String> metadataFieldMapping;
+
+    public MetadataTransform(ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
+        super(inputCatalogTable);
+        initOutputFields(inputCatalogTable, config.get(MetadataTransformConfig.METADATA_FIELDS));
+    }
+
+    /**
+     * Validates the configured mapping and fixes the order of the output columns.
+     *
+     * <p>Fails with {@link TransformCommonError#cannotFindMetadataFieldError} when a key is not a
+     * metadata field, and with {@link TransformCommonError#metadataMappingFieldExists} when a
+     * target column name is already present in the upstream schema.
+     */
+    private void initOutputFields(CatalogTable inputCatalogTable, Map<String, String> fields) {
+        List<String> sourceTableFieldNames =
+                Arrays.asList(inputCatalogTable.getTableSchema().getFieldNames());
+        List<String> fieldNames = new ArrayList<>(fields.size());
+        for (Map.Entry<String, String> field : fields.entrySet()) {
+            String srcField = field.getKey();
+            if (!isMetadataField(srcField)) {
+                throw TransformCommonError.cannotFindMetadataFieldError(
+                        getPluginName(), srcField);
+            }
+            String targetField = field.getValue();
+            if (sourceTableFieldNames.contains(targetField)) {
+                // Report the output column that clashes with the upstream schema; that is what
+                // the METADATA_MAPPING_FIELD_EXISTS message describes, not the metadata key.
+                throw TransformCommonError.metadataMappingFieldExists(
+                        getPluginName(), targetField);
+            }
+            fieldNames.add(srcField);
+        }
+        this.fieldNames = fieldNames;
+        this.metadataFieldMapping = fields;
+    }
+
+    @Override
+    public String getPluginName() {
+        return MetadataTransformConfig.PLUGIN_NAME;
+    }
+
+    /** Extracts each configured metadata value from the row, in output-column order. */
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        Object[] values = new Object[fieldNames.size()];
+        for (int i = 0; i < values.length; i++) {
+            String metadataFieldName = fieldNames.get(i);
+            Object fieldValue;
+            switch (CommonOptions.fromName(metadataFieldName)) {
+                case DATABASE:
+                    fieldValue = MetadataUtil.getDatabase(inputRow);
+                    break;
+                case TABLE:
+                    fieldValue = MetadataUtil.getTable(inputRow);
+                    break;
+                case ROW_KIND:
+                    fieldValue = MetadataUtil.getRowKind(inputRow);
+                    break;
+                case DELAY:
+                    fieldValue = MetadataUtil.getDelay(inputRow);
+                    break;
+                case EVENT_TIME:
+                    fieldValue = MetadataUtil.getEventTime(inputRow);
+                    break;
+                case PARTITION:
+                    fieldValue = MetadataUtil.getPartitionStr(inputRow);
+                    break;
+                default:
+                    // The unknown item is the metadata key, not the output column name.
+                    throw TransformCommonError.cannotFindMetadataFieldError(
+                            getPluginName(), metadataFieldName);
+            }
+            values[i] = fieldValue;
+        }
+        return values;
+    }
+
+    /** Declares one nullable output column per configured metadata field. */
+    @Override
+    protected Column[] getOutputColumns() {
+        Column[] columns = new Column[fieldNames.size()];
+        for (int i = 0; i < columns.length; i++) {
+            String metadataFieldName = fieldNames.get(i);
+            String mappingFieldName = metadataFieldMapping.get(metadataFieldName);
+            switch (CommonOptions.fromName(metadataFieldName)) {
+                case DATABASE:
+                case TABLE:
+                case ROW_KIND:
+                case PARTITION:
+                    columns[i] = nullableColumn(mappingFieldName, BasicType.STRING_TYPE);
+                    break;
+                case DELAY:
+                case EVENT_TIME:
+                    columns[i] = nullableColumn(mappingFieldName, BasicType.LONG_TYPE);
+                    break;
+                default:
+                    throw TransformCommonError.cannotFindMetadataFieldError(
+                            getPluginName(), metadataFieldName);
+            }
+        }
+        return columns;
+    }
+
+    /**
+     * Creates a nullable {@link PhysicalColumn} with the given name and type, passing null for the
+     * remaining optional arguments.
+     */
+    private static Column nullableColumn(String name, BasicType<?> type) {
+        return PhysicalColumn.of(name, type, (Long) null, null, true, null, null);
+    }
+
+    @VisibleForTesting
+    public void initRowContainerGenerator() {
+        transformTableSchema();
+    }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java
similarity index 53%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java
index 5b97f34168..fe9971d3a1 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java
@@ -15,34 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.common;
+package org.apache.seatunnel.transform.metadata;
-import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig;
-import lombok.AllArgsConstructor;
+import java.util.Map;
-@AllArgsConstructor
-public class SeaTunnelRowAccessor {
- private final SeaTunnelRow row;
+/**
+ * Options of the {@code Metadata} transform.
+ *
+ * <p>NOTE(review): this class extends ModelTransformConfig (the NLP-model transform config),
+ * while the Metadata transform only uses PLUGIN_NAME and METADATA_FIELDS; confirm the
+ * inheritance is intended.
+ */
+public class MetadataTransformConfig extends ModelTransformConfig {
- public int getArity() {
- return row.getArity();
- }
+ /** Plugin / factory identifier of the Metadata transform. */
+ public static final String PLUGIN_NAME = "Metadata";
- public String getTableId() {
- return row.getTableId();
- }
-
- public RowKind getRowKind() {
- return row.getRowKind();
- }
-
- public Object getField(int pos) {
- return row.getField(pos);
- }
-
- public Object[] getFields() {
- return row.getFields();
- }
+ /**
+ * Mapping from metadata field name (key) to output column name (value). MetadataTransform
+ * rejects keys that are not metadata fields and values that already exist upstream.
+ */
+ public static final Option<Map<String, String>> METADATA_FIELDS =
+ Options.key("metadata_fields")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify the metadata field relationship between
input and output");
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java
new file mode 100644
index 0000000000..ebbacb5cd0
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.metadata;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+/** Service-loaded factory that creates {@link MetadataTransform} instances. */
+@AutoService(Factory.class)
+public class MetadataTransformFactory implements TableTransformFactory {
+
+    /** Identifier of this factory: {@link MetadataTransformConfig#PLUGIN_NAME}. */
+    @Override
+    public String factoryIdentifier() {
+        return MetadataTransformConfig.PLUGIN_NAME;
+    }
+
+    /** {@code metadata_fields} is the single, mandatory option. */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MetadataTransformConfig.METADATA_FIELDS)
+                .build();
+    }
+
+    /** Builds the transform lazily from the context's options and first catalog table. */
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext context) {
+        return () ->
+                new MetadataTransform(
+                        context.getOptions(),
+                        // only the first upstream catalog table is used as input
+                        context.getCatalogTables().get(0));
+    }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
index 9e77043f0a..ce6d864da6 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
@@ -21,10 +21,10 @@ import
org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.VectorType;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.seatunnel.transform.nlpmodel.ModelProvider;
import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
index 069945951b..c99b03776e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
import org.apache.seatunnel.transform.nlpmodel.ModelProvider;
import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
index 5c5451fce7..b2c9fa44ce 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.transform.replace;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
import org.apache.seatunnel.transform.exception.TransformCommonError;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
index ae7b1fcaa1..354e5a3dd5 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
import com.google.common.annotations.VisibleForTesting;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
index c1ead2dd0b..922cdbd97c 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.exception.TransformCommonError;
import lombok.NonNull;
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
new file mode 100644
index 0000000000..a3ddf1ced4
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.metadata;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetadataTransformTest {
+
+ static CatalogTable catalogTable;
+
+ static Object[] values;
+
+ static SeaTunnelRow inputRow;
+
+ static Long eventTime;
+
+ @BeforeAll
+ static void setUp() {
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", TablePath.DEFAULT),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "key1",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key2",
+ BasicType.INT_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key3",
+ BasicType.LONG_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key4",
+ BasicType.DOUBLE_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key5",
+ BasicType.FLOAT_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "comment");
+ values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916,
3.14};
+ inputRow = new SeaTunnelRow(values);
+ inputRow.setTableId(TablePath.DEFAULT.getFullName());
+ eventTime =
LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli();
+ MetadataUtil.setDelay(inputRow, 150L);
+ MetadataUtil.setEventTime(inputRow, eventTime);
+ MetadataUtil.setPartition(inputRow, Arrays.asList("key1",
"key2").toArray(new String[0]));
+ }
+
+ @Test
+ void testMetadataTransform() {
+ Map<String, String> metadataMapping = new HashMap<>();
+ metadataMapping.put("Database", "database");
+ metadataMapping.put("Table", "table");
+ metadataMapping.put("Partition", "partition");
+ metadataMapping.put("RowKind", "rowKind");
+ metadataMapping.put("EventTime", "ts_ms");
+ metadataMapping.put("Delay", "delay");
+ Map<String, Object> config = new HashMap<>();
+ config.put("metadata_fields", metadataMapping);
+ MetadataTransform transform =
+ new MetadataTransform(ReadonlyConfig.fromMap(config),
catalogTable);
+ transform.initRowContainerGenerator();
+ SeaTunnelRow outputRow = transform.map(inputRow);
+ Assertions.assertEquals(values.length + 6, outputRow.getArity());
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=default.default.default, kind=+I,
fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, key1,key2, default, "
+ + eventTime
+ + ", +I, default, 150]}",
+ outputRow.toString());
+ }
+}