This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b0abbd2d89 [Feature][Connector-V2] Support multi-table sink feature 
for paimon #5652 (#6449)
b0abbd2d89 is described below

commit b0abbd2d89d07409c6c90f1b278e8cca65d1b9ed
Author: dailai <[email protected]>
AuthorDate: Tue Mar 19 10:48:32 2024 +0800

    [Feature][Connector-V2] Support multi-table sink feature for paimon #5652 
(#6449)
---
 docs/en/connector-v2/sink/Paimon.md                |  82 +++++--
 docs/zh/connector-v2/sink/Paimon.md                |  87 +++++++
 seatunnel-connectors-v2/connector-paimon/pom.xml   |   5 +
 .../seatunnel/paimon/catalog/PaimonCatalog.java    | 215 +++++++++++++++++
 .../PaimonCatalogFactory.java}                     |  26 ++-
 .../paimon/catalog/PaimonCatalogLoader.java        |  61 +++++
 .../seatunnel/paimon/catalog/PaimonTable.java      |  28 +++
 .../seatunnel/paimon/config/PaimonConfig.java      |   9 +-
 .../seatunnel/paimon/config/PaimonSinkConfig.java  |  66 ++++++
 .../seatunnel/paimon/data/PaimonTypeMapper.java    |  50 ++++
 .../paimon/handler/PaimonSaveModeHandler.java      |  58 +++++
 .../seatunnel/paimon/sink/PaimonSink.java          | 146 ++++++------
 .../seatunnel/paimon/sink/PaimonSinkFactory.java   |  68 +++++-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    |  57 ++++-
 ...aimonSinkFactory.java => SupportLoadTable.java} |  26 +--
 .../sink/commit/PaimonAggregatedCommitter.java     |  34 ++-
 .../seatunnel/paimon/utils/JobContextUtil.java     |  32 +++
 .../seatunnel/paimon/utils/RowConverter.java       |  12 +-
 .../seatunnel/paimon/utils/RowKindConverter.java   |  51 ++++
 .../seatunnel/paimon/utils/RowTypeConverter.java   |  85 ++++++-
 .../seatunnel/paimon/utils/SchemaUtil.java         |  54 +++++
 .../paimon/utils/RowTypeConverterTest.java         |   2 +-
 .../connector-paimon-e2e/pom.xml                   |   9 +
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      | 260 +++++++++++++++++++++
 .../test/resources/fake_cdc_sink_paimon_case1.conf |  86 +++++++
 .../test/resources/fake_cdc_sink_paimon_case2.conf | 142 +++++++++++
 .../seatunnel-hadoop3-3.1.4-uber/pom.xml           |   5 +
 27 files changed, 1596 insertions(+), 160 deletions(-)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 6fa721a1e6..5e9d3c431f 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -4,7 +4,7 @@
 
 ## Description
 
-Write data to Apache Paimon.
+Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.
 
 ## Key features
 
@@ -12,40 +12,76 @@ Write data to Apache Paimon.
 
 ## Options
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| warehouse      | String | Yes      | -             |
-| database       | String | Yes      | -             |
-| table          | String | Yes      | -             |
-| hdfs_site_path | String | No       | -             |
+|       name       |  type  | required |        default value         |        
   Description           |
+|------------------|--------|----------|------------------------------|---------------------------------|
+| warehouse        | String | Yes      | -                            | Paimon 
warehouse path           |
+| database         | String | Yes      | -                            | The 
database you want to access |
+| table            | String | Yes      | -                            | The 
table you want to access    |
+| hdfs_site_path   | String | No       | -                            |        
                         |
+| schema_save_mode | Enum   | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | The 
schema save mode            |
+| data_save_mode   | Enum   | No       | APPEND_DATA                  | The 
data save mode              |
 
-### warehouse [string]
-
-Paimon warehouse path
-
-### database [string]
+## Examples
 
-The database you want to access
+### Single table
 
-### table [String]
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
 
-The table you want to access
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
 
-## Examples
+transform {
+}
 
-```hocon
 sink {
   Paimon {
-    warehouse = "/tmp/paimon"
-    database = "default"
-    table = "st_test"
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
   }
 }
 ```
 
-## Changelog
+### Multiple tables
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+  }
+}
 
-### next version
+transform {
+}
 
