This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 385814e7f1 [Improve][API] Add metadata schema into catalog table (#9586)
385814e7f1 is described below
commit 385814e7f11d55918dd5153c14ea80ff1dbc1f59
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jul 31 21:39:01 2025 +0800
[Improve][API] Add metadata schema into catalog table (#9586)
---
docs/en/transform-v2/metadata.md | 2 +-
docs/zh/transform-v2/metadata.md | 2 +-
.../{TableSchema.java => AbstractSchema.java} | 69 +++----------------
.../seatunnel/api/table/catalog/CatalogTable.java | 55 +++++++++++++---
.../api/table/catalog/MetadataColumn.java | 27 +++-----
.../api/table/catalog/MetadataSchema.java | 63 ++++++++++++++++++
.../seatunnel/api/table/catalog/TableSchema.java | 52 ++-------------
.../seatunnel/api/table/type/MetadataUtil.java | 14 ----
.../assertion/excecutor/AssertExecutor.java | 7 ++
.../cdc/base/source/IncrementalSource.java | 43 +++++++++++-
.../seatunnel/milvus/utils/MilvusConvertUtils.java | 20 +++++-
.../e2e/connector/doris/AbstractDorisIT.java | 5 ++
.../common/AbstractSeaTunnelTransform.java | 4 +-
.../common/MultipleFieldOutputTransform.java | 1 +
.../common/SingleFieldOutputTransform.java | 1 +
.../transform/metadata/MetadataTransform.java | 46 +++++--------
.../transform/sql/zeta/ZetaSQLEngine.java | 1 +
.../transform/sql/zeta/ZetaSQLFunction.java | 1 +
.../transform/metadata/MetadataTransformTest.java | 77 ++++++++++++++++++++--
19 files changed, 301 insertions(+), 189 deletions(-)
diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index 8656278d20..536038b4d8 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -17,7 +17,7 @@ Metadata transform plugin for adding metadata fields to data
| Partition | string | Contains the partition field of the corresponding
number table of the row, multiple using `,` join |
### note
- `Delay` `Partition` only worked on cdc series connectors for now , except
TiDB-CDC
+ `Delay` and `EventTime` currently only work with CDC-series connectors, except TiDB-CDC
## Options
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index aa4ae3d6b4..2e92c1282f 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -17,7 +17,7 @@
| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 |
### 注意事项
- `Delay` `Partition`目前只适用于cdc系列连接器,除外TiDB-CDC
+ `Delay` `EventTime`目前只适用于cdc系列连接器,TiDB-CDC除外
## 配置选项
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/AbstractSchema.java
similarity index 54%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/AbstractSchema.java
index 2238da2617..3ed52733a3 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/AbstractSchema.java
@@ -32,27 +32,22 @@ import java.util.stream.Collectors;
/** Represent a physical table schema. */
@Data
-public final class TableSchema implements Serializable {
+public class AbstractSchema implements Serializable {
private static final long serialVersionUID = 1L;
- private final List<Column> columns;
+ protected final List<Column> columns;
@Getter(AccessLevel.PRIVATE)
- private final List<String> columnNames;
+ protected final List<String> columnNames;
- private final PrimaryKey primaryKey;
-
- private final List<ConstraintKey> constraintKeys;
-
- public TableSchema(
- List<Column> columns, PrimaryKey primaryKey, List<ConstraintKey>
constraintKeys) {
+ public AbstractSchema(List<Column> columns) {
this.columns = columns;
this.columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
- this.primaryKey = primaryKey;
- this.constraintKeys = constraintKeys;
}
- public static Builder builder() {
- return new Builder();
+ // Lombok requires a no-arg constructor for @Data annotation to work
properly
+ private AbstractSchema() {
+ this.columns = new ArrayList<>();
+ this.columnNames = new ArrayList<>();
}
public SeaTunnelRowType toPhysicalRowDataType() {
@@ -88,52 +83,4 @@ public final class TableSchema implements Serializable {
public List<Column> getColumns() {
return Collections.unmodifiableList(columns);
}
-
- public static final class Builder {
- private final List<Column> columns = new ArrayList<>();
-
- private PrimaryKey primaryKey;
-
- private final List<ConstraintKey> constraintKeys = new ArrayList<>();
-
- public Builder columns(List<Column> columns) {
- this.columns.addAll(columns);
- return this;
- }
-
- public Builder column(Column column) {
- this.columns.add(column);
- return this;
- }
-
- public Builder primaryKey(PrimaryKey primaryKey) {
- this.primaryKey = primaryKey;
- return this;
- }
-
- public Builder constraintKey(ConstraintKey constraintKey) {
- this.constraintKeys.add(constraintKey);
- return this;
- }
-
- public Builder constraintKey(List<ConstraintKey> constraintKeys) {
- this.constraintKeys.addAll(constraintKeys);
- return this;
- }
-
- public TableSchema build() {
- return new TableSchema(columns, primaryKey, constraintKeys);
- }
- }
-
- public TableSchema copy() {
- List<Column> copyColumns =
columns.stream().map(Column::copy).collect(Collectors.toList());
- List<ConstraintKey> copyConstraintKeys =
-
constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
- return TableSchema.builder()
- .constraintKey(copyConstraintKeys)
- .columns(copyColumns)
- .primaryKey(primaryKey == null ? null : primaryKey.copy())
- .build();
- }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
index f3ec98eba4..2de5b63111 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -40,6 +40,8 @@ public final class CatalogTable implements Serializable {
private final List<String> partitionKeys;
+ private final MetadataSchema metadata;
+
private final String comment;
private final String catalogName;
@@ -52,7 +54,8 @@ public final class CatalogTable implements Serializable {
newTable.getOptions(),
newTable.getPartitionKeys(),
newTable.getComment(),
- newTable.getCatalogName());
+ newTable.getCatalogName(),
+ newTable.getMetadataSchema());
}
public static CatalogTable of(
@@ -62,7 +65,13 @@ public final class CatalogTable implements Serializable {
List<String> partitionKeys,
String comment) {
return new CatalogTable(
- tableId, tableSchema, options, partitionKeys, comment,
tableId.getCatalogName());
+ tableId,
+ tableSchema,
+ options,
+ partitionKeys,
+ comment,
+ tableId.getCatalogName(),
+ MetadataSchema.builder().build());
}
public static CatalogTable of(
@@ -72,16 +81,37 @@ public final class CatalogTable implements Serializable {
List<String> partitionKeys,
String comment,
String catalogName) {
- return new CatalogTable(tableId, tableSchema, options, partitionKeys,
comment, catalogName);
+ return new CatalogTable(
+ tableId,
+ tableSchema,
+ options,
+ partitionKeys,
+ comment,
+ catalogName,
+ MetadataSchema.builder().build());
}
- private CatalogTable(
+ public static CatalogTable of(
TableIdentifier tableId,
TableSchema tableSchema,
Map<String, String> options,
List<String> partitionKeys,
- String comment) {
- this(tableId, tableSchema, options, partitionKeys, comment,
tableId.getCatalogName());
+ String comment,
+ String catalogName,
+ MetadataSchema metadata) {
+ return new CatalogTable(
+ tableId, tableSchema, options, partitionKeys, comment,
catalogName, metadata);
+ }
+
+ public static CatalogTable withMetadata(CatalogTable catalogTable,
MetadataSchema metadata) {
+ return new CatalogTable(
+ catalogTable.getTableId(),
+ catalogTable.getTableSchema(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment(),
+ catalogTable.getCatalogName(),
+ metadata);
}
private CatalogTable(
@@ -90,7 +120,8 @@ public final class CatalogTable implements Serializable {
Map<String, String> options,
List<String> partitionKeys,
String comment,
- String catalogName) {
+ String catalogName,
+ MetadataSchema metadata) {
this.tableId = tableId;
this.tableSchema = tableSchema;
// Make sure the options and partitionKeys are mutable
@@ -98,6 +129,7 @@ public final class CatalogTable implements Serializable {
this.partitionKeys = new ArrayList<>(partitionKeys);
this.comment = comment;
this.catalogName = catalogName;
+ this.metadata = metadata;
}
public CatalogTable copy() {
@@ -107,7 +139,8 @@ public final class CatalogTable implements Serializable {
new HashMap<>(options),
new ArrayList<>(partitionKeys),
comment,
- catalogName);
+ catalogName,
+ metadata);
}
public TableIdentifier getTableId() {
@@ -142,6 +175,10 @@ public final class CatalogTable implements Serializable {
return catalogName;
}
+ public MetadataSchema getMetadataSchema() {
+ return metadata;
+ }
+
@Override
public String toString() {
return "CatalogTable{"
@@ -159,6 +196,8 @@ public final class CatalogTable implements Serializable {
+ ", catalogName='"
+ catalogName
+ '\''
+ + ", metadata="
+ + metadata
+ '}';
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
index e0e7e9e991..eab4b36578 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -29,30 +29,24 @@ import lombok.ToString;
public class MetadataColumn extends Column {
private static final long serialVersionUID = 1L;
- private final String metadataKey;
-
protected MetadataColumn(
String name,
SeaTunnelDataType<?> dataType,
Long columnLength,
- String metadataKey,
boolean nullable,
Object defaultValue,
String comment) {
super(name, dataType, columnLength, nullable, defaultValue, comment);
- this.metadataKey = metadataKey;
}
public static MetadataColumn of(
String name,
SeaTunnelDataType<?> dataType,
Long columnLength,
- String metadataKey,
boolean nullable,
Object defaultValue,
String comment) {
- return new MetadataColumn(
- name, dataType, columnLength, metadataKey, nullable,
defaultValue, comment);
+ return new MetadataColumn(name, dataType, columnLength, nullable,
defaultValue, comment);
}
@Override
@@ -62,26 +56,23 @@ public class MetadataColumn extends Column {
@Override
public Column copy(SeaTunnelDataType<?> newType) {
- return MetadataColumn.of(
- name, newType, columnLength, metadataKey, nullable,
defaultValue, comment);
+ return MetadataColumn.of(name, newType, columnLength, nullable,
defaultValue, comment);
}
@Override
public Column copy() {
- return MetadataColumn.of(
- name, dataType, columnLength, metadataKey, nullable,
defaultValue, comment);
+ return MetadataColumn.of(name, dataType, columnLength, nullable,
defaultValue, comment);
}
@Override
public Column rename(String newColumnName) {
return MetadataColumn.of(
- newColumnName,
- dataType,
- columnLength,
- metadataKey,
- nullable,
- defaultValue,
- comment);
+ newColumnName, dataType, columnLength, nullable, defaultValue,
comment);
+ }
+
+ public PhysicalColumn toPhysicalColumn() {
+ return PhysicalColumn.of(
+ name, dataType, columnLength, scale, nullable, defaultValue,
comment);
}
@Override
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataSchema.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataSchema.java
new file mode 100644
index 0000000000..6c5f658439
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataSchema.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Represent the metadata schema (metadata columns) of a catalog table. */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public final class MetadataSchema extends AbstractSchema {
+ private static final long serialVersionUID = 1L;
+
+ public MetadataSchema(List<Column> columns) {
+ super(columns);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private final List<Column> columns = new ArrayList<>();
+
+ public Builder columns(List<Column> columns) {
+ this.columns.addAll(columns);
+ return this;
+ }
+
+ public Builder column(Column column) {
+ this.columns.add(column);
+ return this;
+ }
+
+ public MetadataSchema build() {
+ return new MetadataSchema(columns);
+ }
+ }
+
+ public MetadataSchema copy() {
+ List<Column> copyColumns =
columns.stream().map(Column::copy).collect(Collectors.toList());
+ return MetadataSchema.builder().columns(copyColumns).build();
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 2238da2617..3f1d986317 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -17,27 +17,18 @@
package org.apache.seatunnel.api.table.catalog;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import lombok.AccessLevel;
import lombok.Data;
-import lombok.Getter;
+import lombok.EqualsAndHashCode;
-import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/** Represent a physical table schema. */
+@EqualsAndHashCode(callSuper = true)
@Data
-public final class TableSchema implements Serializable {
+public final class TableSchema extends AbstractSchema {
private static final long serialVersionUID = 1L;
- private final List<Column> columns;
-
- @Getter(AccessLevel.PRIVATE)
- private final List<String> columnNames;
private final PrimaryKey primaryKey;
@@ -45,8 +36,7 @@ public final class TableSchema implements Serializable {
public TableSchema(
List<Column> columns, PrimaryKey primaryKey, List<ConstraintKey>
constraintKeys) {
- this.columns = columns;
- this.columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
+ super(columns);
this.primaryKey = primaryKey;
this.constraintKeys = constraintKeys;
}
@@ -55,40 +45,6 @@ public final class TableSchema implements Serializable {
return new Builder();
}
- public SeaTunnelRowType toPhysicalRowDataType() {
- SeaTunnelDataType<?>[] fieldTypes =
- columns.stream()
- .filter(Column::isPhysical)
- .map(Column::getDataType)
- .toArray(SeaTunnelDataType[]::new);
- String[] fields =
- columns.stream()
- .filter(Column::isPhysical)
- .map(Column::getName)
- .toArray(String[]::new);
- return new SeaTunnelRowType(fields, fieldTypes);
- }
-
- public String[] getFieldNames() {
- return columnNames.toArray(new String[0]);
- }
-
- public int indexOf(String columnName) {
- return columnNames.indexOf(columnName);
- }
-
- public Column getColumn(String columnName) {
- return columns.get(indexOf(columnName));
- }
-
- public boolean contains(String columnName) {
- return columnNames.contains(columnName);
- }
-
- public List<Column> getColumns() {
- return Collections.unmodifiableList(columns);
- }
-
public static final class Builder {
private final List<Column> columns = new ArrayList<>();
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
index 42ab203576..ed5fb4d615 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
import java.util.stream.Stream;
import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY;
@@ -52,10 +51,6 @@ public class MetadataUtil {
row.getOptions().put(EVENT_TIME.getName(), delay);
}
- public static Long getDelay(SeaTunnelRowAccessor row) {
- return (Long) row.getOptions().get(DELAY.getName());
- }
-
public static String getDatabase(SeaTunnelRowAccessor row) {
if (row.getTableId() == null) {
return null;
@@ -74,19 +69,10 @@ public class MetadataUtil {
return row.getRowKind().shortString();
}
- public static String getPartitionStr(SeaTunnelRowAccessor row) {
- Object partition = row.getOptions().get(PARTITION.getName());
- return Objects.nonNull(partition) ? String.join(",", (String[])
partition) : null;
- }
-
public static String[] getPartition(SeaTunnelRowAccessor row) {
return (String[]) row.getOptions().get(PARTITION.getName());
}
- public static Long getEventTime(SeaTunnelRowAccessor row) {
- return (Long) row.getOptions().get(EVENT_TIME.getName());
- }
-
public static boolean isMetadataField(String fieldName) {
return METADATA_FIELDS.contains(fieldName);
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index 53f8a40194..19778eb67e 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -75,6 +75,13 @@ public class AssertExecutor {
Lists.newArrayList(rowType.getFieldNames()),
fieldName ->
fieldName.equals(assertFieldRule.getFieldName()));
+ if (index == -1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field name %s not found in row type %s",
+ assertFieldRule.getFieldName(), rowType));
+ }
+
SeaTunnelDataType<?> type = rowType.getFieldType(index);
Object value = rowData.getField(index);
String fieldName = rowType.getFieldName(index);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 153514a53f..85340c4a3a 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -27,6 +27,11 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
@@ -62,6 +67,7 @@ import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJs
import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -94,8 +100,8 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
protected DebeziumDeserializationSchema<T> deserializationSchema;
protected IncrementalSource(ReadonlyConfig options, List<CatalogTable>
catalogTables) {
- this.catalogTables = catalogTables;
this.readonlyConfig = options;
+ this.catalogTables = updateCatalogTableMetadata(catalogTables);
this.startupConfig = getStartupConfig(readonlyConfig);
this.stopConfig = getStopConfig(readonlyConfig);
this.stopMode = stopConfig.getStopMode();
@@ -114,6 +120,41 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
config.get(SourceOptions.STARTUP_TIMESTAMP));
}
+ private List<CatalogTable> updateCatalogTableMetadata(List<CatalogTable>
catalogTables) {
+ return catalogTables.stream()
+ .map(
+ table -> {
+ if (DeserializeFormat.DEFAULT.equals(
+
readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
+ return CatalogTable.withMetadata(table,
getMetadataColumns());
+ } else {
+ return table;
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ private MetadataSchema getMetadataColumns() {
+ List<Column> metadata = new ArrayList<>();
+ metadata.add(
+ MetadataColumn.of(
+ CommonOptions.EVENT_TIME.getName(),
+ BasicType.LONG_TYPE,
+ (Long) null,
+ true,
+ null,
+ null));
+ metadata.add(
+ MetadataColumn.of(
+ CommonOptions.DELAY.getName(),
+ BasicType.LONG_TYPE,
+ (Long) null,
+ true,
+ null,
+ null));
+ return MetadataSchema.builder().columns(metadata).build();
+ }
+
private StopConfig getStopConfig(ReadonlyConfig config) {
return new StopConfig(
config.get(getStopModeOption()),
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
index ae9b7ccde3..df5c807690 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
@@ -21,12 +21,15 @@ import
org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.VectorIndex;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
@@ -196,14 +199,29 @@ public class MilvusConvertUtils {
options.put(
MilvusOptions.ENABLE_DYNAMIC_FIELD,
String.valueOf(schema.getEnableDynamicField()));
options.put(MilvusOptions.SHARDS_NUM,
String.valueOf(collectionResponse.getShardsNum()));
+ MetadataSchema.Builder metadataBuilder = MetadataSchema.builder();
if (existPartitionKeyField) {
options.put(MilvusOptions.PARTITION_KEY_FIELD, partitionKeyField);
+ metadataBuilder.column(
+ MetadataColumn.of(
+ CommonOptions.PARTITION.getName(),
+ BasicType.STRING_TYPE,
+ null,
+ true,
+ null,
+ null));
} else {
fillPartitionNames(options, client, database, collection);
}
return CatalogTable.of(
- tableId, tableSchema, options, new ArrayList<>(),
schema.getDescription());
+ tableId,
+ tableSchema,
+ options,
+ new ArrayList<>(),
+ schema.getDescription(),
+ tableId.getCatalogName(),
+ metadataBuilder.build());
}
private static void fillPartitionNames(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index 7777d9cd24..dcd0f6f7bd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startables;
@@ -129,6 +130,10 @@ public abstract class AbstractDorisIT extends
TestSuiteBase implements TestResou
while (beResultSet.next()) {
beList.add(beResultSet.getString("Host"));
}
+ if (beList.isEmpty()) {
+ log.error("doris BE is empty, skip initialization");
+ Assertions.fail("doris BE is empty, skip initialization");
+ }
if (beList.stream().anyMatch("127.0.0.1"::equals)) {
ResultSet resultSet = statement.executeQuery(SHOW_FE);
String feIp = null;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index 01ce7eaf0a..edbba3ff08 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -72,7 +72,9 @@ public abstract class AbstractSeaTunnelTransform<T, R>
implements SeaTunnelTrans
tableSchema,
inputCatalogTable.getOptions(),
inputCatalogTable.getPartitionKeys(),
- inputCatalogTable.getComment());
+ inputCatalogTable.getComment(),
+ inputCatalogTable.getTableId().getCatalogName(),
+ inputCatalogTable.getMetadataSchema());
}
public R transform(SeaTunnelRow row) {
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 84e3a9348d..f385b3cfd6 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -137,6 +137,7 @@ public abstract class MultipleFieldOutputTransform extends
AbstractCatalogSuppor
SeaTunnelRow outputRow = new
SeaTunnelRow(outputFieldValues);
outputRow.setTableId(inputRow.getTableId());
outputRow.setRowKind(inputRow.getRowKind());
+ outputRow.setOptions(inputRow.getOptions());
return outputRow;
}
};
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index 8768069ab8..7551098336 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -117,6 +117,7 @@ public abstract class SingleFieldOutputTransform extends
AbstractCatalogSupportM
SeaTunnelRow outputRow = new
SeaTunnelRow(outputFieldValues);
outputRow.setTableId(inputRow.getTableId());
outputRow.setRowKind(inputRow.getRowKind());
+ outputRow.setOptions(inputRow.getOptions());
return outputRow;
}
};
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
index f37028e070..6db61c900d 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
@@ -22,6 +22,8 @@ import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.CommonOptions;
@@ -42,6 +44,7 @@ import static
org.apache.seatunnel.api.table.type.MetadataUtil.isMetadataField;
public class MetadataTransform extends MultipleFieldOutputTransform {
private List<String> fieldNames;
+ private MetadataSchema metadataSchema;
private Map<String, String> metadataFieldMapping;
public MetadataTransform(ReadonlyConfig config, @NonNull CatalogTable
inputCatalogTable) {
@@ -65,6 +68,7 @@ public class MetadataTransform extends
MultipleFieldOutputTransform {
fieldNames.add(field.getKey());
}
this.fieldNames = fieldNames;
+ this.metadataSchema = inputCatalogTable.getMetadataSchema();
this.metadataFieldMapping = fields;
}
@@ -78,9 +82,8 @@ public class MetadataTransform extends
MultipleFieldOutputTransform {
Object[] value = new Object[fieldNames.size()];
for (Map.Entry<String, String> mapping :
metadataFieldMapping.entrySet()) {
String metadataFieldName = mapping.getKey();
- String mappingFieldName = mapping.getValue();
int i = fieldNames.indexOf(metadataFieldName);
- Object fieldValue = null;
+ Object fieldValue;
switch (CommonOptions.fromName(metadataFieldName)) {
case DATABASE:
fieldValue = MetadataUtil.getDatabase(inputRow);
@@ -91,18 +94,8 @@ public class MetadataTransform extends
MultipleFieldOutputTransform {
case ROW_KIND:
fieldValue = MetadataUtil.getRowKind(inputRow);
break;
- case DELAY:
- fieldValue = MetadataUtil.getDelay(inputRow);
- break;
- case EVENT_TIME:
- fieldValue = MetadataUtil.getEventTime(inputRow);
- break;
- case PARTITION:
- fieldValue = MetadataUtil.getPartitionStr(inputRow);
- break;
default:
- throw TransformCommonError.cannotFindMetadataFieldError(
- getPluginName(), mappingFieldName);
+ fieldValue = inputRow.getOptions().get(metadataFieldName);
}
value[i] = fieldValue;
}
@@ -117,11 +110,11 @@ public class MetadataTransform extends MultipleFieldOutputTransform {
String mappingFieldName = mapping.getValue();
int i = fieldNames.indexOf(metadataFieldName);
Column column;
+
switch (CommonOptions.fromName(metadataFieldName)) {
case DATABASE:
case TABLE:
case ROW_KIND:
- case PARTITION:
column =
PhysicalColumn.of(
mappingFieldName,
@@ -132,21 +125,18 @@ public class MetadataTransform extends MultipleFieldOutputTransform {
null,
null);
break;
- case DELAY:
- case EVENT_TIME:
- column =
- PhysicalColumn.of(
- mappingFieldName,
- BasicType.LONG_TYPE,
- (Long) null,
- null,
- true,
- null,
- null);
- break;
default:
- throw TransformCommonError.cannotFindMetadataFieldError(
- getPluginName(), mappingFieldName);
+ if (metadataSchema.contains(metadataFieldName)) {
+ column =
+ ((MetadataColumn)
+ metadataSchema
+ .getColumn(metadataFieldName)
+ .rename(mappingFieldName))
+ .toPhysicalColumn();
+ } else {
+ throw TransformCommonError.cannotFindMetadataFieldError(
+ getPluginName(), mappingFieldName);
+ }
}
columns[i] = column;
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 2a1481aaee..fff51a8998 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -256,6 +256,7 @@ public class ZetaSQLEngine implements SQLEngine {
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields);
seaTunnelRow.setRowKind(inputRow.getRowKind());
seaTunnelRow.setTableId(inputRow.getTableId());
+ seaTunnelRow.setOptions(inputRow.getOptions());
List<LateralView> lateralViews = selectBody.getLateralViews();
if (CollectionUtils.isEmpty(lateralViews)) {
return Lists.newArrayList(seaTunnelRow);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 9dca1ad8bc..6d55b74d7b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -857,6 +857,7 @@ public class ZetaSQLFunction {
SeaTunnelRow outputRow = new SeaTunnelRow(fields);
outputRow.setRowKind(row.getRowKind());
outputRow.setTableId(row.getTableId());
+ outputRow.setOptions(row.getOptions());
outputRow.setField(fieldIndex, fieldValue);
return outputRow;
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
index a3ddf1ced4..63277273f1 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
@@ -19,12 +19,18 @@ package org.apache.seatunnel.transform.metadata;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.junit.jupiter.api.Assertions;
@@ -36,6 +42,8 @@ import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
public class MetadataTransformTest {
@@ -50,6 +58,31 @@ public class MetadataTransformTest {
@BeforeAll
static void setUp() {
+ List<Column> metadata = new ArrayList<>();
+ metadata.add(
+ MetadataColumn.of(
+ CommonOptions.EVENT_TIME.getName(),
+ BasicType.LONG_TYPE,
+ (Long) null,
+ true,
+ null,
+ null));
+ metadata.add(
+ MetadataColumn.of(
+ CommonOptions.DELAY.getName(),
+ BasicType.LONG_TYPE,
+ (Long) null,
+ true,
+ null,
+ null));
+ metadata.add(
+ MetadataColumn.of(
+ CommonOptions.PARTITION.getName(),
+ ArrayType.STRING_ARRAY_TYPE,
+ (Long) null,
+ true,
+ null,
+ null));
catalogTable =
CatalogTable.of(
TableIdentifier.of("catalog", TablePath.DEFAULT),
@@ -97,7 +130,9 @@ public class MetadataTransformTest {
.build(),
new HashMap<>(),
new ArrayList<>(),
- "comment");
+ "comment",
+ "test",
+ MetadataSchema.builder().columns(metadata).build());
values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916, 3.14};
inputRow = new SeaTunnelRow(values);
inputRow.setTableId(TablePath.DEFAULT.getFullName());
@@ -109,7 +144,7 @@ public class MetadataTransformTest {
@Test
void testMetadataTransform() {
- Map<String, String> metadataMapping = new HashMap<>();
+ Map<String, String> metadataMapping = new LinkedHashMap<>();
metadataMapping.put("Database", "database");
metadataMapping.put("Table", "table");
metadataMapping.put("Partition", "partition");
@@ -121,12 +156,40 @@ public class MetadataTransformTest {
MetadataTransform transform =
new MetadataTransform(ReadonlyConfig.fromMap(config), catalogTable);
transform.initRowContainerGenerator();
+
+ Column[] columns = transform.getOutputColumns();
+ Assertions.assertEquals("database", columns[0].getName());
+ Assertions.assertEquals("table", columns[1].getName());
+ Assertions.assertEquals("partition", columns[2].getName());
+ Assertions.assertEquals("rowKind", columns[3].getName());
+ Assertions.assertEquals("ts_ms", columns[4].getName());
+ Assertions.assertEquals("delay", columns[5].getName());
+
+ Assertions.assertEquals(BasicType.STRING_TYPE, columns[0].getDataType());
+ Assertions.assertEquals(BasicType.STRING_TYPE, columns[1].getDataType());
+ Assertions.assertEquals(ArrayType.STRING_ARRAY_TYPE, columns[2].getDataType());
+ Assertions.assertEquals(BasicType.STRING_TYPE, columns[3].getDataType());
+ Assertions.assertEquals(BasicType.LONG_TYPE, columns[4].getDataType());
+ Assertions.assertEquals(BasicType.LONG_TYPE, columns[5].getDataType());
+
+ Assertions.assertInstanceOf(PhysicalColumn.class, columns[0]);
+ Assertions.assertInstanceOf(PhysicalColumn.class, columns[5]);
+
SeaTunnelRow outputRow = transform.map(inputRow);
Assertions.assertEquals(values.length + 6, outputRow.getArity());
- Assertions.assertEquals(
- "SeaTunnelRow{tableId=default.default.default, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, key1,key2, default, "
- + eventTime
- + ", +I, default, 150]}",
- outputRow.toString());
+ Assertions.assertEquals("default.default.default", outputRow.getTableId());
+ Assertions.assertEquals(RowKind.INSERT, outputRow.getRowKind());
+ Assertions.assertEquals("value1", outputRow.getField(0));
+ Assertions.assertEquals(1, outputRow.getField(1));
+ Assertions.assertEquals(896657703886127105L, outputRow.getField(2));
+ Assertions.assertEquals(3.1415916, outputRow.getField(3));
+ Assertions.assertEquals(3.14, outputRow.getField(4));
+ Assertions.assertEquals("default", outputRow.getField(5));
+ Assertions.assertEquals("default", outputRow.getField(6));
+ Assertions.assertArrayEquals(
+ new String[] {"key1", "key2"}, (String[]) outputRow.getField(7));
+ Assertions.assertEquals("+I", outputRow.getField(8));
+ Assertions.assertEquals(eventTime, outputRow.getField(9));
+ Assertions.assertEquals(150L, outputRow.getField(10));
}
}