This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2b1234c7ae [Feature][Paimon] Support specify paimon table write 
properties, partition keys and primary keys (#6535)
2b1234c7ae is described below

commit 2b1234c7ae885bb26540d3e98d75c623db5d556d
Author: dailai <[email protected]>
AuthorDate: Wed Mar 27 10:23:50 2024 +0800

    [Feature][Paimon] Support specify paimon table write properties, partition 
keys and primary keys (#6535)
---
 docs/en/concept/schema-feature.md                  |   1 +
 docs/en/connector-v2/sink/Paimon.md                |  53 ++++-
 docs/zh/concept/schema-feature.md                  |   1 +
 docs/zh/connector-v2/sink/Paimon.md                |  53 ++++-
 .../table/catalog/schema/ReadonlyConfigParser.java |   8 +-
 .../table/catalog/schema/TableSchemaOptions.java   |   6 +
 .../seatunnel/paimon/catalog/PaimonCatalog.java    |  34 +--
 .../paimon/catalog/PaimonCatalogFactory.java       |   7 +-
 .../seatunnel/paimon/config/PaimonSinkConfig.java  |  49 +++-
 .../seatunnel/paimon/data/PaimonTypeMapper.java    |   5 +-
 .../seatunnel/paimon/sink/PaimonSinkFactory.java   |   5 +-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    |  17 +-
 .../seatunnel/paimon/utils/RowConverter.java       |  29 ++-
 .../seatunnel/paimon/utils/RowTypeConverter.java   |  81 ++++++-
 .../seatunnel/paimon/utils/SchemaUtil.java         |  37 ++-
 .../seatunnel/paimon/utils/RowConverterTest.java   |  57 ++++-
 .../paimon/utils/RowTypeConverterTest.java         |  57 ++++-
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      | 260 ++++++++++++++++++++-
 .../test/resources/fake_cdc_sink_paimon_case1.conf |   2 +-
 ..._case1.conf => fake_cdc_sink_paimon_case3.conf} |  13 +-
 ..._case1.conf => fake_cdc_sink_paimon_case4.conf} |  37 +--
 ..._case1.conf => fake_cdc_sink_paimon_case5.conf} |  13 +-
 ..._case1.conf => fake_cdc_sink_paimon_case6.conf} |  13 +-
 .../test/resources/fake_cdc_sink_paimon_case7.conf | 127 ++++++++++
 24 files changed, 840 insertions(+), 125 deletions(-)

diff --git a/docs/en/concept/schema-feature.md 
b/docs/en/concept/schema-feature.md
index 15f8186cce..9ae2c3d39e 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -64,6 +64,7 @@ columns = [
 | type         | Yes      | -             | The data type of the column        
                                              |
 | nullable     | No       | true          | If the column can be nullable      
                                              |
 | columnLength | No       | 0             | The length of the column which 
will be useful when you need to define the length |
+| columnScale  | No       | -             | The scale of the column which will 
be useful when you need to define the scale   |
 | defaultValue | No       | null          | The default value of the column    
                                              |
 | comment      | No       | null          | The comment of the column          
                                              |
 
diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 5e9d3c431f..707a0dc0db 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -12,14 +12,17 @@ Sink connector for Apache Paimon. It can support cdc mode 
、auto create table.
 
 ## Options
 
-|       name       |  type  | required |        default value         |        
   Description           |
-|------------------|--------|----------|------------------------------|---------------------------------|
-| warehouse        | String | Yes      | -                            | Paimon 
warehouse path           |
-| database         | String | Yes      | -                            | The 
database you want to access |
-| table            | String | Yes      | -                            | The 
table you want to access    |
-| hdfs_site_path   | String | No       | -                            |        
                         |
-| schema_save_mode | Enum   | no       | CREATE_SCHEMA_WHEN_NOT_EXIST | The 
schema save mode            |
-| data_save_mode   | Enum   | no       | APPEND_DATA                  | The 
data save mode              |
+|            name             |  type  | required |        default value       
  |                                                                           
Description                                                                     
       |
+|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| warehouse                   | String | Yes      | -                          
  | Paimon warehouse path                                                       
                                                                                
     |
+| database                    | String | Yes      | -                          
  | The database you want to access                                             
                                                                                
     |
+| table                       | String | Yes      | -                          
  | The table you want to access                                                
                                                                                
     |
+| hdfs_site_path              | String | No       | -                          
  |                                                                             
                                                                                
     |
+| schema_save_mode            | Enum   | No       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode                             
                                                                                
                                |
+| data_save_mode              | Enum   | No       | APPEND_DATA                
  | The data save mode                                                          
                                                                                
     |
+| paimon.table.primary-keys   | String | No       | -                          
  | Default comma-separated list of columns (primary key) that identify a row 
in tables. (Note: the partition fields must be included in the primary key 
fields) |
+| paimon.table.partition-keys | String | No       | -                          
  | Default comma-separated list of partition fields to use when creating 
tables.                                                                         
           |
+| paimon.table.write-props    | Map    | No       | -                          
  | Properties passed through to paimon table initialization, 
[reference](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions).
               |
 
 ## Examples
 
@@ -54,6 +57,40 @@ sink {
 }
 ```
 
+### Single table with write props of paimon
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
+    paimon.table.write-props = {
+        bucket = 2
+        file.format = "parquet"
+    }
+    paimon.table.partition-keys = "dt"
+    paimon.table.primary-keys = "pk_id,dt"
+  }
+}
+```
+
 ### Multiple table
 
 ```hocon
diff --git a/docs/zh/concept/schema-feature.md 
b/docs/zh/concept/schema-feature.md
index cc69b6d83e..adb4089298 100644
--- a/docs/zh/concept/schema-feature.md
+++ b/docs/zh/concept/schema-feature.md
@@ -64,6 +64,7 @@ columns = [
 | type         | Yes  | -    | 列的数据类型             |
 | nullable     | No   | true | 列是否可空              |
 | columnLength | No   | 0    | 列的长度,当您需要定义长度时将很有用 |
+| columnScale  | No   | -    | 列的精度,当您需要定义精度时将很有用 |
 | defaultValue | No   | null | 列的默认值              |
 | comment      | No   | null | 列的注释               |
 
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index b1b4baef9b..306bc12b56 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -12,14 +12,17 @@ Apache Paimon数据连接器。支持cdc写以及自动建表。
 
 ## 连接器选项
 
-|        名称        |   类型   | 是否必须 |             默认值              |         描述 
        |
-|------------------|--------|------|------------------------------|--------------------|
-| warehouse        | String | Yes  | -                            | Paimon 
warehouse路径 |
-| database         | String | Yes  | -                            | 数据库名称      
        |
-| table            | String | Yes  | -                            | 表名         
        |
-| hdfs_site_path   | String | No   | -                            |            
        |
-| schema_save_mode | Enum   | no   | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式 
        |
-| data_save_mode   | Enum   | no   | APPEND_DATA                  | 数据保存模式     
        |
+|             名称              | 类型  | 是否必须 |             默认值              |    
                                            描述                                  
               |
+|-----------------------------|-----|------|------------------------------|---------------------------------------------------------------------------------------------------|
+| warehouse                   | 字符串 | 是    | -                            | 
Paimon warehouse路径                                                              
                  |
+| database                    | 字符串 | 是    | -                            | 
数据库名称                                                                           
                  |
+| table                       | 字符串 | 是    | -                            | 表名 
                                                                                
               |
+| hdfs_site_path              | 字符串 | 否    | -                            |    
                                                                                
               |
+| schema_save_mode            | 枚举  | 否    | CREATE_SCHEMA_WHEN_NOT_EXIST | 
Schema保存模式                                                                      
                  |
+| data_save_mode              | 枚举  | 否    | APPEND_DATA                  | 
数据保存模式                                                                          
                  |
+| paimon.table.primary-keys   | 字符串 | 否    | -                            | 
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)                                            
                  |
+| paimon.table.partition-keys | 字符串 | 否    | -                            | 
分区字段列表,多字段使用逗号分隔                                                                
                  |
+| paimon.table.write-props    | Map | 否    | -                            | 
Paimon表初始化指定的属性, 
[参考](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions)
 |
 
 ## 示例
 
@@ -54,6 +57,40 @@ sink {
 }
 ```
 
+### 指定paimon的写属性的单表
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
+    paimon.table.write-props = {
+        bucket = 2
+        file.format = "parquet"
+    }
+    paimon.table.partition-keys = "dt"
+    paimon.table.primary-keys = "pk_id,dt"
+  }
+}
+```
+
 ### 多表
 
 ```hocon
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
index bac7f7b7a8..e043c0ecd7 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
@@ -133,6 +133,11 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
                                 Integer columnLength =
                                         columnConfig.get(
                                                 
TableSchemaOptions.ColumnOptions.COLUMN_LENGTH);
+
+                                Integer columnScale =
+                                        columnConfig.get(
+                                                
TableSchemaOptions.ColumnOptions.COLUMN_SCALE);
+
                                 Boolean nullable =
                                         
columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE);
                                 Object defaultValue =
@@ -143,7 +148,8 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
                                 return PhysicalColumn.of(
                                         name,
                                         seaTunnelDataType,
-                                        columnLength,
+                                        Long.valueOf(columnLength),
+                                        columnScale,
                                         nullable,
                                         defaultValue,
                                         comment);
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
index 492fe1909c..9ede187ea9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
@@ -86,6 +86,12 @@ public class TableSchemaOptions {
                         .noDefaultValue()
                         .withDescription("SeaTunnel Schema Column Type");
 
+        public static final Option<Integer> COLUMN_SCALE =
+                Options.key("columnScale")
+                        .intType()
+                        .noDefaultValue()
+                        .withDescription("SeaTunnel Schema Column scale");
+
         public static final Option<Integer> COLUMN_LENGTH =
                 Options.key("columnLength")
                         .intType()
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 7312ed28b0..8d3395af3c 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -17,10 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
 
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -28,11 +27,11 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
@@ -48,14 +47,14 @@ public class PaimonCatalog implements Catalog, PaimonTable {
     private static final String DEFAULT_DATABASE = "default";
 
     private String catalogName;
-    private ReadonlyConfig readonlyConfig;
+    private PaimonSinkConfig paimonSinkConfig;
     private PaimonCatalogLoader paimonCatalogLoader;
     private org.apache.paimon.catalog.Catalog catalog;
 
-    public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
-        this.readonlyConfig = readonlyConfig;
+    public PaimonCatalog(String catalogName, PaimonSinkConfig 
paimonSinkConfig) {
+        this.paimonSinkConfig = paimonSinkConfig;
         this.catalogName = catalogName;
-        this.paimonCatalogLoader = new PaimonCatalogLoader(new 
PaimonSinkConfig(readonlyConfig));
+        this.paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
     }
 
     @Override
@@ -135,10 +134,9 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
     public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         try {
-            catalog.createTable(
-                    toIdentifier(tablePath),
-                    SchemaUtil.toPaimonSchema(table.getTableSchema()),
-                    ignoreIfExists);
+            Schema paimonSchema =
+                    SchemaUtil.toPaimonSchema(table.getTableSchema(), 
this.paimonSinkConfig);
+            catalog.createTable(toIdentifier(tablePath), paimonSchema, 
ignoreIfExists);
         } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException 
e) {
             throw new TableAlreadyExistException(this.catalogName, tablePath);
         } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException 
e) {
@@ -183,18 +181,8 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
         TableSchema.Builder builder = TableSchema.builder();
         dataFields.forEach(
                 dataField -> {
-                    String name = dataField.name();
-                    SeaTunnelDataType<?> seaTunnelType =
-                            SchemaUtil.toSeaTunnelType(dataField.type());
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    name,
-                                    seaTunnelType,
-                                    (Long) null,
-                                    true,
-                                    null,
-                                    dataField.description());
-                    builder.column(physicalColumn);
+                    Column column = 
SchemaUtil.toSeaTunnelType(dataField.type());
+                    builder.column(column);
                 });
 
         List<String> partitionKeys = schema.partitionKeys();
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index 4d94f385d9..b8c8eb1088 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -30,7 +30,7 @@ import com.google.auto.service.AutoService;
 public class PaimonCatalogFactory implements CatalogFactory {
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig 
readonlyConfig) {
-        return new PaimonCatalog(catalogName, readonlyConfig);
+        return new PaimonCatalog(catalogName, new 
PaimonSinkConfig(readonlyConfig));
     }
 
     @Override