-- Add Paimon Sink Connector
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="${database_name}"
+    table="${table_name}"
+  }
+}
+```
 
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
new file mode 100644
index 0000000000..b1b4baef9b
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -0,0 +1,87 @@
+# Paimon
+
+> Paimon 数据连接器
+
+## 描述
+
+Apache Paimon数据连接器。支持cdc写以及自动建表。
+
+## 主要特性
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+## 连接器选项
+
+|        名称        |   类型   | 是否必须 |             默认值              |         描述 
        |
+|------------------|--------|------|------------------------------|--------------------|
+| warehouse        | String | Yes  | -                            | Paimon 
warehouse路径 |
+| database         | String | Yes  | -                            | 数据库名称      
        |
+| table            | String | Yes  | -                            | 表名         
        |
+| hdfs_site_path   | String | No   | -                            |            
        |
+| schema_save_mode | Enum   | No   | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式 
        |
+| data_save_mode   | Enum   | No   | APPEND_DATA                  | 数据保存模式     
        |
+
+## 示例
+
+### 单表
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+transform {
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
+  }
+}
+```
+
+### 多表
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+  }
+}
+
+transform {
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="${database_name}"
+    table="${table_name}"
+  }
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml 
b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 8bcb1c3507..499165ea6f 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -34,6 +34,11 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.paimon</groupId>
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
new file mode 100644
index 0000000000..7312ed28b0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * SeaTunnel {@link Catalog} implementation that delegates to an Apache Paimon catalog.
+ *
+ * <p>The underlying Paimon catalog is created in {@link #open()} through a {@link
+ * PaimonCatalogLoader}; every other method forwards to it and translates Paimon's checked
+ * exceptions into the SeaTunnel catalog exceptions declared on the interface.
+ */
+@Slf4j
+public class PaimonCatalog implements Catalog, PaimonTable {
+    private static final String DEFAULT_DATABASE = "default";
+
+    private String catalogName;
+    private ReadonlyConfig readonlyConfig;
+    private PaimonCatalogLoader paimonCatalogLoader;
+    private org.apache.paimon.catalog.Catalog catalog;
+
+    /**
+     * @param catalogName name reported by {@link #name()} and used in translated exceptions
+     * @param readonlyConfig connector options; wrapped in a {@link PaimonSinkConfig} for the loader
+     */
+    public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogName = catalogName;
+        this.paimonCatalogLoader = new PaimonCatalogLoader(new PaimonSinkConfig(readonlyConfig));
+    }
+
+    /** Loads the underlying Paimon catalog. Must be called before any other catalog operation. */
+    @Override
+    public void open() throws CatalogException {
+        this.catalog = paimonCatalogLoader.loadCatalog();
+    }
+
+    /**
+     * Closes the underlying Paimon catalog when it implements {@link Closeable}.
+     *
+     * @throws CatalogException if closing the underlying catalog fails
+     */
+    @Override
+    public void close() throws CatalogException {
+        // instanceof is false for null, so no separate null check is needed.
+        if (catalog instanceof Closeable) {
+            try {
+                ((Closeable) catalog).close();
+            } catch (IOException e) {
+                log.error("Error while closing PaimonCatalog.", e);
+                throw new CatalogException(e);
+            }
+        }
+    }
+
+    @Override
+    public String name() {
+        return this.catalogName;
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return DEFAULT_DATABASE;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return catalog.databaseExists(databaseName);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return catalog.listDatabases();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        try {
+            return catalog.listTables(databaseName);
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        return catalog.tableExists(toIdentifier(tablePath));
+    }
+
+    /**
+     * Loads the Paimon table at {@code tablePath} and converts it to a SeaTunnel {@link
+     * CatalogTable}.
+     *
+     * @throws TableNotExistException if the table does not exist in the Paimon catalog
+     * @throws CatalogException if the table exists but cannot be loaded or converted (for example
+     *     it is not a {@link FileStoreTable})
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        try {
+            FileStoreTable paimonFileStoreTable = (FileStoreTable) getPaimonTable(tablePath);
+            return toCatalogTable(paimonFileStoreTable, tablePath);
+        } catch (TableNotExistException e) {
+            // Genuine "not found": propagate unchanged.
+            throw e;
+        } catch (Exception e) {
+            // Anything else is a real failure and must not be reported as "table not exist".
+            throw new CatalogException(
+                    String.format(
+                            "Failed to get table %s.%s from catalog %s",
+                            tablePath.getDatabaseName(), tablePath.getTableName(), catalogName),
+                    e);
+        }
+    }
+
+    /**
+     * Returns the native Paimon table for {@code tablePath}.
+     *
+     * @throws TableNotExistException if the Paimon catalog reports that the table does not exist
+     */
+    @Override
+    public Table getPaimonTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        try {
+            return catalog.getTable(toIdentifier(tablePath));
+        } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+            throw new TableNotExistException(this.catalogName, tablePath);
+        }
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        try {
+            catalog.createTable(
+                    toIdentifier(tablePath),
+                    SchemaUtil.toPaimonSchema(table.getTableSchema()),
+                    ignoreIfExists);
+        } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
+            throw new TableAlreadyExistException(this.catalogName, tablePath);
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
+            throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
+        }
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
+        } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+            throw new TableNotExistException(this.catalogName, tablePath);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        try {
+            catalog.createDatabase(tablePath.getDatabaseName(), ignoreIfExists);
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException e) {
+            throw new DatabaseAlreadyExistException(this.catalogName, tablePath.getDatabaseName());
+        }
+    }
+
+    /**
+     * Drops the database named by {@code tablePath}, cascading to its tables.
+     *
+     * @throws DatabaseNotExistException if the Paimon catalog reports the database is missing
+     * @throws CatalogException if the drop fails for any other reason
+     */
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        try {
+            catalog.dropDatabase(tablePath.getDatabaseName(), ignoreIfNotExists, true);
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
+            throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
+        } catch (Exception e) {
+            // Do not disguise unrelated failures as "database not exist".
+            throw new CatalogException(
+                    String.format(
+                            "Failed to drop database %s in catalog %s",
+                            tablePath.getDatabaseName(), catalogName),
+                    e);
+        }
+    }
+
+    /**
+     * Converts a Paimon table into a SeaTunnel {@link CatalogTable}: one nullable {@link
+     * PhysicalColumn} per Paimon field, plus the table's options and partition keys.
+     */
+    private CatalogTable toCatalogTable(
+            FileStoreTable paimonFileStoreTable, TablePath tablePath) {
+        org.apache.paimon.schema.TableSchema schema = paimonFileStoreTable.schema();
+        List<DataField> dataFields = schema.fields();
+        TableSchema.Builder builder = TableSchema.builder();
+        dataFields.forEach(
+                dataField -> {
+                    String name = dataField.name();
+                    SeaTunnelDataType<?> seaTunnelType =
+                            SchemaUtil.toSeaTunnelType(dataField.type());
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    name,
+                                    seaTunnelType,
+                                    (Long) null,
+                                    true,
+                                    null,
+                                    dataField.description());
+                    builder.column(physicalColumn);
+                });
+
+        List<String> partitionKeys = schema.partitionKeys();
+
+        return CatalogTable.of(
+                org.apache.seatunnel.api.table.catalog.TableIdentifier.of(
+                        catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
+                builder.build(),
+                paimonFileStoreTable.options(),
+                partitionKeys,
+                null,
+                catalogName);
+    }
+
+    /** Maps a SeaTunnel table path to a Paimon identifier (database + table name). */
+    private Identifier toIdentifier(TablePath tablePath) {
+        return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
similarity index 59%
copy from 
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
copy to 
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index dfae43c482..4d94f385d9 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -15,17 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
-public class PaimonSinkFactory implements TableSinkFactory {
+public class PaimonCatalogFactory implements CatalogFactory {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig 
readonlyConfig) {
+        return new PaimonCatalog(catalogName, readonlyConfig);
+    }
 
     @Override
     public String factoryIdentifier() {
@@ -35,10 +41,14 @@ public class PaimonSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(PaimonConfig.WAREHOUSE)
-                .required(PaimonConfig.DATABASE)
-                .required(PaimonConfig.TABLE)
-                .optional(PaimonConfig.HDFS_SITE_PATH)
+                .required(
+                        PaimonSinkConfig.WAREHOUSE,
+                        PaimonSinkConfig.DATABASE,
+                        PaimonSinkConfig.TABLE)
+                .optional(
+                        PaimonSinkConfig.HDFS_SITE_PATH,
+                        PaimonSinkConfig.SCHEMA_SAVE_MODE,
+                        PaimonSinkConfig.DATA_SAVE_MODE)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
new file mode 100644
index 0000000000..bec66dbe3f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
+
+/**
+ * Creates a Paimon {@link Catalog} from the warehouse path and the optional HDFS site file held
+ * by a {@link PaimonSinkConfig}.
+ */
+@Slf4j
+public class PaimonCatalogLoader implements Serializable {
+    private PaimonSinkConfig config;
+
+    public PaimonCatalogLoader(PaimonSinkConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Builds the catalog context (warehouse option plus Hadoop configuration) and asks Paimon's
+     * {@link CatalogFactory} to create the catalog.
+     *
+     * @return the Paimon catalog created for the configured warehouse
+     */
+    public Catalog loadCatalog() {
+        // When using the SeaTunnel engine, set the current class loader to prevent loading
+        // failures
+        Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
+
+        Map<String, String> catalogOptions = new HashMap<>();
+        catalogOptions.put(WAREHOUSE.key(), config.getWarehouse());
+
+        Configuration hadoopConf = new Configuration();
+        String hdfsSitePath = config.getHdfsSitePath();
+        // The HDFS site file is optional; only register it when a path was configured.
+        if (!StringUtils.isBlank(hdfsSitePath)) {
+            hadoopConf.addResource(new Path(hdfsSitePath));
+        }
+
+        return CatalogFactory.createCatalog(
+                CatalogContext.create(Options.fromMap(catalogOptions), hadoopConf));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
new file mode 100644
index 0000000000..55b18f79ab
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+
+import org.apache.paimon.table.Table;
+
+/** Gives access to the native Paimon {@link Table} behind a SeaTunnel {@link TablePath}. */
+public interface PaimonTable {
+    /**
+     * Looks up the Paimon table addressed by {@code tablePath}.
+     *
+     * @param tablePath SeaTunnel path of the table to look up
+     * @return the Paimon table for that path
+     * @throws CatalogException declared for catalog-level failures
+     * @throws TableNotExistException declared for the case where the table does not exist
+     */
+    Table getPaimonTable(TablePath tablePath) throws CatalogException, 
TableNotExistException;
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index b5299d8755..0396e6223a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -22,13 +22,14 @@ import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * Utility class to store configuration options, used by {@link 
SeaTunnelSource} and {@link
  * SeaTunnelSink}.
  */
-public class PaimonConfig {
+public class PaimonConfig implements Serializable {
 
     public static final Option<String> WAREHOUSE =
             Options.key("warehouse")
@@ -36,6 +37,12 @@ public class PaimonConfig {
                     .noDefaultValue()
                     .withDescription("The warehouse path of paimon");
 
+    public static final Option<String> CATALOG_NAME =
+            Options.key("catalog_name")
+                    .stringType()
+                    .defaultValue("paimon")
+                    .withDescription("The paimon catalog name");
+
     public static final Option<String> DATABASE =
             Options.key("database")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
new file mode 100644
index 0000000000..589fd94816
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+
+import lombok.Getter;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Sink-side view of the Paimon connector options: declares the save-mode options and captures
+ * the values read from a {@link ReadonlyConfig} at construction time.
+ */
+@Getter
+public class PaimonSinkConfig extends PaimonConfig {
+    /** How the sink treats the target schema; defaults to creating it when it is missing. */
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription("schema_save_mode");
+
+    /** How the sink treats existing data; defaults to appending. */
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .enumType(DataSaveMode.class)
+                    .defaultValue(DataSaveMode.APPEND_DATA)
+                    .withDescription("data_save_mode");
+
+    private String catalogName;
+    private String warehouse;
+    // Populated from the DATABASE option.
+    private String namespace;
+    private String table;
+    // Optional: read without a null check.
+    private String hdfsSitePath;
+    private SchemaSaveMode schemaSaveMode;
+    private DataSaveMode dataSaveMode;
+
+    /**
+     * Reads every option from {@code readonlyConfig}. Catalog name, warehouse, database and table
+     * must resolve to non-null values; the remaining options are taken as returned.
+     */
+    public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
+        catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
+        warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
+        namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
+        table = checkArgumentNotNull(readonlyConfig.get(TABLE));
+        hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
+        schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
+        dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
+    }
+
+    /** Returns {@code argument} unchanged after verifying that it is not null. */
+    protected <T> T checkArgumentNotNull(T argument) {
+        return checkNotNull(argument);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
new file mode 100644
index 0000000000..1f8b1cff32
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.data;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
+
+import org.apache.paimon.types.DataType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link TypeConverter} between Paimon {@link DataType}s and SeaTunnel {@link Column}s. Both
+ * directions delegate the actual type translation to {@link RowTypeConverter}.
+ */
+@Slf4j
+@AutoService(TypeConverter.class)
+public class PaimonTypeMapper implements TypeConverter<DataType> {
+    /** Eagerly created shared instance. */
+    public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();
+
+    /** @return the plugin name declared by {@link PaimonSink} */
+    @Override
+    public String identifier() {
+        return PaimonSink.PLUGIN_NAME;
+    }
+
+    /**
+     * Wraps the converted data type in a {@link PhysicalColumn}; only the data type of the
+     * column is populated.
+     */
+    @Override
+    public Column convert(DataType dataType) {
+        return PhysicalColumn.builder()
+                .dataType(RowTypeConverter.convert(dataType))
+                .build();
+    }
+
+    /** Translates the data type of the given column back into a Paimon {@link DataType}. */
+    @Override
+    public DataType reconvert(Column column) {
+        return RowTypeConverter.reconvert(column.getDataType());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
new file mode 100644
index 0000000000..b479ebf14b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.handler;
+
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
+
+import org.apache.paimon.table.Table;
+
+public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
+
+    private SupportLoadTable<Table> supportLoadTable;
+    private Catalog catalog;
+    private CatalogTable catalogTable;
+
+    public PaimonSaveModeHandler(
+            SupportLoadTable supportLoadTable,
+            SchemaSaveMode schemaSaveMode,
+            DataSaveMode dataSaveMode,
+            Catalog catalog,
+            CatalogTable catalogTable,
+            String customSql) {
+        super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
+        this.supportLoadTable = supportLoadTable;
+        this.catalog = catalog;
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public void handleSchemaSaveMode() {
+        super.handleSchemaSaveMode();
+        TablePath tablePath = catalogTable.getTablePath();
+        Table paimonTable = ((PaimonCatalog) 
catalog).getPaimonTable(tablePath);
+        // load paimon table and set it into paimon sink
+        this.supportLoadTable.setLoadTable(paimonTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index ac1a0b97ed..cdec4b0c76 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -17,54 +17,46 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
 
-@AutoService(SeaTunnelSink.class)
 public class PaimonSink
         implements SeaTunnelSink<
-                SeaTunnelRow, PaimonSinkState, PaimonCommitInfo, 
PaimonAggregatedCommitInfo> {
+                        SeaTunnelRow,
+                        PaimonSinkState,
+                        PaimonCommitInfo,
+                        PaimonAggregatedCommitInfo>,
+                SupportSaveMode,
+                SupportLoadTable<Table>,
+                SupportMultiTableSink {
 
     private static final long serialVersionUID = 1L;
 
@@ -72,79 +64,44 @@ public class PaimonSink
 
     private SeaTunnelRowType seaTunnelRowType;
 
-    private Config pluginConfig;
-
     private Table table;
 
-    @Override
-    public String getPluginName() {
-        return PLUGIN_NAME;
-    }
+    private JobContext jobContext;
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, WAREHOUSE.key(), DATABASE.key(), 
TABLE.key());
-        if (!result.isSuccess()) {
-            throw new PaimonConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        // initialize paimon table
-        final String warehouse = pluginConfig.getString(WAREHOUSE.key());
-        final String database = pluginConfig.getString(DATABASE.key());
-        final String table = pluginConfig.getString(TABLE.key());
-        final Map<String, String> optionsMap = new HashMap<>();
-        optionsMap.put(WAREHOUSE.key(), warehouse);
-        final Options options = Options.fromMap(optionsMap);
-        final Configuration hadoopConf = new Configuration();
-        if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) {
-            hadoopConf.addResource(new 
Path(pluginConfig.getString(HDFS_SITE_PATH.key())));
-        }
-        final CatalogContext catalogContext = CatalogContext.create(options, 
hadoopConf);
-        try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) {
-            Identifier identifier = Identifier.create(database, table);
-            this.table = catalog.getTable(identifier);
-        } catch (Exception e) {
-            String errorMsg =
-                    String.format(
-                            "Failed to get table [%s] from database [%s] on 
warehouse [%s]",
-                            database, table, warehouse);
-            throw new PaimonConnectorException(
-                    PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e);
-        }
-    }
+    private ReadonlyConfig readonlyConfig;
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    private PaimonSinkConfig paimonSinkConfig;
+
+    private CatalogTable catalogTable;
+
+    public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
+        this.readonlyConfig = readonlyConfig;
+        this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig);
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
+    public String getPluginName() {
+        return PLUGIN_NAME;
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
-        return new PaimonSinkWriter(context, table, seaTunnelRowType);
+        return new PaimonSinkWriter(context, table, seaTunnelRowType, 
jobContext);
     }
 
     @Override
     public Optional<SinkAggregatedCommitter<PaimonCommitInfo, 
PaimonAggregatedCommitInfo>>
             createAggregatedCommitter() throws IOException {
-        return Optional.of(new PaimonAggregatedCommitter(table));
+        return Optional.of(new PaimonAggregatedCommitter(table, jobContext));
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> 
restoreWriter(
             SinkWriter.Context context, List<PaimonSinkState> states) throws 
IOException {
-        return new PaimonSinkWriter(context, table, seaTunnelRowType, states);
+        return new PaimonSinkWriter(context, table, seaTunnelRowType, states, 
jobContext);
     }
 
     @Override
@@ -156,4 +113,43 @@ public class PaimonSink
     public Optional<Serializer<PaimonCommitInfo>> getCommitInfoSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
+
+    /**
+     * Stores the job context; it is later passed to the writers and the aggregated committer
+     * created by this sink.
+     *
+     * @param jobContext context of the job this sink belongs to
+     */
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+
+    /**
+     * Discovers the catalog factory registered as "Paimon", opens a catalog created from the
+     * sink options and wraps it in a {@link PaimonSaveModeHandler} that reports the loaded table
+     * back to this sink.
+     *
+     * @return handler configured with the schema and data save modes of this sink
+     * @throws PaimonConnectorException if the discovery returns no catalog factory
+     */
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        final org.apache.seatunnel.api.table.factory.CatalogFactory paimonCatalogFactory =
+                discoverFactory(
+                        contextClassLoader,
+                        org.apache.seatunnel.api.table.factory.CatalogFactory.class,
+                        "Paimon");
+        if (paimonCatalogFactory == null) {
+            final String detail =
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(),
+                            PluginType.SINK,
+                            "Cannot find paimon catalog factory");
+            throw new PaimonConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, detail);
+        }
+
+        final org.apache.seatunnel.api.table.catalog.Catalog paimonCatalog =
+                paimonCatalogFactory.createCatalog(
+                        paimonCatalogFactory.factoryIdentifier(), readonlyConfig);
+        // The catalog is handed to the handler already opened.
+        paimonCatalog.open();
+
+        final SaveModeHandler handler =
+                new PaimonSaveModeHandler(
+                        this,
+                        paimonSinkConfig.getSchemaSaveMode(),
+                        paimonSinkConfig.getDataSaveMode(),
+                        paimonCatalog,
+                        catalogTable,
+                        null);
+        return Optional.of(handler);
+    }
+
+    /**
+     * Stores the Paimon table that the writers and the aggregated committer of this sink are
+     * created with.
+     *
+     * @param table loaded Paimon table
+     */
+    @Override
+    public void setLoadTable(Table table) {
+        this.table = table;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index dfae43c482..c0b4d997ea 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -17,16 +17,30 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
 public class PaimonSinkFactory implements TableSinkFactory {
 
+    public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";
+
+    public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";
+
+    public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}";
+
     @Override
     public String factoryIdentifier() {
         return "Paimon";
@@ -35,10 +49,56 @@ public class PaimonSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(PaimonConfig.WAREHOUSE)
-                .required(PaimonConfig.DATABASE)
-                .required(PaimonConfig.TABLE)
-                .optional(PaimonConfig.HDFS_SITE_PATH)
+                .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, 
PaimonConfig.TABLE)
+                .optional(
+                        PaimonConfig.HDFS_SITE_PATH,
+                        PaimonSinkConfig.SCHEMA_SAVE_MODE,
+                        PaimonSinkConfig.DATA_SAVE_MODE)
                 .build();
     }
+
+    /**
+     * Creates the sink supplier for one upstream table. The catalog table is renamed according
+     * to the configured database/table options before it is handed to the sink.
+     *
+     * @param context factory context carrying the sink options and the upstream catalog table
+     * @return supplier creating a {@link PaimonSink} for the renamed catalog table
+     */
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        final ReadonlyConfig sinkOptions = context.getOptions();
+        final PaimonSinkConfig sinkConfig = new PaimonSinkConfig(sinkOptions);
+        final CatalogTable renamedTable =
+                renameCatalogTable(sinkConfig, context.getCatalogTable());
+        return () -> new PaimonSink(context.getOptions(), renamedTable);
+    }
+
+    /**
+     * Derives the identifier of the target table from the sink configuration.
+     *
+     * <p>Configured table and database names may contain the {@code ${table_name}}, {@code
+     * ${schema_name}} and {@code ${database_name}} placeholders, which are resolved against the
+     * upstream identifier. An empty setting falls back to the corresponding upstream name.
+     *
+     * @param paimonSinkConfig sink configuration providing the table and database settings
+     * @param catalogTable upstream catalog table
+     * @return copy of {@code catalogTable} carrying the target identifier
+     */
+    private CatalogTable renameCatalogTable(
+            PaimonSinkConfig paimonSinkConfig, CatalogTable catalogTable) {
+        TableIdentifier tableId = catalogTable.getTableId();
+        String configuredTable = paimonSinkConfig.getTable();
+        String configuredNamespace = paimonSinkConfig.getNamespace();
+
+        String tableName =
+                StringUtils.isNotEmpty(configuredTable)
+                        ? replaceName(configuredTable, tableId)
+                        : tableId.getTableName();
+
+        // The namespace fills the database slot of the new identifier, so the fallback has to
+        // be the upstream database name rather than the upstream schema name.
+        String namespace =
+                StringUtils.isNotEmpty(configuredNamespace)
+                        ? replaceName(configuredNamespace, tableId)
+                        : tableId.getDatabaseName();
+
+        TableIdentifier newTableId =
+                TableIdentifier.of(
+                        tableId.getCatalogName(), namespace, tableId.getSchemaName(), tableName);
+
+        return CatalogTable.of(newTableId, catalogTable);
+    }
+
+    private String replaceName(String original, TableIdentifier tableId) {
+        if (tableId.getTableName() != null) {
+            original = original.replace(REPLACE_TABLE_NAME_KEY, 
tableId.getTableName());
+        }
+        if (tableId.getSchemaName() != null) {
+            original = original.replace(REPLACE_SCHEMA_NAME_KEY, 
tableId.getSchemaName());
+        }
+        if (tableId.getDatabaseName() != null) {
+            original = original.replace(REPLACE_DATABASE_NAME_KEY, 
tableId.getDatabaseName());
+        }
+        return original;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 930f62045f..7b2e8327a9 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -17,21 +17,28 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.table.sink.WriteBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -46,13 +53,14 @@ import java.util.stream.Collectors;
 
 @Slf4j
 public class PaimonSinkWriter
-        implements SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> 
{
+        implements SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>,
+                SupportMultiTableSinkWriter<Void> {
 
     private String commitUser = UUID.randomUUID().toString();
 
-    private final BatchWriteBuilder tableWriteBuilder;
+    private final WriteBuilder tableWriteBuilder;
 
-    private final BatchTableWrite tableWrite;
+    private final TableWrite tableWrite;
 
     private long checkpointId = 0;
 
@@ -64,37 +72,58 @@ public class PaimonSinkWriter
 
     private final SinkWriter.Context context;
 
-    public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType 
seaTunnelRowType) {
+    private final JobContext jobContext;
+
+    public PaimonSinkWriter(
+            Context context,
+            Table table,
+            SeaTunnelRowType seaTunnelRowType,
+            JobContext jobContext) {
         this.table = table;
-        this.tableWriteBuilder = 
this.table.newBatchWriteBuilder().withOverwrite();
+        this.tableWriteBuilder =
+                JobContextUtil.isBatchJob(jobContext)
+                        ? this.table.newBatchWriteBuilder().withOverwrite()
+                        : this.table.newStreamWriteBuilder();
         this.tableWrite = tableWriteBuilder.newWrite();
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
+        this.jobContext = jobContext;
     }
 
     public PaimonSinkWriter(
             Context context,
             Table table,
             SeaTunnelRowType seaTunnelRowType,
-            List<PaimonSinkState> states) {
+            List<PaimonSinkState> states,
+            JobContext jobContext) {
         this.table = table;
-        this.tableWriteBuilder = 
this.table.newBatchWriteBuilder().withOverwrite();
+        this.tableWriteBuilder =
+                JobContextUtil.isBatchJob(jobContext)
+                        ? this.table.newBatchWriteBuilder().withOverwrite()
+                        : this.table.newStreamWriteBuilder();
         this.tableWrite = tableWriteBuilder.newWrite();
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
+        this.jobContext = jobContext;
         if (Objects.isNull(states) || states.isEmpty()) {
             return;
         }
         this.commitUser = states.get(0).getCommitUser();
         this.checkpointId = states.get(0).getCheckpointId();
-        try (BatchTableCommit tableCommit = tableWriteBuilder.newCommit()) {
+        try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
             List<CommitMessage> commitables =
                     states.stream()
                             .map(PaimonSinkState::getCommittables)
                             .flatMap(List::stream)
                             .collect(Collectors.toList());
             log.info("Trying to recommit states {}", commitables);
-            tableCommit.commit(commitables);
+            if (JobContextUtil.isBatchJob(jobContext)) {
+                log.debug("Trying to recommit states batch mode");
+                ((BatchTableCommit) tableCommit).commit(commitables);
+            } else {
+                log.debug("Trying to recommit states streaming mode");
+                ((StreamTableCommit) 
tableCommit).commit(Objects.hash(commitables), commitables);
+            }
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -117,7 +146,13 @@ public class PaimonSinkWriter
     @Override
     public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
         try {
-            List<CommitMessage> fileCommittables = tableWrite.prepareCommit();
+            List<CommitMessage> fileCommittables;
+            if (JobContextUtil.isBatchJob(jobContext)) {
+                fileCommittables = ((BatchTableWrite) 
tableWrite).prepareCommit();
+            } else {
+                fileCommittables =
+                        ((StreamTableWrite) tableWrite).prepareCommit(false, 
committables.size());
+            }
             committables.addAll(fileCommittables);
             return Optional.of(new PaimonCommitInfo(fileCommittables));
         } catch (Exception e) {
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
similarity index 52%
copy from 
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
copy to 
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
index dfae43c482..734762e23c 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
@@ -17,28 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class PaimonSinkFactory implements TableSinkFactory {
-
-    @Override
-    public String factoryIdentifier() {
-        return "Paimon";
-    }
-
-    @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder()
-                .required(PaimonConfig.WAREHOUSE)
-                .required(PaimonConfig.DATABASE)
-                .required(PaimonConfig.TABLE)
-                .optional(PaimonConfig.HDFS_SITE_PATH)
-                .build();
-    }
+/**
+ * Callback through which a loaded table is handed to the implementor.
+ *
+ * @param <T> type of the table being handed over
+ */
+public interface SupportLoadTable<T> {
+    /**
+     * Receives the loaded table.
+     *
+     * @param table table handed to the implementor
+     */
+    void setLoadTable(T table);
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 987d8fbb80..2c0be5d424 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -17,14 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.table.sink.WriteBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -32,35 +38,49 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /** Paimon connector aggregated committer class */
 @Slf4j
 public class PaimonAggregatedCommitter
-        implements SinkAggregatedCommitter<PaimonCommitInfo, 
PaimonAggregatedCommitInfo> {
+        implements SinkAggregatedCommitter<PaimonCommitInfo, 
PaimonAggregatedCommitInfo>,
+                SupportMultiTableSinkAggregatedCommitter {
 
     private static final long serialVersionUID = 1L;
 
     private final Lock.Factory localFactory = Lock.emptyFactory();
 
-    private final Table table;
+    private final WriteBuilder tableWriteBuilder;
 
-    public PaimonAggregatedCommitter(Table table) {
-        this.table = table;
+    private final JobContext jobContext;
+
+    public PaimonAggregatedCommitter(Table table, JobContext jobContext) {
+        this.jobContext = jobContext;
+        this.tableWriteBuilder =
+                JobContextUtil.isBatchJob(jobContext)
+                        ? table.newBatchWriteBuilder()
+                        : table.newStreamWriteBuilder();
     }
 
     @Override
     public List<PaimonAggregatedCommitInfo> commit(
             List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws 
IOException {
-        try (BatchTableCommit tableCommit =
-                table.newBatchWriteBuilder().withOverwrite().newCommit()) {
+        try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
             List<CommitMessage> fileCommittables =
                     aggregatedCommitInfo.stream()
                             .map(PaimonAggregatedCommitInfo::getCommittables)
                             .flatMap(List::stream)
                             .flatMap(List::stream)
                             .collect(Collectors.toList());
-            tableCommit.commit(fileCommittables);
+            if (JobContextUtil.isBatchJob(jobContext)) {
+                log.debug("Trying to commit states batch mode");
+                ((BatchTableCommit) tableCommit).commit(fileCommittables);
+            } else {
+                log.debug("Trying to commit states streaming mode");
+                ((StreamTableCommit) tableCommit)
+                        .commit(Objects.hash(fileCommittables), 
fileCommittables);
+            }
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED,
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
new file mode 100644
index 0000000000..3a4d9b72d4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import lombok.extern.slf4j.Slf4j;
+
+/** Utility methods for inspecting a SeaTunnel {@link JobContext}. */
+@Slf4j
+public class JobContextUtil {
+    /** Returns true only for {@link JobMode#BATCH}; a null job mode is treated as not batch. */
+    public static boolean isBatchJob(JobContext jobContext) {
+        return JobMode.BATCH.equals(jobContext.getJobMode());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 44f8fb2624..6b9a6bf01c 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -333,6 +333,10 @@ public class RowConverter {
             SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) {
         BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
         BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
+        // Convert SeaTunnel RowKind to Paimon RowKind
+        org.apache.paimon.types.RowKind rowKind =
+                
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
+        binaryRow.setRowKind(rowKind);
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
         for (int i = 0; i < fieldTypes.length; i++) {
             // judge the field is or not equals null
@@ -393,8 +397,8 @@ public class RowConverter {
                     MapType<?, ?> mapType = (MapType<?, ?>) 
seaTunnelRowType.getFieldType(i);
                     SeaTunnelDataType<?> keyType = mapType.getKeyType();
                     SeaTunnelDataType<?> valueType = mapType.getValueType();
-                    DataType paimonKeyType = RowTypeConverter.convert(keyType);
-                    DataType paimonValueType = 
RowTypeConverter.convert(valueType);
+                    DataType paimonKeyType = 
RowTypeConverter.reconvert(keyType);
+                    DataType paimonValueType = 
RowTypeConverter.reconvert(valueType);
                     Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
                     Object[] keys = field.keySet().toArray(new Object[0]);
                     Object[] values = field.values().toArray(new Object[0]);
@@ -411,13 +415,13 @@ public class RowConverter {
                             i,
                             paimonArray,
                             new InternalArraySerializer(
-                                    
RowTypeConverter.convert(arrayType.getElementType())));
+                                    
RowTypeConverter.reconvert(arrayType.getElementType())));
                     break;
                 case ROW:
                     SeaTunnelDataType<?> rowType = 
seaTunnelRowType.getFieldType(i);
                     Object row = seaTunnelRow.getField(i);
                     InternalRow paimonRow = convert((SeaTunnelRow) row, 
(SeaTunnelRowType) rowType);
-                    RowType paimonRowType = 
RowTypeConverter.convert((SeaTunnelRowType) rowType);
+                    RowType paimonRowType = 
RowTypeConverter.reconvert((SeaTunnelRowType) rowType);
                     binaryWriter.writeRow(i, paimonRow, new 
InternalRowSerializer(paimonRowType));
                     break;
                 default:
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
new file mode 100644
index 0000000000..ce6a172e43
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+
+import org.apache.paimon.data.InternalRow;
+
+public class RowKindConverter {
+
+    /**
+     * Translate a SeaTunnel {@link RowKind} into the identically named Paimon {@link
+     * org.apache.paimon.types.RowKind}.
+     *
+     * @param seaTunnelRowInd SeaTunnel row kind to translate
+     * @return Paimon row kind with the same name
+     * @throws PaimonConnectorException when the given kind matches none of the handled cases
+     */
+    public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowKind(
+            RowKind seaTunnelRowInd) {
+        final org.apache.paimon.types.RowKind paimonRowKind;
+        switch (seaTunnelRowInd) {
+            case INSERT:
+                paimonRowKind = org.apache.paimon.types.RowKind.INSERT;
+                break;
+            case UPDATE_BEFORE:
+                paimonRowKind = org.apache.paimon.types.RowKind.UPDATE_BEFORE;
+                break;
+            case UPDATE_AFTER:
+                paimonRowKind = org.apache.paimon.types.RowKind.UPDATE_AFTER;
+                break;
+            case DELETE:
+                paimonRowKind = org.apache.paimon.types.RowKind.DELETE;
+                break;
+            default:
+                // Reject anything without an explicit mapping instead of guessing a kind
+                throw new PaimonConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported rowKind type " + seaTunnelRowInd.shortString());
+        }
+        return paimonRowKind;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index 4dfd6b69fa..16863ebff5 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -43,6 +43,7 @@ import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
 import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarBinaryType;
@@ -70,13 +71,93 @@ public class RowTypeConverter {
         return new SeaTunnelRowType(fieldNames, dataTypes);
     }
 
+    /**
+     * Convert a single Paimon {@link DataType} to the matching {@link SeaTunnelDataType}.
+     *
+     * <p>The type root selects the concrete Paimon type class; the down-cast value is then handed
+     * to the shared PaimonToSeaTunnelTypeVisitor instance, which produces the SeaTunnel type.
+     *
+     * @param dataType Paimon data type
+     * @return SeaTunnel data type {@link SeaTunnelDataType}
+     * @throws PaimonConnectorException if the type root is not one of the cases handled here
+     */
+    public static SeaTunnelDataType convert(DataType dataType) {
+        final PaimonToSeaTunnelTypeVisitor visitor = PaimonToSeaTunnelTypeVisitor.INSTANCE;
+        // The explicit casts pick the visit(...) overload for each concrete Paimon type
+        switch (dataType.getTypeRoot()) {
+            case CHAR:
+                return visitor.visit((CharType) dataType);
+            case VARCHAR:
+                return visitor.visit((VarCharType) dataType);
+            case BOOLEAN:
+                return visitor.visit((BooleanType) dataType);
+            case BINARY:
+                return visitor.visit((BinaryType) dataType);
+            case VARBINARY:
+                return visitor.visit((VarBinaryType) dataType);
+            case DECIMAL:
+                return visitor.visit((DecimalType) dataType);
+            case TINYINT:
+                return visitor.visit((TinyIntType) dataType);
+            case SMALLINT:
+                return visitor.visit((SmallIntType) dataType);
+            case INTEGER:
+                return visitor.visit((IntType) dataType);
+            case BIGINT:
+                return visitor.visit((BigIntType) dataType);
+            case FLOAT:
+                return visitor.visit((FloatType) dataType);
+            case DOUBLE:
+                return visitor.visit((DoubleType) dataType);
+            case DATE:
+                return visitor.visit((DateType) dataType);
+            case TIME_WITHOUT_TIME_ZONE:
+                return visitor.visit((TimeType) dataType);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return visitor.visit((TimestampType) dataType);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return visitor.visit((LocalZonedTimestampType) dataType);
+            case ARRAY:
+                return visitor.visit((ArrayType) dataType);
+            case MAP:
+                return visitor.visit((MapType) dataType);
+            case ROW:
+                return visitor.visit((RowType) dataType);
+            default:
+                throw new PaimonConnectorException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "Paimon dataType not support this genericType [%s]",
+                                dataType.asSQLString()));
+        }
+    }
+
     /**
      * Convert SeaTunnel row type {@link SeaTunnelRowType} to Paimon row type 
{@link RowType}
      *
      * @param seaTunnelRowType SeaTunnel row type {@link SeaTunnelRowType}
      * @return Paimon row type {@link RowType}
      */
-    public static RowType convert(SeaTunnelRowType seaTunnelRowType) {
+    public static RowType reconvert(SeaTunnelRowType seaTunnelRowType) {
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
         DataType[] dataTypes =
                 Arrays.stream(fieldTypes)
@@ -96,7 +177,7 @@ public class RowTypeConverter {
      * @param dataType SeaTunnel data type {@link SeaTunnelDataType}
      * @return Paimon data type {@link DataType}
      */
-    public static DataType convert(SeaTunnelDataType<?> dataType) {
+    public static DataType reconvert(SeaTunnelDataType<?> dataType) {
         return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(dataType);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
new file mode 100644
index 0000000000..c03a77149c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
+
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
+
+import java.util.Objects;
+
+/** Helpers for translating between SeaTunnel schema objects and Paimon schema objects */
+public class SchemaUtil {
+
+    /** Map a SeaTunnel {@link Column} to a Paimon {@link DataType} via {@link PaimonTypeMapper}. */
+    public static DataType toPaimonType(Column column) {
+        return PaimonTypeMapper.INSTANCE.reconvert(column);
+    }
+
+    /**
+     * Build a Paimon {@link Schema} from a SeaTunnel {@link TableSchema}.
+     *
+     * <p>Columns are added in the order the table schema lists them. A primary key is declared
+     * only when the table schema carries one with at least one column name.
+     *
+     * @param tableSchema SeaTunnel table schema
+     * @return the Paimon schema built from it
+     */
+    public static Schema toPaimonSchema(TableSchema tableSchema) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (Column column : tableSchema.getColumns()) {
+            schemaBuilder.column(column.getName(), toPaimonType(column));
+        }
+        PrimaryKey pk = tableSchema.getPrimaryKey();
+        boolean hasPrimaryKey = Objects.nonNull(pk) && !pk.getColumnNames().isEmpty();
+        if (hasPrimaryKey) {
+            schemaBuilder.primaryKey(pk.getColumnNames());
+        }
+        return schemaBuilder.build();
+    }
+
+    /** Map a Paimon {@link DataType} to the data type of what {@link PaimonTypeMapper} converts it to. */
+    public static SeaTunnelDataType<?> toSeaTunnelType(DataType dataType) {
+        return PaimonTypeMapper.INSTANCE.convert(dataType).getDataType();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index f32b87f007..f828be0650 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -103,7 +103,7 @@ public class RowTypeConverterTest {
 
     @Test
     public void seaTunnelToPaimon() {
-        RowType convert = RowTypeConverter.convert(seaTunnelRowType);
+        RowType convert = RowTypeConverter.reconvert(seaTunnelRowType);
         Assertions.assertEquals(convert, rowType);
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
index 4af6e8436e..69ea9a9f74 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
@@ -30,16 +30,25 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-fake</artifactId>
             <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-paimon</artifactId>
             <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+            <classifier>optional</classifier>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
new file mode 100644
index 0000000000..a960f7d4d3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * End-to-end checks for the Paimon sink. Each test runs a job config in the test container, copies
+ * the {@code /tmp/paimon} directory from the container to the local file system as a tarball,
+ * extracts it, and reads the written rows back through a local Paimon catalog.
+ */
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason =
+                "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
+@Slf4j
+public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
+    private static final String CATALOG_ROOT_DIR = "/tmp/";
+    private static final String NAMESPACE = "paimon";
+    private static final String NAMESPACE_TAR = "paimon.tar.gz";
+    private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/";
+    private static final String TARGET_TABLE = "st_test";
+    private static final String TARGET_DATABASE = "seatunnel_namespace";
+    private static final String FAKE_TABLE1 = "FakeTable1";
+    private static final String FAKE_DATABASE1 = "FakeDatabase1";
+    // NOTE(review): same table name as FAKE_TABLE1; the multi-table test refers to
+    // "FakeDatabase2.FakeTable1", so this looks intentional - confirm against the case2 config
+    private static final String FAKE_TABLE2 = "FakeTable1";
+    private static final String FAKE_DATABASE2 = "FakeDatabase2";
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {}
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {}
+
+    /**
+     * Runs {@code fake_cdc_sink_paimon_case1.conf} and asserts that exactly two records are read
+     * back; a record with pk 1 must be named "A_1" and a record with pk 3 must be named "C".
+     */
+    @TestTemplate
+    public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
+        Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // Retry for up to 30 seconds; exceptions thrown while copying/reading are ignored
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            List<PaimonRecord> paimonRecords =
+                                    loadPaimonData(TARGET_DATABASE, TARGET_TABLE);
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach(
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", paimonRecord.getName());
+                                        }
+                                    });
+                        });
+
+        cleanPaimonTable(container);
+    }
+
+    /**
+     * Runs {@code fake_cdc_sink_paimon_case2.conf} and asserts that both target tables hold exactly
+     * two records each, checking the names of the records with pk 1 and 3 in the first table and
+     * pk 100 and 200 in the second.
+     */
+    @TestTemplate
+    public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception {
+        Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // Retry for up to 30 seconds; exceptions thrown while copying/reading are ignored
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            // Check FakeDatabase1.FakeTable1
+                            List<PaimonRecord> fake1PaimonRecords =
+                                    loadPaimonData(FAKE_DATABASE1, FAKE_TABLE1);
+                            Assertions.assertEquals(2, fake1PaimonRecords.size());
+                            fake1PaimonRecords.forEach(
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", paimonRecord.getName());
+                                        }
+                                    });
+                            // Check FakeDatabase2.FakeTable1
+                            List<PaimonRecord> fake2PaimonRecords =
+                                    loadPaimonData(FAKE_DATABASE2, FAKE_TABLE2);
+                            Assertions.assertEquals(2, fake2PaimonRecords.size());
+                            fake2PaimonRecords.forEach(
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 100) {
+                                            Assertions.assertEquals(
+                                                    "A_100", paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 200) {
+                                            Assertions.assertEquals("C", paimonRecord.getName());
+                                        }
+                                    });
+                        });
+
+        cleanPaimonTable(container);
+    }
+
+    /** Container command that removes everything under the catalog directory. */
+    protected final ContainerExtendedFactory cleanContainerExtendedFactory =
+            genericContainer ->
+                    genericContainer.execInContainer("sh", "-c", "rm -rf  " + CATALOG_DIR + "**");
+
+    /** Runs {@link #cleanContainerExtendedFactory} in the given test container. */
+    private void cleanPaimonTable(TestContainer container)
+            throws IOException, InterruptedException {
+        // clean table
+        container.executeExtraCommands(cleanContainerExtendedFactory);
+    }
+
+    /**
+     * Container command that tars the Paimon directory inside the container, copies the tarball
+     * to the same path on the local file system and extracts it there. The local tarball and
+     * catalog directory are reset first.
+     */
+    protected final ContainerExtendedFactory containerExtendedFactory =
+            container -> {
+                FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
+                FileUtils.createNewDir(CATALOG_DIR);
+                container.execInContainer(
+                        "sh",
+                        "-c",
+                        "cd "
+                                + CATALOG_ROOT_DIR
+                                + " && tar -czvf "
+                                + NAMESPACE_TAR
+                                + " "
+                                + NAMESPACE);
+                container.copyFileFromContainer(
+                        CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR + NAMESPACE_TAR);
+                extractFiles();
+            };
+
+    /**
+     * Extracts the copied tarball under the catalog root directory using a local {@code tar}
+     * process. Failures are logged rather than thrown, so the caller continues either way.
+     */
+    private void extractFiles() {
+        ProcessBuilder processBuilder = new ProcessBuilder();
+        processBuilder.command(
+                "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " + NAMESPACE_TAR);
+        try {
+            Process process = processBuilder.start();
+            // wait command completed
+            int exitCode = process.waitFor();
+            if (exitCode == 0) {
+                log.info("Extract files successful.");
+            } else {
+                log.error("Extract files failed with exit code " + exitCode);
+            }
+        } catch (IOException e) {
+            log.error("Extract files failed", e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can still observe the interruption
+            Thread.currentThread().interrupt();
+            log.error("Extract files interrupted", e);
+        }
+    }
+
+    /**
+     * Reads every row of the given table and maps field 0 (long) and field 1 (string) to a {@link
+     * PaimonRecord}, logging each row along the way.
+     *
+     * @param dbName database name of the table
+     * @param tbName table name
+     * @return the records read, in reader order
+     */
+    private List<PaimonRecord> loadPaimonData(String dbName, String tbName) throws Exception {
+        Table table = getTable(dbName, tbName);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        TableRead tableRead = readBuilder.newRead();
+        List<PaimonRecord> result = new ArrayList<>();
+        log.info(
+                "====================================Paimon data===========================================");
+        log.info(
+                "==========================================================================================");
+        log.info(
+                "==========================================================================================");
+        try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+            reader.forEachRemaining(
+                    row -> {
+                        result.add(new PaimonRecord(row.getLong(0), row.getString(1).toString()));
+                        log.info("key_id:" + row.getLong(0) + ", name:" + row.getString(1));
+                    });
+        }
+        log.info(
+                "==========================================================================================");
+        log.info(
+                "==========================================================================================");
+        log.info(
+                "==========================================================================================");
+        return result;
+    }
+
+    /**
+     * Looks the table up in a freshly created catalog.
+     *
+     * @throws RuntimeException wrapping the catalog's exception when the table does not exist
+     */
+    private Table getTable(String dbName, String tbName) {
+        try {
+            return getCatalog().getTable(getIdentifier(dbName, tbName));
+        } catch (Catalog.TableNotExistException e) {
+            // Keep the original exception as the cause so the failure details are not lost
+            throw new RuntimeException("table not exist", e);
+        }
+    }
+
+    /** Builds the Paimon identifier for the given database and table names. */
+    private Identifier getIdentifier(String dbName, String tbName) {
+        return Identifier.create(dbName, tbName);
+    }
+
+    /** Creates a Paimon catalog whose warehouse is the local copy of the catalog directory. */
+    private Catalog getCatalog() {
+        Options options = new Options();
+        options.set("warehouse", "file://" + CATALOG_DIR);
+        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
+        return catalog;
+    }
+
+    /** A row read back from Paimon: the first (long) field and the second (string) field. */
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public class PaimonRecord {
+        private Long pkId;
+        private String name;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
new file mode 100644
index 0000000000..59e3a0cf72
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates batch-mode CDC writes to a Paimon sink
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace"
+    table = "st_test"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
new file mode 100644
index 0000000000..ddc9226871
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates batch-mode multi-table CDC writes to Paimon
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+            table = "FakeDatabase1.FakeTable1"
+            fields {
+                pk_id = bigint
+                name = string
+                score = int
+            }
+           primaryKey {
+               name = "pk_id"
+               columnNames = [pk_id]
+           }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = [1, "A", 100]
+          },
+          {
+            kind = INSERT
+            fields = [2, "B", 100]
+          },
+          {
+            kind = INSERT
+            fields = [3, "C", 100]
+          },
+          {
+            kind = INSERT
+            fields = [3, "C", 100]
+          },
+          {
+            kind = INSERT
+            fields = [3, "C", 100]
+          },
+          {
+            kind = INSERT
+            fields = [3, "C", 100]
+          }
+          {
+            kind = UPDATE_BEFORE
+            fields = [1, "A", 100]
+          },
+          {
+            kind = UPDATE_AFTER
+            fields = [1, "A_1", 100]
+          },
+          {
+            kind = DELETE
+            fields = [2, "B", 100]
+          }
+        ]
+       },
+       {
+               schema = {
+                   table = "FakeDatabase2.FakeTable1"
+                   fields {
+                       pk_id = bigint
+                       name = string
+                   }
+                  primaryKey {
+                      name = "pk_id"
+                      columnNames = [pk_id]
+                  }
+               }
+               rows = [
+                 {
+                   kind = INSERT
+                   fields = [100, "A"]
+                 },
+                 {
+                   kind = INSERT
+                   fields = [200, "B"]
+                 },
+                 {
+                   kind = INSERT
+                   fields = [300, "C"]
+                 },
+                 {
+                   kind = INSERT
+                   fields = [300, "C"]
+                 },
+                 {
+                   kind = INSERT
+                   fields = [300, "C"]
+                 },
+                 {
+                   kind = INSERT
+                   fields = [300, "C"]
+                 }
+                 {
+                   kind = UPDATE_BEFORE
+                   fields = [100, "A"]
+                 },
+                 {
+                   kind = UPDATE_AFTER
+                   fields = [100, "A_100"]
+                 },
+                 {
+                   kind = DELETE
+                   fields = [200, "B"]
+                 }
+               ]
+       }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "${database_name}"
+    table = "${table_name}"
+  }
+}
diff --git a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml 
b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
index 00b55265c4..be5ced9214 100644
--- a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
+++ b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
@@ -47,6 +47,11 @@
             <artifactId>hadoop-client</artifactId>
             <version>${hadoop3.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.10.4</version>
+        </dependency>
     </dependencies>
 
     <build>

Reply via email to