This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 385814e7f1 [Improve][API] Add metadata schema into catalog table 
(#9586)
385814e7f1 is described below

commit 385814e7f11d55918dd5153c14ea80ff1dbc1f59
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jul 31 21:39:01 2025 +0800

    [Improve][API] Add metadata schema into catalog table (#9586)
---
 docs/en/transform-v2/metadata.md                   |  2 +-
 docs/zh/transform-v2/metadata.md                   |  2 +-
 .../{TableSchema.java => AbstractSchema.java}      | 69 +++----------------
 .../seatunnel/api/table/catalog/CatalogTable.java  | 55 +++++++++++++---
 .../api/table/catalog/MetadataColumn.java          | 27 +++-----
 .../api/table/catalog/MetadataSchema.java          | 63 ++++++++++++++++++
 .../seatunnel/api/table/catalog/TableSchema.java   | 52 ++-------------
 .../seatunnel/api/table/type/MetadataUtil.java     | 14 ----
 .../assertion/excecutor/AssertExecutor.java        |  7 ++
 .../cdc/base/source/IncrementalSource.java         | 43 +++++++++++-
 .../seatunnel/milvus/utils/MilvusConvertUtils.java | 20 +++++-
 .../e2e/connector/doris/AbstractDorisIT.java       |  5 ++
 .../common/AbstractSeaTunnelTransform.java         |  4 +-
 .../common/MultipleFieldOutputTransform.java       |  1 +
 .../common/SingleFieldOutputTransform.java         |  1 +
 .../transform/metadata/MetadataTransform.java      | 46 +++++--------
 .../transform/sql/zeta/ZetaSQLEngine.java          |  1 +
 .../transform/sql/zeta/ZetaSQLFunction.java        |  1 +
 .../transform/metadata/MetadataTransformTest.java  | 77 ++++++++++++++++++++--
 19 files changed, 301 insertions(+), 189 deletions(-)

diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index 8656278d20..536038b4d8 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -17,7 +17,7 @@ Metadata transform plugin for adding metadata fields to data
 | Partition |  string  | Contains the partition field of the corresponding 
number table of the row, multiple using `,` join |
 
 ### note
-    `Delay` `Partition` only worked on cdc series connectors for now , except 
TiDB-CDC
+    `Delay` and `EventTime` currently work only with CDC-series connectors, 
except TiDB-CDC
 
 ## Options
 
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index aa4ae3d6b4..2e92c1282f 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -17,7 +17,7 @@
 | Partition |  string  | 包含该行对应数表的分区字段,多个使用`,`连接 |
 
 ### 注意事项
-    `Delay` `Partition`目前只适用于cdc系列连接器,除外TiDB-CDC
+    `Delay` `EventTime`目前只适用于cdc系列连接器,TiDB-CDC除外
 
 ## 配置选项
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/AbstractSchema.java
similarity index 54%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/AbstractSchema.java
index 2238da2617..3ed52733a3 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/AbstractSchema.java
@@ -32,27 +32,22 @@ import java.util.stream.Collectors;
 
 /** Represent a physical table schema. */
 @Data
-public final class TableSchema implements Serializable {
+public class AbstractSchema implements Serializable {
     private static final long serialVersionUID = 1L;
-    private final List<Column> columns;
+    protected final List<Column> columns;
 
     @Getter(AccessLevel.PRIVATE)
-    private final List<String> columnNames;
+    protected final List<String> columnNames;
 
-    private final PrimaryKey primaryKey;
-
-    private final List<ConstraintKey> constraintKeys;
-
-    public TableSchema(
-            List<Column> columns, PrimaryKey primaryKey, List<ConstraintKey> 
constraintKeys) {
+    public AbstractSchema(List<Column> columns) {
         this.columns = columns;
         this.columnNames = 
columns.stream().map(Column::getName).collect(Collectors.toList());
-        this.primaryKey = primaryKey;
-        this.constraintKeys = constraintKeys;
     }
 
-    public static Builder builder() {
-        return new Builder();
+    // Lombok requires a no-arg constructor for @Data annotation to work 
properly
+    private AbstractSchema() {
+        this.columns = new ArrayList<>();
+        this.columnNames = new ArrayList<>();
     }
 
     public SeaTunnelRowType toPhysicalRowDataType() {
@@ -88,52 +83,4 @@ public final class TableSchema implements Serializable {
     public List<Column> getColumns() {
         return Collections.unmodifiableList(columns);
     }
-
-    public static final class Builder {
-        private final List<Column> columns = new ArrayList<>();
-
-        private PrimaryKey primaryKey;
-
-        private final List<ConstraintKey> constraintKeys = new ArrayList<>();
-
-        public Builder columns(List<Column> columns) {
-            this.columns.addAll(columns);
-            return this;
-        }
-
-        public Builder column(Column column) {
-            this.columns.add(column);
-            return this;
-        }
-
-        public Builder primaryKey(PrimaryKey primaryKey) {
-            this.primaryKey = primaryKey;
-            return this;
-        }
-
-        public Builder constraintKey(ConstraintKey constraintKey) {
-            this.constraintKeys.add(constraintKey);
-            return this;
-        }
-
-        public Builder constraintKey(List<ConstraintKey> constraintKeys) {
-            this.constraintKeys.addAll(constraintKeys);
-            return this;
-        }
-
-        public TableSchema build() {
-            return new TableSchema(columns, primaryKey, constraintKeys);
-        }
-    }
-
-    public TableSchema copy() {
-        List<Column> copyColumns = 
columns.stream().map(Column::copy).collect(Collectors.toList());
-        List<ConstraintKey> copyConstraintKeys =
-                
constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
-        return TableSchema.builder()
-                .constraintKey(copyConstraintKeys)
-                .columns(copyColumns)
-                .primaryKey(primaryKey == null ? null : primaryKey.copy())
-                .build();
-    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
index f3ec98eba4..2de5b63111 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -40,6 +40,8 @@ public final class CatalogTable implements Serializable {
 
     private final List<String> partitionKeys;
 
+    private final MetadataSchema metadata;
+
     private final String comment;
 
     private final String catalogName;
@@ -52,7 +54,8 @@ public final class CatalogTable implements Serializable {
                 newTable.getOptions(),
                 newTable.getPartitionKeys(),
                 newTable.getComment(),
-                newTable.getCatalogName());
+                newTable.getCatalogName(),
+                newTable.getMetadataSchema());
     }
 
     public static CatalogTable of(
@@ -62,7 +65,13 @@ public final class CatalogTable implements Serializable {
             List<String> partitionKeys,
             String comment) {
         return new CatalogTable(
-                tableId, tableSchema, options, partitionKeys, comment, 
tableId.getCatalogName());
+                tableId,
+                tableSchema,
+                options,
+                partitionKeys,
+                comment,
+                tableId.getCatalogName(),
+                MetadataSchema.builder().build());
     }
 
     public static CatalogTable of(
@@ -72,16 +81,37 @@ public final class CatalogTable implements Serializable {
             List<String> partitionKeys,
             String comment,
             String catalogName) {
-        return new CatalogTable(tableId, tableSchema, options, partitionKeys, 
comment, catalogName);
+        return new CatalogTable(
+                tableId,
+                tableSchema,
+                options,
+                partitionKeys,
+                comment,
+                catalogName,
+                MetadataSchema.builder().build());
     }
 
-    private CatalogTable(
+    public static CatalogTable of(
             TableIdentifier tableId,
             TableSchema tableSchema,
             Map<String, String> options,
             List<String> partitionKeys,
-            String comment) {
-        this(tableId, tableSchema, options, partitionKeys, comment, 
tableId.getCatalogName());
+            String comment,
+            String catalogName,
+            MetadataSchema metadata) {
+        return new CatalogTable(
+                tableId, tableSchema, options, partitionKeys, comment, 
catalogName, metadata);
+    }
+
+    public static CatalogTable withMetadata(CatalogTable catalogTable, 
MetadataSchema metadata) {
+        return new CatalogTable(
+                catalogTable.getTableId(),
+                catalogTable.getTableSchema(),
+                catalogTable.getOptions(),
+                catalogTable.getPartitionKeys(),
+                catalogTable.getComment(),
+                catalogTable.getCatalogName(),
+                metadata);
     }
 
     private CatalogTable(
@@ -90,7 +120,8 @@ public final class CatalogTable implements Serializable {
             Map<String, String> options,
             List<String> partitionKeys,
             String comment,
-            String catalogName) {
+            String catalogName,
+            MetadataSchema metadata) {
         this.tableId = tableId;
         this.tableSchema = tableSchema;
         // Make sure the options and partitionKeys are mutable
@@ -98,6 +129,7 @@ public final class CatalogTable implements Serializable {
         this.partitionKeys = new ArrayList<>(partitionKeys);
         this.comment = comment;
         this.catalogName = catalogName;
+        this.metadata = metadata;
     }
 
     public CatalogTable copy() {
@@ -107,7 +139,8 @@ public final class CatalogTable implements Serializable {
                 new HashMap<>(options),
                 new ArrayList<>(partitionKeys),
                 comment,
-                catalogName);
+                catalogName,
+                metadata);
     }
 
     public TableIdentifier getTableId() {
@@ -142,6 +175,10 @@ public final class CatalogTable implements Serializable {
         return catalogName;
     }
 
+    public MetadataSchema getMetadataSchema() {
+        return metadata;
+    }
+
     @Override
     public String toString() {
         return "CatalogTable{"
@@ -159,6 +196,8 @@ public final class CatalogTable implements Serializable {
                 + ", catalogName='"
                 + catalogName
                 + '\''
+                + ", metadata="
+                + metadata
                 + '}';
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
index e0e7e9e991..eab4b36578 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -29,30 +29,24 @@ import lombok.ToString;
 public class MetadataColumn extends Column {
     private static final long serialVersionUID = 1L;
 
-    private final String metadataKey;
-
     protected MetadataColumn(
             String name,
             SeaTunnelDataType<?> dataType,
             Long columnLength,
-            String metadataKey,
             boolean nullable,
             Object defaultValue,
             String comment) {
         super(name, dataType, columnLength, nullable, defaultValue, comment);
-        this.metadataKey = metadataKey;
     }
 
     public static MetadataColumn of(
             String name,
             SeaTunnelDataType<?> dataType,
             Long columnLength,
-            String metadataKey,
             boolean nullable,
             Object defaultValue,
             String comment) {
-        return new MetadataColumn(
-                name, dataType, columnLength, metadataKey, nullable, 
defaultValue, comment);
+        return new MetadataColumn(name, dataType, columnLength, nullable, 
defaultValue, comment);
     }
 
     @Override
@@ -62,26 +56,23 @@ public class MetadataColumn extends Column {
 
     @Override
     public Column copy(SeaTunnelDataType<?> newType) {
-        return MetadataColumn.of(
-                name, newType, columnLength, metadataKey, nullable, 
defaultValue, comment);
+        return MetadataColumn.of(name, newType, columnLength, nullable, 
defaultValue, comment);
     }
 
     @Override
     public Column copy() {
-        return MetadataColumn.of(
-                name, dataType, columnLength, metadataKey, nullable, 
defaultValue, comment);
+        return MetadataColumn.of(name, dataType, columnLength, nullable, 
defaultValue, comment);
     }
 
     @Override
     public Column rename(String newColumnName) {
         return MetadataColumn.of(
-                newColumnName,
-                dataType,
-                columnLength,
-                metadataKey,
-                nullable,
-                defaultValue,
-                comment);
+                newColumnName, dataType, columnLength, nullable, defaultValue, 
comment);
+    }
+
+    public PhysicalColumn toPhysicalColumn() {
+        return PhysicalColumn.of(
+                name, dataType, columnLength, scale, nullable, defaultValue, 
comment);
     }
 
     @Override
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataSchema.java
new file mode 100644
index 0000000000..6c5f658439
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataSchema.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Represents the schema of the metadata columns attached to a catalog table. */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public final class MetadataSchema extends AbstractSchema {
+    private static final long serialVersionUID = 1L;
+
+    public MetadataSchema(List<Column> columns) {
+        super(columns);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        private final List<Column> columns = new ArrayList<>();
+
+        public Builder columns(List<Column> columns) {
+            this.columns.addAll(columns);
+            return this;
+        }
+
+        public Builder column(Column column) {
+            this.columns.add(column);
+            return this;
+        }
+
+        public MetadataSchema build() {
+            return new MetadataSchema(columns);
+        }
+    }
+
+    public MetadataSchema copy() {
+        List<Column> copyColumns = 
columns.stream().map(Column::copy).collect(Collectors.toList());
+        return MetadataSchema.builder().columns(copyColumns).build();
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 2238da2617..3f1d986317 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -17,27 +17,18 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import lombok.AccessLevel;
 import lombok.Data;
-import lombok.Getter;
+import lombok.EqualsAndHashCode;
 
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 /** Represent a physical table schema. */
+@EqualsAndHashCode(callSuper = true)
 @Data
-public final class TableSchema implements Serializable {
+public final class TableSchema extends AbstractSchema {
     private static final long serialVersionUID = 1L;
-    private final List<Column> columns;
-
-    @Getter(AccessLevel.PRIVATE)
-    private final List<String> columnNames;
 
     private final PrimaryKey primaryKey;
 
@@ -45,8 +36,7 @@ public final class TableSchema implements Serializable {
 
     public TableSchema(
             List<Column> columns, PrimaryKey primaryKey, List<ConstraintKey> 
constraintKeys) {
-        this.columns = columns;
-        this.columnNames = 
columns.stream().map(Column::getName).collect(Collectors.toList());
+        super(columns);
         this.primaryKey = primaryKey;
         this.constraintKeys = constraintKeys;
     }
@@ -55,40 +45,6 @@ public final class TableSchema implements Serializable {
         return new Builder();
     }
 
-    public SeaTunnelRowType toPhysicalRowDataType() {
-        SeaTunnelDataType<?>[] fieldTypes =
-                columns.stream()
-                        .filter(Column::isPhysical)
-                        .map(Column::getDataType)
-                        .toArray(SeaTunnelDataType[]::new);
-        String[] fields =
-                columns.stream()
-                        .filter(Column::isPhysical)
-                        .map(Column::getName)
-                        .toArray(String[]::new);
-        return new SeaTunnelRowType(fields, fieldTypes);
-    }
-
-    public String[] getFieldNames() {
-        return columnNames.toArray(new String[0]);
-    }
-
-    public int indexOf(String columnName) {
-        return columnNames.indexOf(columnName);
-    }
-
-    public Column getColumn(String columnName) {
-        return columns.get(indexOf(columnName));
-    }
-
-    public boolean contains(String columnName) {
-        return columnNames.contains(columnName);
-    }
-
-    public List<Column> getColumns() {
-        return Collections.unmodifiableList(columns);
-    }
-
     public static final class Builder {
         private final List<Column> columns = new ArrayList<>();
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
index 42ab203576..ed5fb4d615 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.Stream;
 
 import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY;
@@ -52,10 +51,6 @@ public class MetadataUtil {
         row.getOptions().put(EVENT_TIME.getName(), delay);
     }
 
-    public static Long getDelay(SeaTunnelRowAccessor row) {
-        return (Long) row.getOptions().get(DELAY.getName());
-    }
-
     public static String getDatabase(SeaTunnelRowAccessor row) {
         if (row.getTableId() == null) {
             return null;
@@ -74,19 +69,10 @@ public class MetadataUtil {
         return row.getRowKind().shortString();
     }
 
-    public static String getPartitionStr(SeaTunnelRowAccessor row) {
-        Object partition = row.getOptions().get(PARTITION.getName());
-        return Objects.nonNull(partition) ? String.join(",", (String[]) 
partition) : null;
-    }
-
     public static String[] getPartition(SeaTunnelRowAccessor row) {
         return (String[]) row.getOptions().get(PARTITION.getName());
     }
 
-    public static Long getEventTime(SeaTunnelRowAccessor row) {
-        return (Long) row.getOptions().get(EVENT_TIME.getName());
-    }
-
     public static boolean isMetadataField(String fieldName) {
         return METADATA_FIELDS.contains(fieldName);
     }
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index 53f8a40194..19778eb67e 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -75,6 +75,13 @@ public class AssertExecutor {
                         Lists.newArrayList(rowType.getFieldNames()),
                         fieldName -> 
fieldName.equals(assertFieldRule.getFieldName()));
 
+        if (index == -1) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field name %s not found in row type %s",
+                            assertFieldRule.getFieldName(), rowType));
+        }
+
         SeaTunnelDataType<?> type = rowType.getFieldType(index);
         Object value = rowData.getField(index);
         String fieldName = rowType.getFieldName(index);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 153514a53f..85340c4a3a 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -27,6 +27,11 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
@@ -62,6 +67,7 @@ import 
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJs
 import io.debezium.relational.TableId;
 import lombok.NoArgsConstructor;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -94,8 +100,8 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
     protected DebeziumDeserializationSchema<T> deserializationSchema;
 
     protected IncrementalSource(ReadonlyConfig options, List<CatalogTable> 
catalogTables) {
-        this.catalogTables = catalogTables;
         this.readonlyConfig = options;
+        this.catalogTables = updateCatalogTableMetadata(catalogTables);
         this.startupConfig = getStartupConfig(readonlyConfig);
         this.stopConfig = getStopConfig(readonlyConfig);
         this.stopMode = stopConfig.getStopMode();
@@ -114,6 +120,41 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
                 config.get(SourceOptions.STARTUP_TIMESTAMP));
     }
 
+    private List<CatalogTable> updateCatalogTableMetadata(List<CatalogTable> 
catalogTables) {
+        return catalogTables.stream()
+                .map(
+                        table -> {
+                            if (DeserializeFormat.DEFAULT.equals(
+                                    
readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
+                                return CatalogTable.withMetadata(table, 
getMetadataColumns());
+                            } else {
+                                return table;
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private MetadataSchema getMetadataColumns() {
+        List<Column> metadata = new ArrayList<>();
+        metadata.add(
+                MetadataColumn.of(
+                        CommonOptions.EVENT_TIME.getName(),
+                        BasicType.LONG_TYPE,
+                        (Long) null,
+                        true,
+                        null,
+                        null));
+        metadata.add(
+                MetadataColumn.of(
+                        CommonOptions.DELAY.getName(),
+                        BasicType.LONG_TYPE,
+                        (Long) null,
+                        true,
+                        null,
+                        null));
+        return MetadataSchema.builder().columns(metadata).build();
+    }
+
     private StopConfig getStopConfig(ReadonlyConfig config) {
         return new StopConfig(
                 config.get(getStopModeOption()),
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
index ae9b7ccde3..df5c807690 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
@@ -21,12 +21,15 @@ import 
org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.VectorIndex;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.CommonOptions;
 import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
@@ -196,14 +199,29 @@ public class MilvusConvertUtils {
         options.put(
                 MilvusOptions.ENABLE_DYNAMIC_FIELD, 
String.valueOf(schema.getEnableDynamicField()));
         options.put(MilvusOptions.SHARDS_NUM, 
String.valueOf(collectionResponse.getShardsNum()));
+        MetadataSchema.Builder metadataBuilder = MetadataSchema.builder();
         if (existPartitionKeyField) {
             options.put(MilvusOptions.PARTITION_KEY_FIELD, partitionKeyField);
+            metadataBuilder.column(
+                    MetadataColumn.of(
+                            CommonOptions.PARTITION.getName(),
+                            BasicType.STRING_TYPE,
+                            null,
+                            true,
+                            null,
+                            null));
         } else {
             fillPartitionNames(options, client, database, collection);
         }
 
         return CatalogTable.of(
-                tableId, tableSchema, options, new ArrayList<>(), 
schema.getDescription());
+                tableId,
+                tableSchema,
+                options,
+                new ArrayList<>(),
+                schema.getDescription(),
+                tableId.getCatalogName(),
+                metadataBuilder.build());
     }
 
     private static void fillPartitionNames(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index 7777d9cd24..dcd0f6f7bd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.lifecycle.Startables;
@@ -129,6 +130,10 @@ public abstract class AbstractDorisIT extends 
TestSuiteBase implements TestResou
             while (beResultSet.next()) {
                 beList.add(beResultSet.getString("Host"));
             }
+            if (beList.isEmpty()) {
+                log.error("doris BE is empty, skip initialization");
+                Assertions.fail("doris BE is empty, skip initialization");
+            }
             if (beList.stream().anyMatch("127.0.0.1"::equals)) {
                 ResultSet resultSet = statement.executeQuery(SHOW_FE);
                 String feIp = null;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index 01ce7eaf0a..edbba3ff08 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -72,7 +72,9 @@ public abstract class AbstractSeaTunnelTransform<T, R> 
implements SeaTunnelTrans
                 tableSchema,
                 inputCatalogTable.getOptions(),
                 inputCatalogTable.getPartitionKeys(),
-                inputCatalogTable.getComment());
+                inputCatalogTable.getComment(),
+                inputCatalogTable.getTableId().getCatalogName(),
+                inputCatalogTable.getMetadataSchema());
     }
 
     public R transform(SeaTunnelRow row) {
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 84e3a9348d..f385b3cfd6 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -137,6 +137,7 @@ public abstract class MultipleFieldOutputTransform extends 
AbstractCatalogSuppor
                             SeaTunnelRow outputRow = new 
SeaTunnelRow(outputFieldValues);
                             outputRow.setTableId(inputRow.getTableId());
                             outputRow.setRowKind(inputRow.getRowKind());
+                            outputRow.setOptions(inputRow.getOptions());
                             return outputRow;
                         }
                     };
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index 8768069ab8..7551098336 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -117,6 +117,7 @@ public abstract class SingleFieldOutputTransform extends 
AbstractCatalogSupportM
                             SeaTunnelRow outputRow = new 
SeaTunnelRow(outputFieldValues);
                             outputRow.setTableId(inputRow.getTableId());
                             outputRow.setRowKind(inputRow.getRowKind());
+                            outputRow.setOptions(inputRow.getOptions());
                             return outputRow;
                         }
                     };
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
index f37028e070..6db61c900d 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
@@ -22,6 +22,8 @@ import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.CommonOptions;
@@ -42,6 +44,7 @@ import static 
org.apache.seatunnel.api.table.type.MetadataUtil.isMetadataField;
 public class MetadataTransform extends MultipleFieldOutputTransform {
 
     private List<String> fieldNames;
+    private MetadataSchema metadataSchema;
     private Map<String, String> metadataFieldMapping;
 
     public MetadataTransform(ReadonlyConfig config, @NonNull CatalogTable 
inputCatalogTable) {
@@ -65,6 +68,7 @@ public class MetadataTransform extends 
MultipleFieldOutputTransform {
             fieldNames.add(field.getKey());
         }
         this.fieldNames = fieldNames;
+        this.metadataSchema = inputCatalogTable.getMetadataSchema();
         this.metadataFieldMapping = fields;
     }
 
@@ -78,9 +82,8 @@ public class MetadataTransform extends 
MultipleFieldOutputTransform {
         Object[] value = new Object[fieldNames.size()];
         for (Map.Entry<String, String> mapping : 
metadataFieldMapping.entrySet()) {
             String metadataFieldName = mapping.getKey();
-            String mappingFieldName = mapping.getValue();
             int i = fieldNames.indexOf(metadataFieldName);
-            Object fieldValue = null;
+            Object fieldValue;
             switch (CommonOptions.fromName(metadataFieldName)) {
                 case DATABASE:
                     fieldValue = MetadataUtil.getDatabase(inputRow);
@@ -91,18 +94,8 @@ public class MetadataTransform extends 
MultipleFieldOutputTransform {
                 case ROW_KIND:
                     fieldValue = MetadataUtil.getRowKind(inputRow);
                     break;
-                case DELAY:
-                    fieldValue = MetadataUtil.getDelay(inputRow);
-                    break;
-                case EVENT_TIME:
-                    fieldValue = MetadataUtil.getEventTime(inputRow);
-                    break;
-                case PARTITION:
-                    fieldValue = MetadataUtil.getPartitionStr(inputRow);
-                    break;
                 default:
-                    throw TransformCommonError.cannotFindMetadataFieldError(
-                            getPluginName(), mappingFieldName);
+                    fieldValue = inputRow.getOptions().get(metadataFieldName);
             }
             value[i] = fieldValue;
         }
@@ -117,11 +110,11 @@ public class MetadataTransform extends 
MultipleFieldOutputTransform {
             String mappingFieldName = mapping.getValue();
             int i = fieldNames.indexOf(metadataFieldName);
             Column column;
+
             switch (CommonOptions.fromName(metadataFieldName)) {
                 case DATABASE:
                 case TABLE:
                 case ROW_KIND:
-                case PARTITION:
                     column =
                             PhysicalColumn.of(
                                     mappingFieldName,
@@ -132,21 +125,18 @@ public class MetadataTransform extends 
MultipleFieldOutputTransform {
                                     null,
                                     null);
                     break;
-                case DELAY:
-                case EVENT_TIME:
-                    column =
-                            PhysicalColumn.of(
-                                    mappingFieldName,
-                                    BasicType.LONG_TYPE,
-                                    (Long) null,
-                                    null,
-                                    true,
-                                    null,
-                                    null);
-                    break;
                 default:
-                    throw TransformCommonError.cannotFindMetadataFieldError(
-                            getPluginName(), mappingFieldName);
+                    if (metadataSchema.contains(metadataFieldName)) {
+                        column =
+                                ((MetadataColumn)
+                                                metadataSchema
+                                                        
.getColumn(metadataFieldName)
+                                                        
.rename(mappingFieldName))
+                                        .toPhysicalColumn();
+                    } else {
+                        throw 
TransformCommonError.cannotFindMetadataFieldError(
+                                getPluginName(), mappingFieldName);
+                    }
             }
             columns[i] = column;
         }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 2a1481aaee..fff51a8998 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -256,6 +256,7 @@ public class ZetaSQLEngine implements SQLEngine {
         SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields);
         seaTunnelRow.setRowKind(inputRow.getRowKind());
         seaTunnelRow.setTableId(inputRow.getTableId());
+        seaTunnelRow.setOptions(inputRow.getOptions());
         List<LateralView> lateralViews = selectBody.getLateralViews();
         if (CollectionUtils.isEmpty(lateralViews)) {
             return Lists.newArrayList(seaTunnelRow);
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 9dca1ad8bc..6d55b74d7b 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -857,6 +857,7 @@ public class ZetaSQLFunction {
         SeaTunnelRow outputRow = new SeaTunnelRow(fields);
         outputRow.setRowKind(row.getRowKind());
         outputRow.setTableId(row.getTableId());
+        outputRow.setOptions(row.getOptions());
         outputRow.setField(fieldIndex, fieldValue);
         return outputRow;
     }
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
index a3ddf1ced4..63277273f1 100644
--- 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
@@ -19,12 +19,18 @@ package org.apache.seatunnel.transform.metadata;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
 import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.junit.jupiter.api.Assertions;
@@ -36,6 +42,8 @@ import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 public class MetadataTransformTest {
@@ -50,6 +58,31 @@ public class MetadataTransformTest {
 
     @BeforeAll
     static void setUp() {
+        List<Column> metadata = new ArrayList<>();
+        metadata.add(
+                MetadataColumn.of(
+                        CommonOptions.EVENT_TIME.getName(),
+                        BasicType.LONG_TYPE,
+                        (Long) null,
+                        true,
+                        null,
+                        null));
+        metadata.add(
+                MetadataColumn.of(
+                        CommonOptions.DELAY.getName(),
+                        BasicType.LONG_TYPE,
+                        (Long) null,
+                        true,
+                        null,
+                        null));
+        metadata.add(
+                MetadataColumn.of(
+                        CommonOptions.PARTITION.getName(),
+                        ArrayType.STRING_ARRAY_TYPE,
+                        (Long) null,
+                        true,
+                        null,
+                        null));
         catalogTable =
                 CatalogTable.of(
                         TableIdentifier.of("catalog", TablePath.DEFAULT),
@@ -97,7 +130,9 @@ public class MetadataTransformTest {
                                 .build(),
                         new HashMap<>(),
                         new ArrayList<>(),
-                        "comment");
+                        "comment",
+                        "test",
+                        MetadataSchema.builder().columns(metadata).build());
         values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916, 
3.14};
         inputRow = new SeaTunnelRow(values);
         inputRow.setTableId(TablePath.DEFAULT.getFullName());
@@ -109,7 +144,7 @@ public class MetadataTransformTest {
 
     @Test
     void testMetadataTransform() {
-        Map<String, String> metadataMapping = new HashMap<>();
+        Map<String, String> metadataMapping = new LinkedHashMap<>();
         metadataMapping.put("Database", "database");
         metadataMapping.put("Table", "table");
         metadataMapping.put("Partition", "partition");
@@ -121,12 +156,40 @@ public class MetadataTransformTest {
         MetadataTransform transform =
                 new MetadataTransform(ReadonlyConfig.fromMap(config), 
catalogTable);
         transform.initRowContainerGenerator();
+
+        Column[] columns = transform.getOutputColumns();
+        Assertions.assertEquals("database", columns[0].getName());
+        Assertions.assertEquals("table", columns[1].getName());
+        Assertions.assertEquals("partition", columns[2].getName());
+        Assertions.assertEquals("rowKind", columns[3].getName());
+        Assertions.assertEquals("ts_ms", columns[4].getName());
+        Assertions.assertEquals("delay", columns[5].getName());
+
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
columns[0].getDataType());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
columns[1].getDataType());
+        Assertions.assertEquals(ArrayType.STRING_ARRAY_TYPE, 
columns[2].getDataType());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
columns[3].getDataType());
+        Assertions.assertEquals(BasicType.LONG_TYPE, columns[4].getDataType());
+        Assertions.assertEquals(BasicType.LONG_TYPE, columns[5].getDataType());
+
+        Assertions.assertInstanceOf(PhysicalColumn.class, columns[0]);
+        Assertions.assertInstanceOf(PhysicalColumn.class, columns[5]);
+
         SeaTunnelRow outputRow = transform.map(inputRow);
         Assertions.assertEquals(values.length + 6, outputRow.getArity());
-        Assertions.assertEquals(
-                "SeaTunnelRow{tableId=default.default.default, kind=+I, 
fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, key1,key2, default, "
-                        + eventTime
-                        + ", +I, default, 150]}",
-                outputRow.toString());
+        Assertions.assertEquals("default.default.default", 
outputRow.getTableId());
+        Assertions.assertEquals(RowKind.INSERT, outputRow.getRowKind());
+        Assertions.assertEquals("value1", outputRow.getField(0));
+        Assertions.assertEquals(1, outputRow.getField(1));
+        Assertions.assertEquals(896657703886127105L, outputRow.getField(2));
+        Assertions.assertEquals(3.1415916, outputRow.getField(3));
+        Assertions.assertEquals(3.14, outputRow.getField(4));
+        Assertions.assertEquals("default", outputRow.getField(5));
+        Assertions.assertEquals("default", outputRow.getField(6));
+        Assertions.assertArrayEquals(
+                new String[] {"key1", "key2"}, (String[]) 
outputRow.getField(7));
+        Assertions.assertEquals("+I", outputRow.getField(8));
+        Assertions.assertEquals(eventTime, outputRow.getField(9));
+        Assertions.assertEquals(150L, outputRow.getField(10));
     }
 }


Reply via email to