@@ -48,7 +48,10 @@ public class PaimonCatalogFactory implements CatalogFactory {
                 .optional(
                         PaimonSinkConfig.HDFS_SITE_PATH,
                         PaimonSinkConfig.SCHEMA_SAVE_MODE,
-                        PaimonSinkConfig.DATA_SAVE_MODE)
+                        PaimonSinkConfig.DATA_SAVE_MODE,
+                        PaimonSinkConfig.PRIMARY_KEYS,
+                        PaimonSinkConfig.PARTITION_KEYS,
+                        PaimonSinkConfig.WRITE_PROPS)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index 589fd94816..d369c74bca 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -17,17 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.config;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
-import lombok.Getter;
+import lombok.Data;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
-@Getter
+@Data
 public class PaimonSinkConfig extends PaimonConfig {
     public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
             Options.key("schema_save_mode")
@@ -41,6 +50,27 @@ public class PaimonSinkConfig extends PaimonConfig {
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription("data_save_mode");
 
+    public static final Option<String> PRIMARY_KEYS =
+            Options.key("paimon.table.primary-keys")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Default comma-separated list of columns that 
identify a row in tables (primary key)");
+
+    public static final Option<String> PARTITION_KEYS =
+            Options.key("paimon.table.partition-keys")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Default comma-separated list of partition fields 
to use when creating tables.");
+
+    public static final Option<Map<String, String>> WRITE_PROPS =
+            Options.key("paimon.table.write-props")
+                    .mapType()
+                    .defaultValue(new HashMap<>())
+                    .withDescription(
+                            "Properties passed through to paimon table 
initialization, such as 'file.format', 
'bucket'(org.apache.paimon.CoreOptions)");
+
     private String catalogName;
     private String warehouse;
     private String namespace;
@@ -48,6 +78,10 @@ public class PaimonSinkConfig extends PaimonConfig {
     private String hdfsSitePath;
     private SchemaSaveMode schemaSaveMode;
     private DataSaveMode dataSaveMode;
+    private Integer bucket;
+    private List<String> primaryKeys;
+    private List<String> partitionKeys;
+    private Map<String, String> writeProps;
 
     public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
         this.catalogName = 
checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
@@ -57,10 +91,21 @@ public class PaimonSinkConfig extends PaimonConfig {
         this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
         this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
         this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
+        this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
+        this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS), 
",");
+        this.writeProps = readonlyConfig.get(WRITE_PROPS);
     }
 
     protected <T> T checkArgumentNotNull(T argument) {
         checkNotNull(argument);
         return argument;
     }
+
+    @VisibleForTesting
+    public static List<String> stringToList(String value, String regex) {
+        if (value == null || value.isEmpty()) {
+            return ImmutableList.of();
+        }
+        return 
Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
index 1f8b1cff32..cbf512f61d 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.paimon.data;
 
 import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
@@ -40,11 +39,11 @@ public class PaimonTypeMapper implements 
TypeConverter<DataType> {
 
     @Override
     public Column convert(DataType dataType) {
-        return 
PhysicalColumn.builder().dataType(RowTypeConverter.convert(dataType)).build();
+        return RowTypeConverter.convert(dataType);
     }
 
     @Override
     public DataType reconvert(Column column) {
-        return RowTypeConverter.reconvert(column.getDataType());
+        return RowTypeConverter.reconvert(column);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index c0b4d997ea..2f5b316dd5 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -53,7 +53,10 @@ public class PaimonSinkFactory implements TableSinkFactory {
                 .optional(
                         PaimonConfig.HDFS_SITE_PATH,
                         PaimonSinkConfig.SCHEMA_SAVE_MODE,
-                        PaimonSinkConfig.DATA_SAVE_MODE)
+                        PaimonSinkConfig.DATA_SAVE_MODE,
+                        PaimonSinkConfig.PRIMARY_KEYS,
+                        PaimonSinkConfig.PARTITION_KEYS,
+                        PaimonSinkConfig.WRITE_PROPS)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 7b2e8327a9..a858c3ee7f 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -30,6 +30,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
@@ -74,6 +76,8 @@ public class PaimonSinkWriter
 
     private final JobContext jobContext;
 
+    private TableSchema tableSchema;
+
     public PaimonSinkWriter(
             Context context,
             Table table,
@@ -88,6 +92,7 @@ public class PaimonSinkWriter
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
         this.jobContext = jobContext;
+        this.tableSchema = ((FileStoreTable) table).schema();
     }
 
     public PaimonSinkWriter(
@@ -96,15 +101,7 @@ public class PaimonSinkWriter
             SeaTunnelRowType seaTunnelRowType,
             List<PaimonSinkState> states,
             JobContext jobContext) {
-        this.table = table;
-        this.tableWriteBuilder =
-                JobContextUtil.isBatchJob(jobContext)
-                        ? this.table.newBatchWriteBuilder().withOverwrite()
-                        : this.table.newStreamWriteBuilder();
-        this.tableWrite = tableWriteBuilder.newWrite();
-        this.seaTunnelRowType = seaTunnelRowType;
-        this.context = context;
-        this.jobContext = jobContext;
+        this(context, table, seaTunnelRowType, jobContext);
         if (Objects.isNull(states) || states.isEmpty()) {
             return;
         }
@@ -132,7 +129,7 @@ public class PaimonSinkWriter
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
-        InternalRow rowData = RowConverter.convert(element, seaTunnelRowType);
+        InternalRow rowData = RowConverter.reconvert(element, 
seaTunnelRowType, tableSchema);
         try {
             tableWrite.write(rowData);
         } catch (Exception e) {
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 6b9a6bf01c..fe1c24da80 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -41,15 +41,19 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.serializer.InternalArraySerializer;
 import org.apache.paimon.data.serializer.InternalMapSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** The converter for converting {@link InternalRow} and {@link SeaTunnelRow} 
*/
@@ -129,7 +133,7 @@ public class RowConverter {
      * @param dataType SeaTunnel array data type
      * @return Paimon array object {@link BinaryArray}
      */
-    public static BinaryArray convert(Object array, SeaTunnelDataType<?> 
dataType) {
+    public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> 
dataType) {
         int length = ((Object[]) array).length;
         BinaryArray binaryArray = new BinaryArray();
         BinaryArrayWriter binaryArrayWriter;
@@ -327,10 +331,12 @@ public class RowConverter {
      *
      * @param seaTunnelRow SeaTunnel row object
      * @param seaTunnelRowType SeaTunnel row type
+     * @param tableSchema Paimon table schema
      * @return Paimon row object
      */
-    public static InternalRow convert(
-            SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) {
+    public static InternalRow reconvert(
+            SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, 
TableSchema tableSchema) {
+        List<DataField> fields = tableSchema.fields();
         BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
         BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
         // Convert SeaTunnel RowKind to Paimon RowKind
@@ -390,8 +396,12 @@ public class RowConverter {
                             i, Timestamp.fromLocalDateTime(date.atTime(time)), 
3);
                     break;
                 case TIMESTAMP:
+                    String fieldName = seaTunnelRowType.getFieldName(i);
+                    DataField dataField = SchemaUtil.getDataField(fields, 
fieldName);
+                    int precision = ((TimestampType) 
dataField.type()).getPrecision();
                     LocalDateTime datetime = (LocalDateTime) 
seaTunnelRow.getField(i);
-                    binaryWriter.writeTimestamp(i, 
Timestamp.fromLocalDateTime(datetime), 9);
+                    binaryWriter.writeTimestamp(
+                            i, Timestamp.fromLocalDateTime(datetime), 
precision);
                     break;
                 case MAP:
                     MapType<?, ?> mapType = (MapType<?, ?>) 
seaTunnelRowType.getFieldType(i);
@@ -404,13 +414,14 @@ public class RowConverter {
                     Object[] values = field.values().toArray(new Object[0]);
                     binaryWriter.writeMap(
                             i,
-                            BinaryMap.valueOf(convert(keys, keyType), 
convert(values, valueType)),
+                            BinaryMap.valueOf(
+                                    reconvert(keys, keyType), 
reconvert(values, valueType)),
                             new InternalMapSerializer(paimonKeyType, 
paimonValueType));
                     break;
                 case ARRAY:
                     ArrayType<?, ?> arrayType = (ArrayType<?, ?>) 
seaTunnelRowType.getFieldType(i);
                     BinaryArray paimonArray =
-                            convert(seaTunnelRow.getField(i), 
arrayType.getElementType());
+                            reconvert(seaTunnelRow.getField(i), 
arrayType.getElementType());
                     binaryWriter.writeArray(
                             i,
                             paimonArray,
@@ -420,8 +431,10 @@ public class RowConverter {
                 case ROW:
                     SeaTunnelDataType<?> rowType = 
seaTunnelRowType.getFieldType(i);
                     Object row = seaTunnelRow.getField(i);
-                    InternalRow paimonRow = convert((SeaTunnelRow) row, 
(SeaTunnelRowType) rowType);
-                    RowType paimonRowType = 
RowTypeConverter.reconvert((SeaTunnelRowType) rowType);
+                    InternalRow paimonRow =
+                            reconvert((SeaTunnelRow) row, (SeaTunnelRowType) 
rowType, tableSchema);
+                    RowType paimonRowType =
+                            RowTypeConverter.reconvert((SeaTunnelRowType) 
rowType, tableSchema);
                     binaryWriter.writeRow(i, paimonRow, new 
InternalRowSerializer(paimonRowType));
                     break;
                 default:
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index 16863ebff5..5a4f5dbe8a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
 
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
@@ -25,6 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BinaryType;
@@ -33,6 +36,7 @@ import org.apache.paimon.types.CharType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DateType;
 import org.apache.paimon.types.DecimalType;
@@ -50,6 +54,8 @@ import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 
 import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
 
 /** The converter for converting {@link RowType} and {@link SeaTunnelRowType} 
*/
 public class RowTypeConverter {
@@ -77,16 +83,21 @@ public class RowTypeConverter {
      * @param dataType Paimon data type
      * @return SeaTunnel data type {@link SeaTunnelDataType}
      */
-    public static SeaTunnelDataType convert(DataType dataType) {
+    public static Column convert(DataType dataType) {
+        PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = 
PhysicalColumn.builder();
         SeaTunnelDataType<?> seaTunnelDataType;
         PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
                 PaimonToSeaTunnelTypeVisitor.INSTANCE;
         switch (dataType.getTypeRoot()) {
             case CHAR:
-                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((CharType) dataType);
+                CharType charType = (CharType) dataType;
+                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit(charType);
+                physicalColumnBuilder.columnLength((long) 
charType.getLength());
                 break;
             case VARCHAR:
-                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((VarCharType) dataType);
+                VarCharType varCharType = (VarCharType) dataType;
+                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit(varCharType);
+                physicalColumnBuilder.columnLength((long) 
varCharType.getLength());
                 break;
             case BOOLEAN:
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((BooleanType) dataType);
@@ -95,10 +106,15 @@ public class RowTypeConverter {
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((BinaryType) dataType);
                 break;
             case VARBINARY:
-                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((VarBinaryType) dataType);
+                VarBinaryType varBinaryType = (VarBinaryType) dataType;
+                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit(varBinaryType);
+                physicalColumnBuilder.columnLength((long) 
varBinaryType.getLength());
                 break;
             case DECIMAL:
-                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((DecimalType) dataType);
+                DecimalType decimalType = (DecimalType) dataType;
+                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit(decimalType);
+                physicalColumnBuilder.columnLength((long) 
decimalType.getPrecision());
+                physicalColumnBuilder.scale(decimalType.getScale());
                 break;
             case TINYINT:
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((TinyIntType) dataType);
@@ -122,14 +138,21 @@ public class RowTypeConverter {
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((DateType) dataType);
                 break;
             case TIME_WITHOUT_TIME_ZONE:
-                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((TimeType) dataType);
+                TimeType timeType = (TimeType) dataType;
+                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit(timeType);
+                physicalColumnBuilder.scale(timeType.getPrecision());
                 break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((TimestampType) dataType);
+                TimestampType timestampType = (TimestampType) dataType;
+                seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit(timestampType);
+                physicalColumnBuilder.scale(timestampType.getPrecision());
                 break;
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                LocalZonedTimestampType localZonedTimestampType =
+                        (LocalZonedTimestampType) dataType;
                 seaTunnelDataType =
                         
paimonToSeaTunnelTypeVisitor.visit((LocalZonedTimestampType) dataType);
+                
physicalColumnBuilder.scale(localZonedTimestampType.getPrecision());
                 break;
             case ARRAY:
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((ArrayType) dataType);
@@ -148,7 +171,7 @@ public class RowTypeConverter {
                 throw new PaimonConnectorException(
                         CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, 
errorMsg);
         }
-        return seaTunnelDataType;
+        return physicalColumnBuilder.dataType(seaTunnelDataType).build();
     }
 
     /**
@@ -157,20 +180,39 @@ public class RowTypeConverter {
      * @param seaTunnelRowType SeaTunnel row type {@link SeaTunnelRowType}
      * @return Paimon row type {@link RowType}
      */
-    public static RowType reconvert(SeaTunnelRowType seaTunnelRowType) {
+    public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, 
TableSchema tableSchema) {
+        // The visitor maps every TIMESTAMP to one fixed precision; the precision actually
+        // used is taken below from the same-named field of the Paimon table schema.
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        List<DataField> fields = tableSchema.fields();
         DataType[] dataTypes =
                 Arrays.stream(fieldTypes)
                         .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
                         .toArray(DataType[]::new);
         DataField[] dataFields = new DataField[dataTypes.length];
         for (int i = 0; i < dataTypes.length; i++) {
-            DataField dataField = new DataField(i, 
seaTunnelRowType.getFieldName(i), dataTypes[i]);
+            DataType dataType = dataTypes[i];
+            DataTypeRoot typeRoot = dataType.getTypeRoot();
+            String fieldName = seaTunnelRowType.getFieldName(i);
+            if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                    || 
typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
+                // NOTE(review): RowConverter also calls this method for nested ROW types while
+                // passing the top-level tableSchema, so a timestamp nested inside a row is
+                // looked up by name among the top-level fields - confirm this is intended.
+                DataField dataField = SchemaUtil.getDataField(fields, 
fieldName);
+                // NOTE(review): this cast throws ClassCastException when the table field is not
+                // a TimestampType (e.g. a local-zoned timestamp column) - confirm that cannot
+                // happen for tables this sink writes to.
+                dataType = new TimestampType(((TimestampType) 
dataField.type()).getPrecision());
+            }
+            // Field ids are the positional index within the SeaTunnel row type.
+            DataField dataField = new DataField(i, fieldName, dataType);
             dataFields[i] = dataField;
         }
         return DataTypes.ROW(dataFields);
     }
 
+    /**
+     * Converts a SeaTunnel {@link Column} into the matching Paimon {@link DataType}.
+     *
+     * <p>The conversion is delegated to the shared SeaTunnel-to-Paimon type visitor.
+     *
+     * @param column SeaTunnel column to convert
+     * @return Paimon data type produced by the visitor for the given column
+     */
+    public static DataType reconvert(Column column) {
+        SeaTunnelTypeToPaimonVisitor seaTunnelToPaimonVisitor =
+                SeaTunnelTypeToPaimonVisitor.INSTANCE;
+        return seaTunnelToPaimonVisitor.visit(column);
+    }
+
     /**
      * Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data 
type {@link DataType}
      *
@@ -192,6 +234,21 @@ public class RowTypeConverter {
 
         private SeaTunnelTypeToPaimonVisitor() {}
 
+        /**
+         * Maps a SeaTunnel {@link Column} to a Paimon {@link DataType}, using the column scale
+         * as the precision of TIME and TIMESTAMP types.
+         *
+         * @param column SeaTunnel column carrying the data type and an optional scale
+         * @return Paimon data type; every type other than TIME and TIMESTAMP is delegated to
+         *     {@link #visit(SeaTunnelDataType)}
+         */
+        public DataType visit(Column column) {
+            final SeaTunnelDataType<?> seaTunnelType = column.getDataType();
+            final Integer columnScale = column.getScale();
+            final boolean scaleMissing = Objects.isNull(columnScale);
+            switch (seaTunnelType.getSqlType()) {
+                case TIME:
+                    // Without a scale on the column, use Paimon's default TIME precision.
+                    int timePrecision = scaleMissing ? TimeType.DEFAULT_PRECISION : columnScale;
+                    return DataTypes.TIME(timePrecision);
+                case TIMESTAMP:
+                    // Without a scale on the column, use Paimon's default TIMESTAMP precision.
+                    int timestampPrecision =
+                            scaleMissing ? TimestampType.DEFAULT_PRECISION : columnScale;
+                    return DataTypes.TIMESTAMP(timestampPrecision);
+                default:
+                    return visit(seaTunnelType);
+            }
+        }
+
         public DataType visit(SeaTunnelDataType<?> dataType) {
             switch (dataType.getSqlType()) {
                 case TINYINT:
@@ -220,8 +277,10 @@ public class RowTypeConverter {
                     return DataTypes.BOOLEAN();
                 case DATE:
                     return DataTypes.DATE();
+                case TIME:
+                    return DataTypes.TIME(TimeType.MAX_PRECISION);
                 case TIMESTAMP:
-                    return DataTypes.TIMESTAMP(6);
+                    return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
                 case MAP:
                     SeaTunnelDataType<?> keyType =
                             ((org.apache.seatunnel.api.table.type.MapType<?, 
?>) dataType)
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index c03a77149c..65129dc8b7 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -18,14 +18,16 @@
 package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
 
 import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
 
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /** The util seatunnel schema to paimon schema */
@@ -35,20 +37,39 @@ public class SchemaUtil {
         return PaimonTypeMapper.INSTANCE.reconvert(column);
     }
 
-    public static Schema toPaimonSchema(TableSchema tableSchema) {
+    /**
+     * Builds a Paimon {@link Schema} from a SeaTunnel table schema and the sink configuration.
+     *
+     * <p>Every SeaTunnel column becomes a Paimon column. Primary keys configured on the sink
+     * take precedence; when that list is empty, the primary key of the SeaTunnel schema is
+     * used if one is present. Partition keys and write properties are applied only when they
+     * are non-empty.
+     *
+     * @param tableSchema SeaTunnel table schema providing the columns and an optional primary key
+     * @param paimonSinkConfig sink config providing primary keys, partition keys and write props
+     * @return the Paimon schema built from both inputs
+     */
+    public static Schema toPaimonSchema(
+            TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig) {
         Schema.Builder paiSchemaBuilder = Schema.newBuilder();
         for (int i = 0; i < tableSchema.getColumns().size(); i++) {
             Column column = tableSchema.getColumns().get(i);
             paiSchemaBuilder.column(column.getName(), toPaimonType(column));
         }
-        PrimaryKey primaryKey = tableSchema.getPrimaryKey();
-        if (Objects.nonNull(primaryKey) && primaryKey.getColumnNames().size() 
> 0) {
-            paiSchemaBuilder.primaryKey(primaryKey.getColumnNames());
+        // Sink-level primary keys win; fall back to the upstream schema's primary key.
+        List<String> primaryKeys = paimonSinkConfig.getPrimaryKeys();
+        if (primaryKeys.isEmpty() && 
Objects.nonNull(tableSchema.getPrimaryKey())) {
+            primaryKeys = tableSchema.getPrimaryKey().getColumnNames();
+        }
+        if (!primaryKeys.isEmpty()) {
+            paiSchemaBuilder.primaryKey(primaryKeys);
+        }
+        List<String> partitionKeys = paimonSinkConfig.getPartitionKeys();
+        if (!partitionKeys.isEmpty()) {
+            paiSchemaBuilder.partitionKeys(partitionKeys);
+        }
+        // Write properties are passed through as Paimon table options.
+        Map<String, String> writeProps = paimonSinkConfig.getWriteProps();
+        if (!writeProps.isEmpty()) {
+            paiSchemaBuilder.options(writeProps);
         }
         return paiSchemaBuilder.build();
     }
 
-    public static SeaTunnelDataType<?> toSeaTunnelType(DataType dataType) {
-        return PaimonTypeMapper.INSTANCE.convert(dataType).getDataType();
+    /**
+     * Converts a Paimon {@link DataType} to a SeaTunnel {@link Column} via
+     * {@link PaimonTypeMapper}.
+     *
+     * @param dataType Paimon data type to convert
+     * @return the column returned by the type mapper (the whole column, not only its data type)
+     */
+    public static Column toSeaTunnelType(DataType dataType) {
+        return PaimonTypeMapper.INSTANCE.convert(dataType);
+    }
+
+    /**
+     * Finds the Paimon field with the given name.
+     *
+     * <p>The list is scanned sequentially: this method is called from the per-row write path,
+     * and a parallel stream over a handful of fields only adds scheduling overhead.
+     *
+     * @param fields fields of the Paimon table schema
+     * @param fieldName name of the field to look up
+     * @return the first field whose name equals {@code fieldName}
+     * @throws java.util.NoSuchElementException if no field has that name
+     */
+    public static DataField getDataField(List<DataField> fields, String fieldName) {
+        return fields.stream()
+                .filter(field -> field.name().equals(fieldName))
+                .findFirst()
+                // Same exception type as Optional.get(), but naming the missing field.
+                .orElseThrow(
+                        () ->
+                                new java.util.NoSuchElementException(
+                                        "Field '"
+                                                + fieldName
+                                                + "' not found in Paimon table schema"));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index fcb9090a57..eec61aea6d 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -38,7 +38,10 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.serializer.InternalArraySerializer;
 import org.apache.paimon.data.serializer.InternalMapSerializer;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -48,7 +51,10 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Unit tests for {@link RowConverter} */
@@ -60,6 +66,45 @@ public class RowConverterTest {
 
     private SeaTunnelRowType seaTunnelRowType;
 
+    private TableSchema tableSchema;
+
+    public static final RowType DEFAULT_ROW_TYPE =
+            RowType.of(
+                    new DataType[] {
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.FLOAT(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.DECIMAL(10, 10),
+                        DataTypes.STRING(),
+                        DataTypes.BYTES(),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.DATE(),
+                        DataTypes.TIMESTAMP(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                        DataTypes.ARRAY(DataTypes.STRING())
+                    },
+                    new String[] {
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_string",
+                        "c_bytes",
+                        "c_boolean",
+                        "c_date",
+                        "c_timestamp",
+                        "c_map",
+                        "c_array"
+                    });
+
+    public static final List<String> KEY_NAME_LIST = 
Arrays.asList("c_tinyint");
+
     @BeforeEach
     public void before() {
         seaTunnelRowType =
@@ -171,11 +216,21 @@ public class RowConverterTest {
         binaryRowWriter.writeArray(
                 13, binaryArray2, new 
InternalArraySerializer(DataTypes.STRING()));
         internalRow = binaryRow;
+
+        tableSchema =
+                new TableSchema(
+                        0,
+                        TableSchema.newFields(DEFAULT_ROW_TYPE),
+                        DEFAULT_ROW_TYPE.getFieldCount(),
+                        Collections.EMPTY_LIST,
+                        KEY_NAME_LIST,
+                        Collections.EMPTY_MAP,
+                        "");
     }
 
     @Test
     public void seaTunnelToPaimon() {
-        InternalRow convert = RowConverter.convert(seaTunnelRow, 
seaTunnelRowType);
+        InternalRow convert = RowConverter.reconvert(seaTunnelRow, 
seaTunnelRowType, tableSchema);
         Assertions.assertEquals(convert, internalRow);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index f828be0650..5e614aeda5 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -26,7 +26,9 @@ import 
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
@@ -34,12 +36,55 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 public class RowTypeConverterTest {
 
     private SeaTunnelRowType seaTunnelRowType;
 
     private RowType rowType;
 
+    private TableSchema tableSchema;
+
+    public static final RowType DEFAULT_ROW_TYPE =
+            RowType.of(
+                    new DataType[] {
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.FLOAT(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.DECIMAL(10, 10),
+                        DataTypes.STRING(),
+                        DataTypes.BYTES(),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.DATE(),
+                        DataTypes.TIMESTAMP(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                        DataTypes.ARRAY(DataTypes.STRING())
+                    },
+                    new String[] {
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_string",
+                        "c_bytes",
+                        "c_boolean",
+                        "c_date",
+                        "c_timestamp",
+                        "c_map",
+                        "c_array"
+                    });
+
+    public static final List<String> KEY_NAME_LIST = 
Arrays.asList("c_tinyint");
+
     @BeforeEach
     public void before() {
         seaTunnelRowType =
@@ -93,6 +138,16 @@ public class RowTypeConverterTest {
                         new DataField(
                                 12, "c_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING())),
                         new DataField(13, "c_array", 
DataTypes.ARRAY(DataTypes.STRING())));
+
+        tableSchema =
+                new TableSchema(
+                        0,
+                        TableSchema.newFields(DEFAULT_ROW_TYPE),
+                        DEFAULT_ROW_TYPE.getFieldCount(),
+                        Collections.EMPTY_LIST,
+                        KEY_NAME_LIST,
+                        Collections.EMPTY_MAP,
+                        "");
     }
 
     @Test
@@ -103,7 +158,7 @@ public class RowTypeConverterTest {
 
     @Test
     public void seaTunnelToPaimon() {
-        RowType convert = RowTypeConverter.reconvert(seaTunnelRowType);
+        RowType convert = RowTypeConverter.reconvert(seaTunnelRowType, 
tableSchema);
         Assertions.assertEquals(convert, rowType);
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index a960f7d4d3..d2d88c1dbc 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -25,17 +25,23 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.TimestampType;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -52,6 +58,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.awaitility.Awaitility.given;
 
@@ -67,7 +74,6 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements 
TestResource {
     private static final String NAMESPACE_TAR = "paimon.tar.gz";
     private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + 
"/";
     private static final String TARGET_TABLE = "st_test";
-    private static final String TARGET_DATABASE = "seatunnel_namespace";
     private static final String FAKE_TABLE1 = "FakeTable1";
     private static final String FAKE_DATABASE1 = "FakeDatabase1";
     private static final String FAKE_TABLE2 = "FakeTable1";
@@ -95,7 +101,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
                             // copy paimon to local
                             
container.executeExtraCommands(containerExtendedFactory);
                             List<PaimonRecord> paimonRecords =
-                                    loadPaimonData(TARGET_DATABASE, 
TARGET_TABLE);
+                                    loadPaimonData("seatunnel_namespace1", 
TARGET_TABLE);
                             Assertions.assertEquals(2, paimonRecords.size());
                             paimonRecords.forEach(
                                     paimonRecord -> {
@@ -107,8 +113,6 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
                                         }
                                     });
                         });
-
-        cleanPaimonTable(container);
     }
 
     @TestTemplate
@@ -152,18 +156,221 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
                                         }
                                     });
                         });
+    }
+
+    @TestTemplate // Runs the case3 job and checks that the sink table carries the configured bucket option.
+    public void testFakeCDCSinkPaimonWithMultipleBucket(TestContainer 
container) throws Exception {
+        Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case3.conf");
+        Assertions.assertEquals(0, execResult.getExitCode()); // the job must succeed before the table is inspected
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted( // the lambda is re-evaluated until it passes or the 30s limit is hit
+                        () -> {
+                            // copy paimon to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace3", 
TARGET_TABLE);
+                            String bucket = 
table.options().get(CoreOptions.BUCKET.key());
+                            
Assertions.assertTrue(StringUtils.isNoneBlank(bucket));
+                            Assertions.assertEquals(2, 
Integer.valueOf(bucket)); // case3 sets paimon.table.write-props { bucket = 2 }
+                            List<PaimonRecord> paimonRecords =
+                                    loadPaimonData("seatunnel_namespace3", 
TARGET_TABLE);
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach( // NOTE(review): names are only checked for pk_id 1 and 3; other ids are not rejected
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", 
paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", 
paimonRecord.getName());
+                                        }
+                                    });
+                        });
+    }
+
+    @TestTemplate // Runs the case4 job and checks the sink's partition/primary keys plus the rows it wrote.
+    public void testFakeCDCSinkPaimonWithPartition(TestContainer container) 
throws Exception {
+        Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case4.conf");
+        Assertions.assertEquals(0, execResult.getExitCode()); // the job must succeed before the table is inspected
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted( // the lambda is re-evaluated until it passes or the 30s limit is hit
+                        () -> {
+                            // copy paimon to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace4", 
TARGET_TABLE);
+                            List<String> partitionKeys = table.partitionKeys();
+                            List<String> primaryKeys = table.primaryKeys();
+                            
Assertions.assertTrue(partitionKeys.contains("dt")); // case4: paimon.table.partition-keys = "dt"
+                            Assertions.assertEquals(2, primaryKeys.size()); // case4: paimon.table.primary-keys = "pk_id,dt"
+                            
Assertions.assertTrue(primaryKeys.contains("pk_id"));
+                            Assertions.assertTrue(primaryKeys.contains("dt"));
+                            ReadBuilder readBuilder = table.newReadBuilder();
+                            TableScan.Plan plan = readBuilder.newScan().plan();
+                            TableRead tableRead = readBuilder.newRead();
+                            List<PaimonRecord> result = new ArrayList<>();
+                            try (RecordReader<InternalRow> reader = 
tableRead.createReader(plan)) {
+                                reader.forEachRemaining(
+                                        row -> { // positions 0/1/2 are read as pk_id, name, dt (the case4 schema order)
+                                            result.add(
+                                                    new PaimonRecord(
+                                                            row.getLong(0),
+                                                            
row.getString(1).toString(),
+                                                            
row.getString(2).toString()));
+                                            log.info(
+                                                    "key_id:"
+                                                            + row.getLong(0)
+                                                            + ", name:"
+                                                            + row.getString(1)
+                                                            + ", dt:"
+                                                            + 
row.getString(2));
+                                        });
+                            }
+                            Assertions.assertEquals(2, result.size());
+                            List<PaimonRecord> filterRecords =
+                                    result.stream()
+                                            .filter(record -> record.pkId == 1) // Long pkId is unboxed for the comparison
+                                            .collect(Collectors.toList());
+                            Assertions.assertEquals(1, filterRecords.size());
+                            PaimonRecord paimonRecord = filterRecords.get(0);
+                            Assertions.assertEquals("A_1", 
paimonRecord.getName());
+                            Assertions.assertEquals("2024-03-20", 
paimonRecord.getDt()); // dt carried by the UPDATE_AFTER row in case4
+                        });
+    }
+
+    @TestTemplate // Runs the case5 job and checks that the sink table carries file.format = parquet.
+    public void testFakeCDCSinkPaimonWithParquet(TestContainer container) 
throws Exception {
+        Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case5.conf");
+        Assertions.assertEquals(0, execResult.getExitCode()); // the job must succeed before the table is inspected
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted( // the lambda is re-evaluated until it passes or the 30s limit is hit
+                        () -> {
+                            // copy paimon to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace5", 
TARGET_TABLE);
+                            String fileFormat = 
table.options().get(CoreOptions.FILE_FORMAT.key());
+                            
Assertions.assertTrue(StringUtils.isNoneBlank(fileFormat));
+                            Assertions.assertEquals("parquet", fileFormat); // case5 sets paimon.table.write-props { file.format = "parquet" }
+                            List<PaimonRecord> paimonRecords =
+                                    loadPaimonData("seatunnel_namespace5", 
TARGET_TABLE);
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach( // NOTE(review): names are only checked for pk_id 1 and 3; other ids are not rejected
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", 
paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", 
paimonRecord.getName());
+                                        }
+                                    });
+                        });
+    }
 
-        cleanPaimonTable(container);
+    @TestTemplate // Runs the case6 job and checks that the sink table carries file.format = avro.
+    public void testFakeCDCSinkPaimonWithAvro(TestContainer container) throws 
Exception {
+        Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case6.conf");
+        Assertions.assertEquals(0, execResult.getExitCode()); // the job must succeed before the table is inspected
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted( // the lambda is re-evaluated until it passes or the 30s limit is hit
+                        () -> {
+                            // copy paimon to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Table table = getTable("seatunnel_namespace6", 
TARGET_TABLE);
+                            String fileFormat = 
table.options().get(CoreOptions.FILE_FORMAT.key());
+                            
Assertions.assertTrue(StringUtils.isNoneBlank(fileFormat));
+                            Assertions.assertEquals("avro", fileFormat); // case6 sets paimon.table.write-props { file.format = "avro" }
+                            List<PaimonRecord> paimonRecords =
+                                    loadPaimonData("seatunnel_namespace6", 
TARGET_TABLE);
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach( // NOTE(review): names are only checked for pk_id 1 and 3; other ids are not rejected
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", 
paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", 
paimonRecord.getName());
+                                        }
+                                    });
+                        });
     }
 
-    protected final ContainerExtendedFactory cleanContainerExtendedFactory =
-            genericContainer ->
-                    genericContainer.execInContainer("sh", "-c", "rm -rf  " + 
CATALOG_DIR + "**");
+    @TestTemplate // Runs the case7 job and checks each timestamp column keeps its declared precision (0/3/6/9).
+    public void testFakeCDCSinkPaimonWithTimestampN(TestContainer container) 
throws Exception {
+        Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case7.conf");
+        Assertions.assertEquals(0, execResult.getExitCode()); // the job must succeed before the table is inspected
 
-    private void cleanPaimonTable(TestContainer container)
-            throws IOException, InterruptedException {
-        // clean table
-        container.executeExtraCommands(cleanContainerExtendedFactory);
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted( // the lambda is re-evaluated until it passes or the 30s limit is hit
+                        () -> {
+                            // copy paimon to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            FileStoreTable table =
+                                    (FileStoreTable) 
getTable("seatunnel_namespace7", TARGET_TABLE);
+                            List<DataField> fields = table.schema().fields();
+                            for (DataField field : fields) { // only columns present in the schema are checked; a missing one is not reported
+                                if (field.name().equalsIgnoreCase("one_time")) 
{
+                                    Assertions.assertEquals(
+                                            0, ((TimestampType) 
field.type()).getPrecision());
+                                }
+                                if (field.name().equalsIgnoreCase("two_time")) 
{
+                                    Assertions.assertEquals(
+                                            3, ((TimestampType) 
field.type()).getPrecision());
+                                }
+                                if 
(field.name().equalsIgnoreCase("three_time")) {
+                                    Assertions.assertEquals(
+                                            6, ((TimestampType) 
field.type()).getPrecision());
+                                }
+                                if 
(field.name().equalsIgnoreCase("four_time")) {
+                                    Assertions.assertEquals(
+                                            9, ((TimestampType) 
field.type()).getPrecision());
+                                }
+                            }
+                            ReadBuilder readBuilder = table.newReadBuilder();
+                            TableScan.Plan plan = readBuilder.newScan().plan();
+                            TableRead tableRead = readBuilder.newRead();
+                            List<PaimonRecord> result = new ArrayList<>();
+                            try (RecordReader<InternalRow> reader = 
tableRead.createReader(plan)) {
+                                reader.forEachRemaining(
+                                        row -> // each timestamp is read with the precision asserted for its column above
+                                                result.add(
+                                                        new PaimonRecord(
+                                                                row.getLong(0),
+                                                                
row.getString(1).toString(),
+                                                                
row.getTimestamp(2, 0),
+                                                                
row.getTimestamp(3, 3),
+                                                                
row.getTimestamp(4, 6),
+                                                                
row.getTimestamp(5, 9))));
+                            }
+                            Assertions.assertEquals(2, result.size());
+                            for (PaimonRecord paimonRecord : result) { // every case7 row carries the same four timestamp values
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:12", 
paimonRecord.oneTime.toString());
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:00.123", 
paimonRecord.twoTime.toString());
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:00.123456",
+                                        paimonRecord.threeTime.toString());
+                                Assertions.assertEquals(
+                                        "2024-03-10T10:00:00.123456789",
+                                        paimonRecord.fourTime.toString());
+                            }
+                        });
     }
 
     protected final ContainerExtendedFactory containerExtendedFactory =
@@ -256,5 +463,34 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
     public class PaimonRecord {
         private Long pkId;
         private String name;
+        private String dt;
+        private Timestamp oneTime;
+        private Timestamp twoTime;
+        private Timestamp threeTime;
+        private Timestamp fourTime;
+
+        public PaimonRecord(Long pkId, String name) {
+            this.pkId = pkId;
+            this.name = name;
+        }
+
+        public PaimonRecord(Long pkId, String name, String dt) {
+            this(pkId, name);
+            this.dt = dt;
+        }
+
+        public PaimonRecord(
+                Long pkId,
+                String name,
+                Timestamp oneTime,
+                Timestamp twoTime,
+                Timestamp threeTime,
+                Timestamp fourTime) {
+            this(pkId, name);
+            this.oneTime = oneTime;
+            this.twoTime = twoTime;
+            this.threeTime = threeTime;
+            this.fourTime = fourTime;
+        }
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
index 59e3a0cf72..50ce13aa68 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
@@ -80,7 +80,7 @@ source {
 sink {
   Paimon {
     warehouse = "file:///tmp/paimon"
-    database = "seatunnel_namespace"
+    database = "seatunnel_namespace1"
     table = "st_test"
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
similarity index 92%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
index 59e3a0cf72..f5db1c8253 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
@@ -60,14 +60,14 @@ source {
       {
         kind = INSERT
         fields = [3, "C", 100]
-      }
+      },
       {
         kind = UPDATE_BEFORE
         fields = [1, "A", 100]
       },
       {
         kind = UPDATE_AFTER
-        fields = [1, "A_1", 100]
+        fields = [1, "A_1", 19]
       },
       {
         kind = DELETE
@@ -77,10 +77,17 @@ source {
   }
 }
 
+transform {
+
+}
+
 sink {
   Paimon {
     warehouse = "file:///tmp/paimon"
-    database = "seatunnel_namespace"
+    database = "seatunnel_namespace3"
     table = "st_test"
+    paimon.table.write-props = {
+        bucket = 2
+    }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
similarity index 72%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
index 59e3a0cf72..9a287a61b1 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
@@ -29,58 +29,63 @@ source {
       fields {
         pk_id = bigint
         name = string
-        score = int
-      }
-      primaryKey {
-        name = "pk_id"
-        columnNames = [pk_id]
+        dt = string
       }
     }
     rows = [
       {
         kind = INSERT
-        fields = [1, "A", 100]
+        fields = [1, "A", "2024-03-19"]
       },
       {
         kind = INSERT
-        fields = [2, "B", 100]
+        fields = [2, "B", "2024-03-19"]
       },
       {
         kind = INSERT
-        fields = [3, "C", 100]
+        fields = [3, "C", "2024-03-19"]
       },
       {
         kind = INSERT
-        fields = [3, "C", 100]
+        fields = [3, "C", "2024-03-19"]
       },
       {
         kind = INSERT
-        fields = [3, "C", 100]
+        fields = [3, "C", "2024-03-19"]
       },
       {
         kind = INSERT
-        fields = [3, "C", 100]
-      }
+        fields = [3, "C", "2024-03-19"]
+      },
       {
         kind = UPDATE_BEFORE
-        fields = [1, "A", 100]
+        fields = [1, "A", "2024-03-19"]
       },
       {
         kind = UPDATE_AFTER
-        fields = [1, "A_1", 100]
+        fields = [1, "A_1", "2024-03-20"]
       },
       {
         kind = DELETE
-        fields = [2, "B", 100]
+        fields = [2, "B", "2024-03-19"]
       }
     ]
   }
 }
 
+transform {
+
+}
+
 sink {
   Paimon {
     warehouse = "file:///tmp/paimon"
-    database = "seatunnel_namespace"
+    database = "seatunnel_namespace4"
     table = "st_test"
+    paimon.table.write-props = {
+        bucket = 2
+    }
+    paimon.table.partition-keys = "dt"
+    paimon.table.primary-keys = "pk_id,dt"
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
index 59e3a0cf72..65df2115f4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
@@ -60,14 +60,14 @@ source {
       {
         kind = INSERT
         fields = [3, "C", 100]
-      }
+      },
       {
         kind = UPDATE_BEFORE
         fields = [1, "A", 100]
       },
       {
         kind = UPDATE_AFTER
-        fields = [1, "A_1", 100]
+        fields = [1, "A_1", 19]
       },
       {
         kind = DELETE
@@ -77,10 +77,17 @@ source {
   }
 }
 
+transform {
+
+}
+
 sink {
   Paimon {
     warehouse = "file:///tmp/paimon"
-    database = "seatunnel_namespace"
+    database = "seatunnel_namespace5"
     table = "st_test"
+    paimon.table.write-props = {
+        file.format = "parquet"
+    }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
index 59e3a0cf72..102747ef0f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
@@ -60,14 +60,14 @@ source {
       {
         kind = INSERT
         fields = [3, "C", 100]
-      }
+      },
       {
         kind = UPDATE_BEFORE
         fields = [1, "A", 100]
       },
       {
         kind = UPDATE_AFTER
-        fields = [1, "A_1", 100]
+        fields = [1, "A_1", 19]
       },
       {
         kind = DELETE
@@ -77,10 +77,17 @@ source {
   }
 }
 
+transform {
+
+}
+
 sink {
   Paimon {
     warehouse = "file:///tmp/paimon"
-    database = "seatunnel_namespace"
+    database = "seatunnel_namespace6"
     table = "st_test"
+    paimon.table.write-props = {
+        file.format = "avro"
+    }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
new file mode 100644
index 0000000000..6578c72358
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      columns = [
+         {
+            name = pk_id
+            type = bigint
+            nullable = false
+            comment = "primary key id"
+         },
+         {
+            name = name
+            type = "string"
+            nullable = true
+            comment = "name"
+         },
+         {
+            name = one_time
+            type = timestamp
+            nullable = false
+            comment = "one time"
+            columnScale = 0
+         },
+          {
+             name = two_time
+             type = timestamp
+             nullable = false
+             comment = "two time"
+             columnScale = 3
+          },
+         {
+            name = three_time
+            type = timestamp
+            nullable = false
+            comment = "three time"
+            columnScale = 6
+         },
+          {
+             name = four_time
+             type = timestamp
+             nullable = false
+             comment = "four time"
+             columnScale = 9
+          }
+      ]
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", 
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+      }
+    ]
+  }
+}
+
+transform {
+
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace7"
+    table = "st_test"
+  }
+}


Reply via email to