This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2b1234c7ae [Feature][Paimon] Support specify paimon table write
properties, partition keys and primary keys (#6535)
2b1234c7ae is described below
commit 2b1234c7ae885bb26540d3e98d75c623db5d556d
Author: dailai <[email protected]>
AuthorDate: Wed Mar 27 10:23:50 2024 +0800
[Feature][Paimon] Support specify paimon table write properties, partition
keys and primary keys (#6535)
---
docs/en/concept/schema-feature.md | 1 +
docs/en/connector-v2/sink/Paimon.md | 53 ++++-
docs/zh/concept/schema-feature.md | 1 +
docs/zh/connector-v2/sink/Paimon.md | 53 ++++-
.../table/catalog/schema/ReadonlyConfigParser.java | 8 +-
.../table/catalog/schema/TableSchemaOptions.java | 6 +
.../seatunnel/paimon/catalog/PaimonCatalog.java | 34 +--
.../paimon/catalog/PaimonCatalogFactory.java | 7 +-
.../seatunnel/paimon/config/PaimonSinkConfig.java | 49 +++-
.../seatunnel/paimon/data/PaimonTypeMapper.java | 5 +-
.../seatunnel/paimon/sink/PaimonSinkFactory.java | 5 +-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 17 +-
.../seatunnel/paimon/utils/RowConverter.java | 29 ++-
.../seatunnel/paimon/utils/RowTypeConverter.java | 81 ++++++-
.../seatunnel/paimon/utils/SchemaUtil.java | 37 ++-
.../seatunnel/paimon/utils/RowConverterTest.java | 57 ++++-
.../paimon/utils/RowTypeConverterTest.java | 57 ++++-
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 260 ++++++++++++++++++++-
.../test/resources/fake_cdc_sink_paimon_case1.conf | 2 +-
..._case1.conf => fake_cdc_sink_paimon_case3.conf} | 13 +-
..._case1.conf => fake_cdc_sink_paimon_case4.conf} | 37 +--
..._case1.conf => fake_cdc_sink_paimon_case5.conf} | 13 +-
..._case1.conf => fake_cdc_sink_paimon_case6.conf} | 13 +-
.../test/resources/fake_cdc_sink_paimon_case7.conf | 127 ++++++++++
24 files changed, 840 insertions(+), 125 deletions(-)
diff --git a/docs/en/concept/schema-feature.md
b/docs/en/concept/schema-feature.md
index 15f8186cce..9ae2c3d39e 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -64,6 +64,7 @@ columns = [
| type | Yes | - | The data type of the column
|
| nullable | No | true | If the column can be nullable
|
| columnLength | No | 0 | The length of the column which
will be useful when you need to define the length |
+| columnScale | No | - | The scale of the column which will
be useful when you need to define the scale |
| defaultValue | No | null | The default value of the column
|
| comment | No | null | The comment of the column
|
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index 5e9d3c431f..707a0dc0db 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -12,14 +12,17 @@ Sink connector for Apache Paimon. It can support cdc mode
、auto create table.
## Options
-| name | type | required | default value |
Description |
-|------------------|--------|----------|------------------------------|---------------------------------|
-| warehouse | String | Yes | - | Paimon
warehouse path |
-| database | String | Yes | - | The
database you want to access |
-| table | String | Yes | - | The
table you want to access |
-| hdfs_site_path | String | No | - |
|
-| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | The
schema save mode |
-| data_save_mode | Enum | no | APPEND_DATA | The
data save mode |
+| name | type | required | default value
|
Description
|
+|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| warehouse | String | Yes | -
| Paimon warehouse path
|
+| database | String | Yes | -
| The database you want to access
|
+| table | String | Yes | -
| The table you want to access
|
+| hdfs_site_path | String | No | -
|
|
+| schema_save_mode | Enum | No |
CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode
|
+| data_save_mode | Enum | No | APPEND_DATA
| The data save mode
|
+| paimon.table.primary-keys | String | No | -
| Default comma-separated list of columns (primary key) that identify a row
in tables. (Notice: The partition fields need to be included in the primary key
fields) |
+| paimon.table.partition-keys | String | No | -
| Default comma-separated list of partition fields to use when creating
tables.
|
+| paimon.table.write-props | Map | No | -
| Properties passed through to paimon table initialization,
[reference](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions).
|
## Examples
@@ -54,6 +57,40 @@ sink {
}
```
+### Single table with Paimon write properties
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
+ paimon.table.write-props = {
+ bucket = 2
+ file.format = "parquet"
+ }
+ paimon.table.partition-keys = "dt"
+ paimon.table.primary-keys = "pk_id,dt"
+ }
+}
+```
+
### Multiple table
```hocon
diff --git a/docs/zh/concept/schema-feature.md
b/docs/zh/concept/schema-feature.md
index cc69b6d83e..adb4089298 100644
--- a/docs/zh/concept/schema-feature.md
+++ b/docs/zh/concept/schema-feature.md
@@ -64,6 +64,7 @@ columns = [
| type | Yes | - | 列的数据类型 |
| nullable | No | true | 列是否可空 |
| columnLength | No | 0 | 列的长度,当您需要定义长度时将很有用 |
+| columnScale | No | - | 列的小数位数(标度),当您需要定义小数位数时将很有用 |
| defaultValue | No | null | 列的默认值 |
| comment | No | null | 列的注释 |
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index b1b4baef9b..306bc12b56 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -12,14 +12,17 @@ Apache Paimon数据连接器。支持cdc写以及自动建表。
## 连接器选项
-| 名称 | 类型 | 是否必须 | 默认值 | 描述
|
-|------------------|--------|------|------------------------------|--------------------|
-| warehouse | String | Yes | - | Paimon
warehouse路径 |
-| database | String | Yes | - | 数据库名称
|
-| table | String | Yes | - | 表名
|
-| hdfs_site_path | String | No | - |
|
-| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式
|
-| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式
|
+| 名称 | 类型 | 是否必须 | 默认值 |
描述
|
+|-----------------------------|-----|------|------------------------------|---------------------------------------------------------------------------------------------------|
+| warehouse | 字符串 | 是 | - |
Paimon warehouse路径
|
+| database | 字符串 | 是 | - |
数据库名称
|
+| table | 字符串 | 是 | - | 表名
|
+| hdfs_site_path | 字符串 | 否 | - |
|
+| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
Schema保存模式
|
+| data_save_mode | 枚举 | 否 | APPEND_DATA |
数据保存模式
|
+| paimon.table.primary-keys | 字符串 | 否 | - |
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)
|
+| paimon.table.partition-keys | 字符串 | 否 | - |
分区字段列表,多字段使用逗号分隔
|
+| paimon.table.write-props | Map | 否 | - |
Paimon表初始化指定的属性,
[参考](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions)
|
## 示例
@@ -54,6 +57,40 @@ sink {
}
```
+### 指定paimon的写属性的单表
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
+ paimon.table.write-props = {
+ bucket = 2
+ file.format = "parquet"
+ }
+ paimon.table.partition-keys = "dt"
+ paimon.table.primary-keys = "pk_id,dt"
+ }
+}
+```
+
### 多表
```hocon
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
index bac7f7b7a8..e043c0ecd7 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
@@ -133,6 +133,11 @@ public class ReadonlyConfigParser implements
TableSchemaParser<ReadonlyConfig> {
Integer columnLength =
columnConfig.get(
TableSchemaOptions.ColumnOptions.COLUMN_LENGTH);
+
+ Integer columnScale =
+ columnConfig.get(
+
TableSchemaOptions.ColumnOptions.COLUMN_SCALE);
+
Boolean nullable =
columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE);
Object defaultValue =
@@ -143,7 +148,8 @@ public class ReadonlyConfigParser implements
TableSchemaParser<ReadonlyConfig> {
return PhysicalColumn.of(
name,
seaTunnelDataType,
- columnLength,
+ Long.valueOf(columnLength),
+ columnScale,
nullable,
defaultValue,
comment);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
index 492fe1909c..9ede187ea9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
@@ -86,6 +86,12 @@ public class TableSchemaOptions {
.noDefaultValue()
.withDescription("SeaTunnel Schema Column Type");
+ public static final Option<Integer> COLUMN_SCALE =
+ Options.key("columnScale")
+ .intType()
+ .noDefaultValue()
+ .withDescription("SeaTunnel Schema Column scale");
+
public static final Option<Integer> COLUMN_LENGTH =
Options.key("columnLength")
.intType()
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 7312ed28b0..8d3395af3c 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -17,10 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -28,11 +27,11 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
@@ -48,14 +47,14 @@ public class PaimonCatalog implements Catalog, PaimonTable {
private static final String DEFAULT_DATABASE = "default";
private String catalogName;
- private ReadonlyConfig readonlyConfig;
+ private PaimonSinkConfig paimonSinkConfig;
private PaimonCatalogLoader paimonCatalogLoader;
private org.apache.paimon.catalog.Catalog catalog;
- public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
- this.readonlyConfig = readonlyConfig;
+ public PaimonCatalog(String catalogName, PaimonSinkConfig
paimonSinkConfig) {
+ this.paimonSinkConfig = paimonSinkConfig;
this.catalogName = catalogName;
- this.paimonCatalogLoader = new PaimonCatalogLoader(new
PaimonSinkConfig(readonlyConfig));
+ this.paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
}
@Override
@@ -135,10 +134,9 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
try {
- catalog.createTable(
- toIdentifier(tablePath),
- SchemaUtil.toPaimonSchema(table.getTableSchema()),
- ignoreIfExists);
+ Schema paimonSchema =
+ SchemaUtil.toPaimonSchema(table.getTableSchema(),
this.paimonSinkConfig);
+ catalog.createTable(toIdentifier(tablePath), paimonSchema,
ignoreIfExists);
} catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException
e) {
throw new TableAlreadyExistException(this.catalogName, tablePath);
} catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException
e) {
@@ -183,18 +181,8 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
TableSchema.Builder builder = TableSchema.builder();
dataFields.forEach(
dataField -> {
- String name = dataField.name();
- SeaTunnelDataType<?> seaTunnelType =
- SchemaUtil.toSeaTunnelType(dataField.type());
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- name,
- seaTunnelType,
- (Long) null,
- true,
- null,
- dataField.description());
- builder.column(physicalColumn);
+ Column column =
SchemaUtil.toSeaTunnelType(dataField.type());
+ builder.column(column);
});
List<String> partitionKeys = schema.partitionKeys();
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index 4d94f385d9..b8c8eb1088 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -30,7 +30,7 @@ import com.google.auto.service.AutoService;
public class PaimonCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig
readonlyConfig) {
- return new PaimonCatalog(catalogName, readonlyConfig);
+ return new PaimonCatalog(catalogName, new
PaimonSinkConfig(readonlyConfig));
}
@Override
@@ -48,7 +48,10 @@ public class PaimonCatalogFactory implements CatalogFactory {
.optional(
PaimonSinkConfig.HDFS_SITE_PATH,
PaimonSinkConfig.SCHEMA_SAVE_MODE,
- PaimonSinkConfig.DATA_SAVE_MODE)
+ PaimonSinkConfig.DATA_SAVE_MODE,
+ PaimonSinkConfig.PRIMARY_KEYS,
+ PaimonSinkConfig.PARTITION_KEYS,
+ PaimonSinkConfig.WRITE_PROPS)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index 589fd94816..d369c74bca 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -17,17 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
-import lombok.Getter;
+import lombok.Data;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static java.util.stream.Collectors.toList;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
-@Getter
+@Data
public class PaimonSinkConfig extends PaimonConfig {
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
@@ -41,6 +50,27 @@ public class PaimonSinkConfig extends PaimonConfig {
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data_save_mode");
+ public static final Option<String> PRIMARY_KEYS =
+ Options.key("paimon.table.primary-keys")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Default comma-separated list of columns that
identify a row in tables (primary key)");
+
+ public static final Option<String> PARTITION_KEYS =
+ Options.key("paimon.table.partition-keys")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Default comma-separated list of partition fields
to use when creating tables.");
+
+ public static final Option<Map<String, String>> WRITE_PROPS =
+ Options.key("paimon.table.write-props")
+ .mapType()
+ .defaultValue(new HashMap<>())
+ .withDescription(
+ "Properties passed through to paimon table
initialization, such as 'file.format',
'bucket'(org.apache.paimon.CoreOptions)");
+
private String catalogName;
private String warehouse;
private String namespace;
@@ -48,6 +78,10 @@ public class PaimonSinkConfig extends PaimonConfig {
private String hdfsSitePath;
private SchemaSaveMode schemaSaveMode;
private DataSaveMode dataSaveMode;
+ private Integer bucket;
+ private List<String> primaryKeys;
+ private List<String> partitionKeys;
+ private Map<String, String> writeProps;
public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
this.catalogName =
checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
@@ -57,10 +91,21 @@ public class PaimonSinkConfig extends PaimonConfig {
this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
+ this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
+ this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS),
",");
+ this.writeProps = readonlyConfig.get(WRITE_PROPS);
}
protected <T> T checkArgumentNotNull(T argument) {
checkNotNull(argument);
return argument;
}
+
+ @VisibleForTesting
+ public static List<String> stringToList(String value, String regex) {
+ if (value == null || value.isEmpty()) {
+ return ImmutableList.of();
+ }
+ return
Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
index 1f8b1cff32..cbf512f61d 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.data;
import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
@@ -40,11 +39,11 @@ public class PaimonTypeMapper implements
TypeConverter<DataType> {
@Override
public Column convert(DataType dataType) {
- return
PhysicalColumn.builder().dataType(RowTypeConverter.convert(dataType)).build();
+ return RowTypeConverter.convert(dataType);
}
@Override
public DataType reconvert(Column column) {
- return RowTypeConverter.reconvert(column.getDataType());
+ return RowTypeConverter.reconvert(column);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index c0b4d997ea..2f5b316dd5 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -53,7 +53,10 @@ public class PaimonSinkFactory implements TableSinkFactory {
.optional(
PaimonConfig.HDFS_SITE_PATH,
PaimonSinkConfig.SCHEMA_SAVE_MODE,
- PaimonSinkConfig.DATA_SAVE_MODE)
+ PaimonSinkConfig.DATA_SAVE_MODE,
+ PaimonSinkConfig.PRIMARY_KEYS,
+ PaimonSinkConfig.PARTITION_KEYS,
+ PaimonSinkConfig.WRITE_PROPS)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 7b2e8327a9..a858c3ee7f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -30,6 +30,8 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
@@ -74,6 +76,8 @@ public class PaimonSinkWriter
private final JobContext jobContext;
+ private TableSchema tableSchema;
+
public PaimonSinkWriter(
Context context,
Table table,
@@ -88,6 +92,7 @@ public class PaimonSinkWriter
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
this.jobContext = jobContext;
+ this.tableSchema = ((FileStoreTable) table).schema();
}
public PaimonSinkWriter(
@@ -96,15 +101,7 @@ public class PaimonSinkWriter
SeaTunnelRowType seaTunnelRowType,
List<PaimonSinkState> states,
JobContext jobContext) {
- this.table = table;
- this.tableWriteBuilder =
- JobContextUtil.isBatchJob(jobContext)
- ? this.table.newBatchWriteBuilder().withOverwrite()
- : this.table.newStreamWriteBuilder();
- this.tableWrite = tableWriteBuilder.newWrite();
- this.seaTunnelRowType = seaTunnelRowType;
- this.context = context;
- this.jobContext = jobContext;
+ this(context, table, seaTunnelRowType, jobContext);
if (Objects.isNull(states) || states.isEmpty()) {
return;
}
@@ -132,7 +129,7 @@ public class PaimonSinkWriter
@Override
public void write(SeaTunnelRow element) throws IOException {
- InternalRow rowData = RowConverter.convert(element, seaTunnelRowType);
+ InternalRow rowData = RowConverter.reconvert(element,
seaTunnelRowType, tableSchema);
try {
tableWrite.write(rowData);
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 6b9a6bf01c..fe1c24da80 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -41,15 +41,19 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** The converter for converting {@link InternalRow} and {@link SeaTunnelRow}
*/
@@ -129,7 +133,7 @@ public class RowConverter {
* @param dataType SeaTunnel array data type
* @return Paimon array object {@link BinaryArray}
*/
- public static BinaryArray convert(Object array, SeaTunnelDataType<?>
dataType) {
+ public static BinaryArray reconvert(Object array, SeaTunnelDataType<?>
dataType) {
int length = ((Object[]) array).length;
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter;
@@ -327,10 +331,12 @@ public class RowConverter {
*
* @param seaTunnelRow SeaTunnel row object
* @param seaTunnelRowType SeaTunnel row type
+ * @param tableSchema Paimon table schema
* @return Paimon row object
*/
- public static InternalRow convert(
- SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) {
+ public static InternalRow reconvert(
+ SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType,
TableSchema tableSchema) {
+ List<DataField> fields = tableSchema.fields();
BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
@@ -390,8 +396,12 @@ public class RowConverter {
i, Timestamp.fromLocalDateTime(date.atTime(time)),
3);
break;
case TIMESTAMP:
+ String fieldName = seaTunnelRowType.getFieldName(i);
+ DataField dataField = SchemaUtil.getDataField(fields,
fieldName);
+ int precision = ((TimestampType)
dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime)
seaTunnelRow.getField(i);
- binaryWriter.writeTimestamp(i,
Timestamp.fromLocalDateTime(datetime), 9);
+ binaryWriter.writeTimestamp(
+ i, Timestamp.fromLocalDateTime(datetime),
precision);
break;
case MAP:
MapType<?, ?> mapType = (MapType<?, ?>)
seaTunnelRowType.getFieldType(i);
@@ -404,13 +414,14 @@ public class RowConverter {
Object[] values = field.values().toArray(new Object[0]);
binaryWriter.writeMap(
i,
- BinaryMap.valueOf(convert(keys, keyType),
convert(values, valueType)),
+ BinaryMap.valueOf(
+ reconvert(keys, keyType),
reconvert(values, valueType)),
new InternalMapSerializer(paimonKeyType,
paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>)
seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
- convert(seaTunnelRow.getField(i),
arrayType.getElementType());
+ reconvert(seaTunnelRow.getField(i),
arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
@@ -420,8 +431,10 @@ public class RowConverter {
case ROW:
SeaTunnelDataType<?> rowType =
seaTunnelRowType.getFieldType(i);
Object row = seaTunnelRow.getField(i);
- InternalRow paimonRow = convert((SeaTunnelRow) row,
(SeaTunnelRowType) rowType);
- RowType paimonRowType =
RowTypeConverter.reconvert((SeaTunnelRowType) rowType);
+ InternalRow paimonRow =
+ reconvert((SeaTunnelRow) row, (SeaTunnelRowType)
rowType, tableSchema);
+ RowType paimonRowType =
+ RowTypeConverter.reconvert((SeaTunnelRowType)
rowType, tableSchema);
binaryWriter.writeRow(i, paimonRow, new
InternalRowSerializer(paimonRowType));
break;
default:
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index 16863ebff5..5a4f5dbe8a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
@@ -25,6 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
@@ -33,6 +36,7 @@ import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
@@ -50,6 +54,8 @@ import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
/** The converter for converting {@link RowType} and {@link SeaTunnelRowType}
*/
public class RowTypeConverter {
@@ -77,16 +83,21 @@ public class RowTypeConverter {
* @param dataType Paimon data type
* @return SeaTunnel data type {@link SeaTunnelDataType}
*/
- public static SeaTunnelDataType convert(DataType dataType) {
+ public static Column convert(DataType dataType) {
+ PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder =
PhysicalColumn.builder();
SeaTunnelDataType<?> seaTunnelDataType;
PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
PaimonToSeaTunnelTypeVisitor.INSTANCE;
switch (dataType.getTypeRoot()) {
case CHAR:
- seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((CharType) dataType);
+ CharType charType = (CharType) dataType;
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit(charType);
+ physicalColumnBuilder.columnLength((long)
charType.getLength());
break;
case VARCHAR:
- seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((VarCharType) dataType);
+ VarCharType varCharType = (VarCharType) dataType;
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit(varCharType);
+ physicalColumnBuilder.columnLength((long)
varCharType.getLength());
break;
case BOOLEAN:
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((BooleanType) dataType);
@@ -95,10 +106,15 @@ public class RowTypeConverter {
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((BinaryType) dataType);
break;
case VARBINARY:
- seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((VarBinaryType) dataType);
+ VarBinaryType varBinaryType = (VarBinaryType) dataType;
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit(varBinaryType);
+ physicalColumnBuilder.columnLength((long)
varBinaryType.getLength());
break;
case DECIMAL:
- seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((DecimalType) dataType);
+ DecimalType decimalType = (DecimalType) dataType;
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit(decimalType);
+ physicalColumnBuilder.columnLength((long)
decimalType.getPrecision());
+ physicalColumnBuilder.scale(decimalType.getScale());
break;
case TINYINT:
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((TinyIntType) dataType);
@@ -122,14 +138,21 @@ public class RowTypeConverter {
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((DateType) dataType);
break;
case TIME_WITHOUT_TIME_ZONE:
- seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((TimeType) dataType);
+ TimeType timeType = (TimeType) dataType;
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit(timeType);
+ physicalColumnBuilder.scale(timeType.getPrecision());
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
- seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((TimestampType) dataType);
+ TimestampType timestampType = (TimestampType) dataType;
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit(timestampType);
+ physicalColumnBuilder.scale(timestampType.getPrecision());
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ LocalZonedTimestampType localZonedTimestampType =
+ (LocalZonedTimestampType) dataType;
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((LocalZonedTimestampType) dataType);
+
physicalColumnBuilder.scale(localZonedTimestampType.getPrecision());
break;
case ARRAY:
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((ArrayType) dataType);
@@ -148,7 +171,7 @@ public class RowTypeConverter {
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
errorMsg);
}
- return seaTunnelDataType;
+ return physicalColumnBuilder.dataType(seaTunnelDataType).build();
}
/**
@@ -157,20 +180,39 @@ public class RowTypeConverter {
* @param seaTunnelRowType SeaTunnel row type {@link SeaTunnelRowType}
* @return Paimon row type {@link RowType}
*/
-    public static RowType reconvert(SeaTunnelRowType seaTunnelRowType) {
+    public static RowType reconvert(SeaTunnelRowType seaTunnelRowType,
TableSchema tableSchema) {
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        List<DataField> fields = tableSchema.fields();
DataType[] dataTypes =
Arrays.stream(fieldTypes)
.map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
.toArray(DataType[]::new);
DataField[] dataFields = new DataField[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
-            DataField dataField = new DataField(i,
seaTunnelRowType.getFieldName(i), dataTypes[i]);
+            DataType dataType = dataTypes[i];
+            DataTypeRoot typeRoot = dataType.getTypeRoot();
+            String fieldName = seaTunnelRowType.getFieldName(i);
+            if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                    || typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
+                // Keep the precision declared by the target Paimon table. That column may be
+                // TIMESTAMP or TIMESTAMP_LTZ (e.g. a table created outside SeaTunnel), so it must
+                // not be blindly cast to TimestampType, which would throw ClassCastException.
+                // For any other column type the visitor's default precision is kept.
+                DataType tableFieldType = SchemaUtil.getDataField(fields, fieldName).type();
+                if (tableFieldType instanceof TimestampType) {
+                    dataType = new TimestampType(((TimestampType) tableFieldType).getPrecision());
+                } else if (tableFieldType instanceof LocalZonedTimestampType) {
+                    dataType =
+                            new TimestampType(
+                                    ((LocalZonedTimestampType) tableFieldType).getPrecision());
+                }
+            }
+            DataField dataField = new DataField(i, fieldName, dataType);
dataFields[i] = dataField;
}
return DataTypes.ROW(dataFields);
}
+    /**
+     * Mapping a SeaTunnel {@link Column} to a Paimon {@link DataType}.
+     *
+     * <p>For TIME and TIMESTAMP columns the column's scale, when set, is used as the Paimon
+     * precision; every other column is mapped from its data type alone.
+     *
+     * @param column SeaTunnel column whose data type (and scale) is converted
+     * @return Paimon data type {@link DataType}
+     */
+    public static DataType reconvert(Column column) {
+        return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(column);
+    }
+
/**
* Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data
type {@link DataType}
*
@@ -192,6 +234,21 @@ public class RowTypeConverter {
private SeaTunnelTypeToPaimonVisitor() {}
+    /**
+     * Converts a SeaTunnel column, using its scale as the precision of TIME and TIMESTAMP
+     * types (falling back to Paimon's default precision when no scale is set). Columns of any
+     * other type are converted from their data type alone.
+     */
+    public DataType visit(Column column) {
+        SeaTunnelDataType<?> columnType = column.getDataType();
+        Integer columnScale = column.getScale();
+        boolean scaleMissing = Objects.isNull(columnScale);
+        switch (columnType.getSqlType()) {
+            case TIME:
+                return DataTypes.TIME(scaleMissing ? TimeType.DEFAULT_PRECISION : columnScale);
+            case TIMESTAMP:
+                return DataTypes.TIMESTAMP(
+                        scaleMissing ? TimestampType.DEFAULT_PRECISION : columnScale);
+            default:
+                return visit(columnType);
+        }
+    }
+
public DataType visit(SeaTunnelDataType<?> dataType) {
switch (dataType.getSqlType()) {
case TINYINT:
@@ -220,8 +277,10 @@ public class RowTypeConverter {
return DataTypes.BOOLEAN();
case DATE:
return DataTypes.DATE();
+ case TIME:
+ return DataTypes.TIME(TimeType.MAX_PRECISION);
case TIMESTAMP:
- return DataTypes.TIMESTAMP(6);
+ return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
case MAP:
SeaTunnelDataType<?> keyType =
((org.apache.seatunnel.api.table.type.MapType<?,
?>) dataType)
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index c03a77149c..65129dc8b7 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -18,14 +18,16 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
/** The util seatunnel schema to paimon schema */
@@ -35,20 +37,39 @@ public class SchemaUtil {
return PaimonTypeMapper.INSTANCE.reconvert(column);
}
-    public static Schema toPaimonSchema(TableSchema tableSchema) {
+    /**
+     * Converts a SeaTunnel table schema plus the Paimon sink options into a Paimon {@link Schema}.
+     *
+     * <p>Primary keys come from {@code paimonSinkConfig} when configured, otherwise from the
+     * SeaTunnel table's primary key (if any). Partition keys and table options (write props) are
+     * taken only from {@code paimonSinkConfig}.
+     *
+     * @param tableSchema SeaTunnel table schema providing the columns
+     * @param paimonSinkConfig sink config supplying primary keys, partition keys and write props
+     * @return the Paimon schema
+     */
+    public static Schema toPaimonSchema(
+            TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig) {
Schema.Builder paiSchemaBuilder = Schema.newBuilder();
for (int i = 0; i < tableSchema.getColumns().size(); i++) {
Column column = tableSchema.getColumns().get(i);
paiSchemaBuilder.column(column.getName(), toPaimonType(column));
}
-        PrimaryKey primaryKey = tableSchema.getPrimaryKey();
-        if (Objects.nonNull(primaryKey) && primaryKey.getColumnNames().size()
> 0) {
-            paiSchemaBuilder.primaryKey(primaryKey.getColumnNames());
+        // Explicitly configured primary keys take precedence over the upstream table's.
+        // NOTE(review): assumes the PaimonSinkConfig getters never return null (isEmpty() is
+        // called on them directly) — confirm in PaimonSinkConfig.
+        List<String> primaryKeys = paimonSinkConfig.getPrimaryKeys();
+        if (primaryKeys.isEmpty() &&
Objects.nonNull(tableSchema.getPrimaryKey())) {
+            primaryKeys = tableSchema.getPrimaryKey().getColumnNames();
+        }
+        if (!primaryKeys.isEmpty()) {
+            paiSchemaBuilder.primaryKey(primaryKeys);
+        }
+        List<String> partitionKeys = paimonSinkConfig.getPartitionKeys();
+        if (!partitionKeys.isEmpty()) {
+            paiSchemaBuilder.partitionKeys(partitionKeys);
+        }
+        Map<String, String> writeProps = paimonSinkConfig.getWriteProps();
+        if (!writeProps.isEmpty()) {
+            paiSchemaBuilder.options(writeProps);
}
return paiSchemaBuilder.build();
}
-    public static SeaTunnelDataType<?> toSeaTunnelType(DataType dataType) {
-        return PaimonTypeMapper.INSTANCE.convert(dataType).getDataType();
+    /**
+     * Converts a Paimon data type into a SeaTunnel {@link Column} via {@link PaimonTypeMapper}.
+     *
+     * <p>Returns the whole column rather than only its data type, presumably so callers can also
+     * read column attributes such as length or scale — confirm what the mapper fills in.
+     *
+     * @param dataType Paimon data type
+     * @return converted SeaTunnel column
+     */
+    public static Column toSeaTunnelType(DataType dataType) {
+        return PaimonTypeMapper.INSTANCE.convert(dataType);
+    }
+
+    /**
+     * Finds the Paimon field with the given name.
+     *
+     * @param fields fields of a Paimon table schema
+     * @param fieldName exact (case-sensitive) field name to look up
+     * @return the first field whose name equals {@code fieldName}
+     * @throws IllegalArgumentException if no field has that name
+     */
+    public static DataField getDataField(List<DataField> fields, String fieldName) {
+        // A sequential stream suffices for a schema-sized list; parallelStream only adds
+        // fork-join overhead here.
+        return fields.stream()
+                .filter(field -> field.name().equals(fieldName))
+                .findFirst()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        "Field '"
+                                                + fieldName
+                                                + "' does not exist in the Paimon table schema"));
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index fcb9090a57..eec61aea6d 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -38,7 +38,10 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -48,7 +51,10 @@ import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** Unit tests for {@link RowConverter} */
@@ -60,6 +66,45 @@ public class RowConverterTest {
private SeaTunnelRowType seaTunnelRowType;
+    private TableSchema tableSchema;
+
+    /** Paimon row type used to build {@link #tableSchema} in {@code before()}. */
+    public static final RowType DEFAULT_ROW_TYPE =
+            RowType.of(
+                    new DataType[] {
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.FLOAT(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.DECIMAL(10, 10),
+                        DataTypes.STRING(),
+                        DataTypes.BYTES(),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.DATE(),
+                        DataTypes.TIMESTAMP(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                        DataTypes.ARRAY(DataTypes.STRING())
+                    },
+                    new String[] {
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_string",
+                        "c_bytes",
+                        "c_boolean",
+                        "c_date",
+                        "c_timestamp",
+                        "c_map",
+                        "c_array"
+                    });
+
+    /** Key column names passed to the {@link TableSchema} built in {@code before()}. */
+    public static final List<String> KEY_NAME_LIST =
Arrays.asList("c_tinyint");
+
@BeforeEach
public void before() {
seaTunnelRowType =
@@ -171,11 +216,21 @@ public class RowConverterTest {
binaryRowWriter.writeArray(
13, binaryArray2, new
InternalArraySerializer(DataTypes.STRING()));
internalRow = binaryRow;
+
+ tableSchema =
+ new TableSchema(
+ 0,
+ TableSchema.newFields(DEFAULT_ROW_TYPE),
+ DEFAULT_ROW_TYPE.getFieldCount(),
+ Collections.EMPTY_LIST,
+ KEY_NAME_LIST,
+ Collections.EMPTY_MAP,
+ "");
}
@Test
public void seaTunnelToPaimon() {
-        InternalRow convert = RowConverter.convert(seaTunnelRow,
seaTunnelRowType);
+        InternalRow convert = RowConverter.reconvert(seaTunnelRow,
seaTunnelRowType, tableSchema);
-        Assertions.assertEquals(convert, internalRow);
+        // JUnit's assertEquals takes (expected, actual).
+        Assertions.assertEquals(internalRow, convert);
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index f828be0650..5e614aeda5 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -26,7 +26,9 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -34,12 +36,55 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
public class RowTypeConverterTest {
private SeaTunnelRowType seaTunnelRowType;
private RowType rowType;
+    private TableSchema tableSchema;
+
+    /** Paimon row type used to build {@link #tableSchema} in {@code before()}. */
+    public static final RowType DEFAULT_ROW_TYPE =
+            RowType.of(
+                    new DataType[] {
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.FLOAT(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.DECIMAL(10, 10),
+                        DataTypes.STRING(),
+                        DataTypes.BYTES(),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.DATE(),
+                        DataTypes.TIMESTAMP(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                        DataTypes.ARRAY(DataTypes.STRING())
+                    },
+                    new String[] {
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_string",
+                        "c_bytes",
+                        "c_boolean",
+                        "c_date",
+                        "c_timestamp",
+                        "c_map",
+                        "c_array"
+                    });
+
+    /** Key column names passed to the {@link TableSchema} built in {@code before()}. */
+    public static final List<String> KEY_NAME_LIST =
Arrays.asList("c_tinyint");
+
@BeforeEach
public void before() {
seaTunnelRowType =
@@ -93,6 +138,16 @@ public class RowTypeConverterTest {
new DataField(
12, "c_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING())),
new DataField(13, "c_array",
DataTypes.ARRAY(DataTypes.STRING())));
+
+ tableSchema =
+ new TableSchema(
+ 0,
+ TableSchema.newFields(DEFAULT_ROW_TYPE),
+ DEFAULT_ROW_TYPE.getFieldCount(),
+ Collections.EMPTY_LIST,
+ KEY_NAME_LIST,
+ Collections.EMPTY_MAP,
+ "");
}
@Test
@@ -103,7 +158,7 @@ public class RowTypeConverterTest {
@Test
public void seaTunnelToPaimon() {
-        RowType convert = RowTypeConverter.reconvert(seaTunnelRowType);
+        RowType convert = RowTypeConverter.reconvert(seaTunnelRowType,
tableSchema);
-        Assertions.assertEquals(convert, rowType);
+        // JUnit's assertEquals takes (expected, actual).
+        Assertions.assertEquals(rowType, convert);
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index a960f7d4d3..d2d88c1dbc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -25,17 +25,23 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.TimestampType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -52,6 +58,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.awaitility.Awaitility.given;
@@ -67,7 +74,6 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements
TestResource {
private static final String NAMESPACE_TAR = "paimon.tar.gz";
private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE +
"/";
private static final String TARGET_TABLE = "st_test";
- private static final String TARGET_DATABASE = "seatunnel_namespace";
private static final String FAKE_TABLE1 = "FakeTable1";
private static final String FAKE_DATABASE1 = "FakeDatabase1";
private static final String FAKE_TABLE2 = "FakeTable1";
@@ -95,7 +101,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
// copy paimon to local
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords =
- loadPaimonData(TARGET_DATABASE,
TARGET_TABLE);
+ loadPaimonData("seatunnel_namespace1",
TARGET_TABLE);
Assertions.assertEquals(2, paimonRecords.size());
paimonRecords.forEach(
paimonRecord -> {
@@ -107,8 +113,6 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
}
});
});
-
- cleanPaimonTable(container);
}
@TestTemplate
@@ -152,18 +156,221 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
}
});
});
+ }
+
+    /**
+     * Verifies that {@code paimon.table.write-props.bucket = 2} (case3) is applied to the created
+     * table and that the CDC rows are merged as expected.
+     */
+    @TestTemplate
+    public void testFakeCDCSinkPaimonWithMultipleBucket(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/fake_cdc_sink_paimon_case3.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace3", TARGET_TABLE);
+                            String bucket = table.options().get(CoreOptions.BUCKET.key());
+                            // Single value: isNotBlank instead of the varargs isNoneBlank.
+                            Assertions.assertTrue(StringUtils.isNotBlank(bucket));
+                            Assertions.assertEquals(2, Integer.parseInt(bucket));
+                            List<PaimonRecord> paimonRecords =
+                                    loadPaimonData("seatunnel_namespace3", TARGET_TABLE);
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach(
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", paimonRecord.getName());
+                                        }
+                                    });
+                        });
+    }
+
+    /**
+     * Verifies {@code paimon.table.partition-keys = "dt"} and {@code paimon.table.primary-keys =
+     * "pk_id,dt"} (case4). The UPDATE_AFTER row moves pk_id 1 to dt 2024-03-20, so exactly one
+     * row with pk_id 1 is expected, carrying the updated name and partition.
+     */
+    @TestTemplate
+    public void testFakeCDCSinkPaimonWithPartition(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/fake_cdc_sink_paimon_case4.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace4", TARGET_TABLE);
+                            List<String> partitionKeys = table.partitionKeys();
+                            List<String> primaryKeys = table.primaryKeys();
+                            Assertions.assertTrue(partitionKeys.contains("dt"));
+                            Assertions.assertEquals(2, primaryKeys.size());
+                            Assertions.assertTrue(primaryKeys.contains("pk_id"));
+                            Assertions.assertTrue(primaryKeys.contains("dt"));
+                            ReadBuilder readBuilder = table.newReadBuilder();
+                            TableScan.Plan plan = readBuilder.newScan().plan();
+                            TableRead tableRead = readBuilder.newRead();
+                            List<PaimonRecord> result = new ArrayList<>();
+                            try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+                                reader.forEachRemaining(
+                                        row -> {
+                                            result.add(
+                                                    new PaimonRecord(
+                                                            row.getLong(0),
+                                                            row.getString(1).toString(),
+                                                            row.getString(2).toString()));
+                                            log.info(
+                                                    "key_id:"
+                                                            + row.getLong(0)
+                                                            + ", name:"
+                                                            + row.getString(1)
+                                                            + ", dt:"
+                                                            + row.getString(2));
+                                        });
+                            }
+                            Assertions.assertEquals(2, result.size());
+                            // Use the getter, consistent with the other tests in this class.
+                            List<PaimonRecord> filterRecords =
+                                    result.stream()
+                                            .filter(record -> record.getPkId() == 1)
+                                            .collect(Collectors.toList());
+                            Assertions.assertEquals(1, filterRecords.size());
+                            PaimonRecord paimonRecord = filterRecords.get(0);
+                            Assertions.assertEquals("A_1", paimonRecord.getName());
+                            Assertions.assertEquals("2024-03-20", paimonRecord.getDt());
+                        });
+    }
+
+    /**
+     * Verifies that {@code paimon.table.write-props.file.format = "parquet"} (case5) is applied
+     * to the created table and that the CDC rows are merged as expected.
+     */
+    @TestTemplate
+    public void testFakeCDCSinkPaimonWithParquet(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/fake_cdc_sink_paimon_case5.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace5", TARGET_TABLE);
+                            String fileFormat = table.options().get(CoreOptions.FILE_FORMAT.key());
+                            // Single value: isNotBlank instead of the varargs isNoneBlank.
+                            Assertions.assertTrue(StringUtils.isNotBlank(fileFormat));
+                            Assertions.assertEquals("parquet", fileFormat);
+                            List<PaimonRecord> paimonRecords =
+                                    loadPaimonData("seatunnel_namespace5", TARGET_TABLE);
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach(
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", paimonRecord.getName());
+                                        }
+                                    });
+                        });
+    }
- cleanPaimonTable(container);
+    /**
+     * Verifies that {@code paimon.table.write-props.file.format = "avro"} (case6) is applied to
+     * the created table and that the CDC rows are merged as expected.
+     */
+    @TestTemplate
+    public void testFakeCDCSinkPaimonWithAvro(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/fake_cdc_sink_paimon_case6.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace6", TARGET_TABLE);
+                            String fileFormat = table.options().get(CoreOptions.FILE_FORMAT.key());
+                            // Single value: isNotBlank instead of the varargs isNoneBlank.
+                            Assertions.assertTrue(StringUtils.isNotBlank(fileFormat));
+                            Assertions.assertEquals("avro", fileFormat);
+                            List<PaimonRecord> paimonRecords =
+                                    loadPaimonData("seatunnel_namespace6", TARGET_TABLE);
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach(
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", paimonRecord.getName());
+                                        }
+                                    });
+                        });
+    }
- protected final ContainerExtendedFactory cleanContainerExtendedFactory =
- genericContainer ->
- genericContainer.execInContainer("sh", "-c", "rm -rf " +
CATALOG_DIR + "**");
+    /**
+     * Verifies that the columnScale of the timestamp columns in case7 becomes the precision of
+     * the Paimon TIMESTAMP columns (0, 3, 6 and 9) and that values are read back at that
+     * precision.
+     */
+    @TestTemplate
+    public void testFakeCDCSinkPaimonWithTimestampN(TestContainer container)
+            throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/fake_cdc_sink_paimon_case7.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
-    private void cleanPaimonTable(TestContainer container)
-            throws IOException, InterruptedException {
-        // clean table
-        container.executeExtraCommands(cleanContainerExtendedFactory);
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            FileStoreTable table =
+                                    (FileStoreTable)
+                                            getTable("seatunnel_namespace7", TARGET_TABLE);
+                            List<DataField> fields = table.schema().fields();
+                            for (DataField field : fields) {
+                                if (field.name().equalsIgnoreCase("one_time")) {
+                                    Assertions.assertEquals(
+                                            0, ((TimestampType) field.type()).getPrecision());
+                                }
+                                if (field.name().equalsIgnoreCase("two_time")) {
+                                    Assertions.assertEquals(
+                                            3, ((TimestampType) field.type()).getPrecision());
+                                }
+                                if (field.name().equalsIgnoreCase("three_time")) {
+                                    Assertions.assertEquals(
+                                            6, ((TimestampType) field.type()).getPrecision());
+                                }
+                                if (field.name().equalsIgnoreCase("four_time")) {
+                                    Assertions.assertEquals(
+                                            9, ((TimestampType) field.type()).getPrecision());
+                                }
+                            }
+                            ReadBuilder readBuilder = table.newReadBuilder();
+                            TableScan.Plan plan = readBuilder.newScan().plan();
+                            TableRead tableRead = readBuilder.newRead();
+                            List<PaimonRecord> result = new ArrayList<>();
+                            try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+                                reader.forEachRemaining(
+                                        row ->
+                                                result.add(
+                                                        new PaimonRecord(
+                                                                row.getLong(0),
+                                                                row.getString(1).toString(),
+                                                                row.getTimestamp(2, 0),
+                                                                row.getTimestamp(3, 3),
+                                                                row.getTimestamp(4, 6),
+                                                                row.getTimestamp(5, 9))));
+                            }
+                            Assertions.assertEquals(2, result.size());
+                            for (PaimonRecord paimonRecord : result) {
+                                // JUnit's assertEquals takes (expected, actual).
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:12", paimonRecord.oneTime.toString());
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:00.123",
+                                        paimonRecord.twoTime.toString());
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:00.123456",
+                                        paimonRecord.threeTime.toString());
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:00.123456789",
+                                        paimonRecord.fourTime.toString());
+                            }
+                        });
}
protected final ContainerExtendedFactory containerExtendedFactory =
@@ -256,5 +463,34 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
public class PaimonRecord {
+        // One row read back from a Paimon table; each constructor sets only the columns its
+        // test reads, leaving the remaining fields null.
private Long pkId;
private String name;
+        // Partition column; only set by the (pkId, name, dt) constructor.
+        private String dt;
+        // Timestamp columns read by the timestamp precision test; only set by the
+        // six-argument constructor.
+        private Timestamp oneTime;
+        private Timestamp twoTime;
+        private Timestamp threeTime;
+        private Timestamp fourTime;
+
+        /** Record holding only the primary key and name. */
+        public PaimonRecord(Long pkId, String name) {
+            this.pkId = pkId;
+            this.name = name;
+        }
+
+        /** Record that additionally holds the {@code dt} partition value. */
+        public PaimonRecord(Long pkId, String name, String dt) {
+            this(pkId, name);
+            this.dt = dt;
+        }
+
+        /** Record that additionally holds the four timestamp columns. */
+        public PaimonRecord(
+                Long pkId,
+                String name,
+                Timestamp oneTime,
+                Timestamp twoTime,
+                Timestamp threeTime,
+                Timestamp fourTime) {
+            this(pkId, name);
+            this.oneTime = oneTime;
+            this.twoTime = twoTime;
+            this.threeTime = threeTime;
+            this.fourTime = fourTime;
+        }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
index 59e3a0cf72..50ce13aa68 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
@@ -80,7 +80,7 @@ source {
sink {
Paimon {
warehouse = "file:///tmp/paimon"
- database = "seatunnel_namespace"
+ database = "seatunnel_namespace1"
table = "st_test"
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
similarity index 92%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
index 59e3a0cf72..f5db1c8253 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
@@ -60,14 +60,14 @@ source {
{
kind = INSERT
fields = [3, "C", 100]
- }
+ },
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
- fields = [1, "A_1", 100]
+ fields = [1, "A_1", 19]
},
{
kind = DELETE
@@ -77,10 +77,17 @@ source {
}
}
+transform {
+
+}
+
sink {
Paimon {
warehouse = "file:///tmp/paimon"
- database = "seatunnel_namespace"
+ database = "seatunnel_namespace3"
table = "st_test"
+ paimon.table.write-props = {
+ bucket = 2
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
similarity index 72%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
index 59e3a0cf72..9a287a61b1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
@@ -29,58 +29,63 @@ source {
fields {
pk_id = bigint
name = string
- score = int
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
+ dt = string
}
}
rows = [
{
kind = INSERT
- fields = [1, "A", 100]
+ fields = [1, "A", "2024-03-19"]
},
{
kind = INSERT
- fields = [2, "B", 100]
+ fields = [2, "B", "2024-03-19"]
},
{
kind = INSERT
- fields = [3, "C", 100]
+ fields = [3, "C", "2024-03-19"]
},
{
kind = INSERT
- fields = [3, "C", 100]
+ fields = [3, "C", "2024-03-19"]
},
{
kind = INSERT
- fields = [3, "C", 100]
+ fields = [3, "C", "2024-03-19"]
},
{
kind = INSERT
- fields = [3, "C", 100]
- }
+ fields = [3, "C", "2024-03-19"]
+ },
{
kind = UPDATE_BEFORE
- fields = [1, "A", 100]
+ fields = [1, "A", "2024-03-19"]
},
{
kind = UPDATE_AFTER
- fields = [1, "A_1", 100]
+ fields = [1, "A_1", "2024-03-20"]
},
{
kind = DELETE
- fields = [2, "B", 100]
+ fields = [2, "B", "2024-03-19"]
}
]
}
}
+transform {
+
+}
+
sink {
Paimon {
warehouse = "file:///tmp/paimon"
- database = "seatunnel_namespace"
+ database = "seatunnel_namespace4"
table = "st_test"
+ paimon.table.write-props = {
+ bucket = 2
+ }
+ paimon.table.partition-keys = "dt"
+ paimon.table.primary-keys = "pk_id,dt"
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
index 59e3a0cf72..65df2115f4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
@@ -60,14 +60,14 @@ source {
{
kind = INSERT
fields = [3, "C", 100]
- }
+ },
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
- fields = [1, "A_1", 100]
+ fields = [1, "A_1", 19]
},
{
kind = DELETE
@@ -77,10 +77,17 @@ source {
}
}
+transform {
+
+}
+
sink {
Paimon {
warehouse = "file:///tmp/paimon"
- database = "seatunnel_namespace"
+ database = "seatunnel_namespace5"
table = "st_test"
+ paimon.table.write-props = {
+ file.format = "parquet"
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
index 59e3a0cf72..102747ef0f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
@@ -60,14 +60,14 @@ source {
{
kind = INSERT
fields = [3, "C", 100]
- }
+ },
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
- fields = [1, "A_1", 100]
+ fields = [1, "A_1", 19]
},
{
kind = DELETE
@@ -77,10 +77,17 @@ source {
}
}
+transform {
+
+}
+
sink {
Paimon {
warehouse = "file:///tmp/paimon"
- database = "seatunnel_namespace"
+ database = "seatunnel_namespace6"
table = "st_test"
+ paimon.table.write-props = {
+ file.format = "avro"
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
new file mode 100644
index 0000000000..6578c72358
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ columns = [
+ {
+ name = pk_id
+ type = bigint
+ nullable = false
+ comment = "primary key id"
+ },
+ {
+ name = name
+ type = "string"
+ nullable = true
+ comment = "name"
+ },
+ {
+ name = one_time
+ type = timestamp
+ nullable = false
+ comment = "one time"
+ columnScale = 0
+ },
+ {
+ name = two_time
+ type = timestamp
+ nullable = false
+ comment = "two time"
+ columnScale = 3
+ },
+ {
+ name = three_time
+ type = timestamp
+ nullable = false
+ comment = "three time"
+ columnScale = 6
+ },
+ {
+ name = four_time
+ type = timestamp
+ nullable = false
+ comment = "four time"
+ columnScale = 9
+ }
+ ]
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ }
+ ]
+ }
+}
+
+transform {
+
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace7"
+ table = "st_test"
+ }
+}