This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e01108a0ef [Fix][paimon-e2e] Optimize Paimon E2E Cases (#9612)
e01108a0ef is described below

commit e01108a0ef129e7ebb49ed031d85966773713c9b
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Jul 24 19:45:36 2025 +0800

    [Fix][paimon-e2e] Optimize Paimon E2E Cases (#9612)
---
 .../e2e/connector/paimon/AbstractPaimonIT.java     |  85 +----------
 .../seatunnel/e2e/connector/paimon/PaimonIT.java   |  31 ++--
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      |  50 ++----
 .../paimon/PaimonSinkDynamicBucketIT.java          | 126 +++------------
 .../paimon/PaimonSinkWithSchemaEvolutionIT.java    |   4 -
 .../e2e/connector/paimon/PaimonStreamReadIT.java   |  18 ++-
 .../changelog_fake_cdc_sink_paimon_case1_ddl.conf  |   3 +-
 ...log_fake_cdc_sink_paimon_case1_insert_data.conf |   3 +-
 ...log_fake_cdc_sink_paimon_case1_update_data.conf |   3 +-
 .../changelog_fake_cdc_sink_paimon_case2.conf      |   3 +-
 .../test/resources/changelog_paimon_to_paimon.conf |   7 +-
 .../test/resources/fake_cdc_sink_paimon_case1.conf |   2 +-
 .../resources/fake_cdc_sink_paimon_case10.conf     |   2 +-
 ...ke_cdc_sink_paimon_case1_with_error_schema.conf |   2 +-
 .../test/resources/fake_cdc_sink_paimon_case2.conf | 128 ++++++++--------
 .../test/resources/fake_cdc_sink_paimon_case3.conf |   4 +-
 .../test/resources/fake_cdc_sink_paimon_case4.conf |   4 +-
 .../test/resources/fake_cdc_sink_paimon_case5.conf |   4 +-
 .../test/resources/fake_cdc_sink_paimon_case6.conf |   4 +-
 .../test/resources/fake_cdc_sink_paimon_case7.conf |  82 +++++-----
 .../test/resources/fake_cdc_sink_paimon_case8.conf |  38 ++---
 .../test/resources/fake_cdc_sink_paimon_case9.conf |   2 +-
 .../fake_cdc_sink_paimon_with_hdfs_ha.conf         |   8 +-
 ...dc_sink_paimon_with_hdfs_with_hive_catalog.conf |  12 +-
 .../fake_cdc_to_dynamic_bucket_paimon_case.conf    |  88 +++++------
 .../fake_sink_paimon_truncate_with_hdfs_case2.conf |   2 +-
 .../fake_sink_paimon_truncate_with_hive_case1.conf |   4 +-
 .../fake_sink_paimon_truncate_with_hive_case2.conf |   6 +-
 ...fake_sink_paimon_truncate_with_local_case1.conf |   2 +-
 ...fake_sink_paimon_truncate_with_local_case2.conf |   4 +-
 .../fake_to_dynamic_bucket_paimon_case1.conf       |  12 +-
 .../fake_to_dynamic_bucket_paimon_case2.conf       |   8 +-
 .../fake_to_dynamic_bucket_paimon_case3.conf       |   8 +-
 .../fake_to_dynamic_bucket_paimon_case4.conf       |  14 +-
 .../fake_to_dynamic_bucket_paimon_case5.conf       |  10 +-
 .../fake_to_dynamic_bucket_paimon_case6.conf       |   8 +-
 .../fake_to_dynamic_bucket_paimon_case7.conf       |   4 +-
 .../src/test/resources/fake_to_paimon.conf         |   9 +-
 .../resources/fake_to_paimon_with_full_type.conf   |   3 +-
 .../fake_to_paimon_with_full_type_cdc_data.conf    |   2 +-
 .../src/test/resources/fake_to_paimon_with_s3.conf |  10 +-
 .../fake_to_paimon_with_s3_with_checkpoint.conf    |  10 +-
 .../mysql_cdc_to_paimon_with_schema_change.conf    |   6 +-
 .../resources/paimon_projection_to_assert.conf     |  10 +-
 .../src/test/resources/paimon_to_assert.conf       |  10 +-
 .../resources/paimon_to_assert_with_filter1.conf   |   2 +-
 .../resources/paimon_to_assert_with_filter2.conf   |   2 +-
 .../resources/paimon_to_assert_with_filter3.conf   |  34 ++---
 .../resources/paimon_to_assert_with_filter4.conf   |  34 ++---
 .../resources/paimon_to_assert_with_filter5.conf   |  42 ++---
 .../resources/paimon_to_assert_with_filter6.conf   |  42 ++---
 .../resources/paimon_to_assert_with_filter7.conf   |  42 ++---
 .../resources/paimon_to_assert_with_filter8.conf   |   2 +-
 .../paimon_to_assert_with_hivecatalog.conf         |  12 +-
 .../paimon_to_assert_with_timestampN.conf          |   2 +-
 .../src/test/resources/paimon_to_paimon.conf       |   4 +-
 .../test/resources/paimon_with_s3_to_assert.conf   | 100 ++++++------
 .../read_from_paimon_with_hdfs_ha_to_assert.conf   |   8 +-
 .../src/test/resources/schema-0.json               | 170 ++++++++++++---------
 .../common/container/AbstractTestContainer.java    |  15 ++
 .../flink/AbstractTestFlinkContainer.java          |  21 ++-
 .../container/seatunnel/SeaTunnelContainer.java    |   9 ++
 .../spark/AbstractTestSparkContainer.java          |  10 ++
 63 files changed, 655 insertions(+), 751 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
index d5dd8608a5..ea2cf3bfc1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
@@ -17,12 +17,9 @@
 
 package org.apache.seatunnel.e2e.connector.paimon;
 
-import org.apache.seatunnel.common.utils.FileUtils;
-import org.apache.seatunnel.core.starter.utils.CompressionUtils;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
 
-import org.apache.commons.compress.archivers.ArchiveException;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -32,86 +29,18 @@ import org.apache.paimon.table.Table;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.io.File;
-import java.io.IOException;
-
 @Slf4j
 public abstract class AbstractPaimonIT extends TestSuiteBase {
 
-    protected static String CATALOG_ROOT_DIR = "/tmp/";
     protected static final String NAMESPACE = "paimon";
-    protected static final String NAMESPACE_TAR = "paimon.tar.gz";
-    protected static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/";
     protected static final String TARGET_TABLE = "st_test";
     protected static final String FAKE_TABLE1 = "FakeTable1";
     protected static final String FAKE_DATABASE1 = "FakeDatabase1";
     protected static final String FAKE_TABLE2 = "FakeTable1";
     protected static final String FAKE_DATABASE2 = "FakeDatabase2";
-    private final String CATALOG_ROOT_DIR_WIN =
-            "C:/Users/" + System.getProperty("user.name") + "/tmp/";
-    private final String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
     protected boolean isWindows;
     protected boolean changeLogEnabled = false;
 
-    protected final ContainerExtendedFactory containerExtendedFactory =
-            container -> {
-                if (isWindows) {
-                    FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
-                    FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
-                    FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
-                } else {
-                    FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
-                    FileUtils.createNewDir(CATALOG_DIR);
-                }
-
-                container.execInContainer(
-                        "sh",
-                        "-c",
-                        "cd "
-                                + CATALOG_ROOT_DIR
-                                + " && tar -czvf "
-                                + NAMESPACE_TAR
-                                + " "
-                                + NAMESPACE);
-                container.copyFileFromContainer(
-                        CATALOG_ROOT_DIR + NAMESPACE_TAR,
-                        (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR) + NAMESPACE_TAR);
-                if (isWindows) {
-                    extractFilesWin();
-                } else {
-                    extractFiles();
-                }
-            };
-
-    private void extractFiles() {
-        ProcessBuilder processBuilder = new ProcessBuilder();
-        processBuilder.command(
-                "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " + NAMESPACE_TAR);
-        try {
-            Process process = processBuilder.start();
-            // wait command completed
-            int exitCode = process.waitFor();
-            if (exitCode == 0) {
-                log.info("Extract files successful.");
-            } else {
-                log.error("Extract files failed with exit code " + exitCode);
-            }
-        } catch (IOException | InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void extractFilesWin() {
-        try {
-            CompressionUtils.unGzip(
-                    new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new File(CATALOG_ROOT_DIR_WIN));
-            CompressionUtils.unTar(
-                    new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new File(CATALOG_ROOT_DIR_WIN));
-        } catch (IOException | ArchiveException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     protected Table getTable(String dbName, String tbName) {
         try {
             return getCatalog().getTable(getIdentifier(dbName, tbName));
@@ -127,11 +56,13 @@ public abstract class AbstractPaimonIT extends TestSuiteBase {
 
     private Catalog getCatalog() {
         Options options = new Options();
-        if (isWindows) {
-            options.set("warehouse", CATALOG_DIR_WIN);
-        } else {
-            options.set("warehouse", "file://" + CATALOG_DIR);
-        }
+        String warehouse =
+                String.format(
+                        "%s%s/%s",
+                        isWindows ? "" : "file://",
+                        AbstractTestContainer.HOST_VOLUME_MOUNT_PATH,
+                        NAMESPACE);
+        options.set("warehouse", warehouse);
         return CatalogFactory.createCatalog(CatalogContext.create(options));
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index 952ff106a1..7ea21f3020 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.e2e.connector.paimon;
 
+import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -25,12 +26,7 @@ import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
@@ -42,7 +38,7 @@ import java.nio.file.Path;
 @DisabledOnContainer(
         value = TestContainerId.FLINK_1_13,
         disabledReason = "Paimon does not support flink 1.13")
-public class PaimonIT extends TestSuiteBase {
+public class PaimonIT extends TestSuiteBase implements TestResource {
 
     @TestContainerExtension
     private final ContainerExtendedFactory extendedFactory =
@@ -50,8 +46,8 @@ public class PaimonIT extends TestSuiteBase {
                 Path schemaPath = ContainerUtil.getResourcesFile("/schema-0.json").toPath();
                 container.copyFileToContainer(
                         MountableFile.forHostPath(schemaPath),
-                        "/opt/seatunnel_mounts/paimon/default.db/st_test/schema/schema-0");
-                container.execInContainer("chmod", "777", "-R", "/opt/seatunnel_mounts/paimon");
+                        "/tmp/seatunnel_mnt/paimon/default.db/st_test/schema/schema-0");
+                container.execInContainer("chmod", "777", "-R", "/tmp/seatunnel_mnt/");
             };
 
     @TestTemplate
@@ -64,17 +60,12 @@ public class PaimonIT extends TestSuiteBase {
         Container.ExecResult readProjectionResult =
                 container.executeJob("/paimon_projection_to_assert.conf");
         Assertions.assertEquals(0, readProjectionResult.getExitCode());
-        deleteTable();
     }
 
-    private void deleteTable() {
-        Options options = new Options();
-        options.set("warehouse", "file://" + "/opt/seatunnel_mounts/paimon");
-        try {
-            CatalogFactory.createCatalog(CatalogContext.create(options))
-                    .dropTable(Identifier.create("default", "st_test"), true);
-        } catch (Catalog.TableNotExistException e) {
-            throw new RuntimeException(e);
-        }
-    }
+    @Override
+    public void startUp() throws Exception {}
+
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {}
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index d4f5bb5f9d..3def9a3497 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -38,9 +38,9 @@ import org.apache.paimon.types.DateType;
 import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.DateTimeUtils;
 
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 
@@ -64,14 +64,14 @@ import static org.awaitility.Awaitility.given;
 @Slf4j
 public class PaimonSinkCDCIT extends AbstractPaimonIT implements TestResource {
 
-    @BeforeAll
+    @BeforeEach
     @Override
     public void startUp() throws Exception {
         this.isWindows =
                 
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
     }
 
-    @AfterAll
+    @AfterEach
     @Override
     public void tearDown() throws Exception {}
 
@@ -91,8 +91,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             List<PaimonRecord> paimonRecords =
                                     loadPaimonData("seatunnel_namespace9", 
TARGET_TABLE);
                             Assertions.assertEquals(3, paimonRecords.size());
@@ -120,8 +118,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             List<PaimonRecord> paimonRecords =
                                     loadPaimonData("seatunnel_namespace1", 
TARGET_TABLE);
                             Assertions.assertEquals(2, paimonRecords.size());
@@ -162,8 +158,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             // Check FakeDatabase1.FakeTable1
                             List<PaimonRecord> fake1PaimonRecords =
                                     loadPaimonData(FAKE_DATABASE1, 
FAKE_TABLE1);
@@ -205,8 +199,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             Table table = getTable("seatunnel_namespace3", 
TARGET_TABLE);
                             String bucket = 
table.options().get(CoreOptions.BUCKET.key());
                             
Assertions.assertTrue(StringUtils.isNoneBlank(bucket));
@@ -237,8 +229,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             Table table = getTable("seatunnel_namespace4", 
TARGET_TABLE);
                             List<String> partitionKeys = table.partitionKeys();
                             List<String> primaryKeys = table.primaryKeys();
@@ -290,8 +280,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             Table table = getTable("seatunnel_namespace5", 
TARGET_TABLE);
                             String fileFormat = 
table.options().get(CoreOptions.FILE_FORMAT.key());
                             
Assertions.assertTrue(StringUtils.isNoneBlank(fileFormat));
@@ -322,8 +310,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             Table table = getTable("seatunnel_namespace6", 
TARGET_TABLE);
                             String fileFormat = 
table.options().get(CoreOptions.FILE_FORMAT.key());
                             
Assertions.assertTrue(StringUtils.isNoneBlank(fileFormat));
@@ -355,8 +341,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             FileStoreTable table =
                                     (FileStoreTable) 
getTable("seatunnel_namespace7", TARGET_TABLE);
                             List<DataField> fields = table.schema().fields();
@@ -397,15 +381,15 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                             Assertions.assertEquals(2, result.size());
                             for (PaimonRecord paimonRecord : result) {
                                 Assertions.assertEquals(
-                                        paimonRecord.oneTime.toString(), 
"2024-03-10T10:00:12");
+                                        "2024-03-10T10:00:12", 
paimonRecord.oneTime.toString());
                                 Assertions.assertEquals(
-                                        paimonRecord.twoTime.toString(), 
"2024-03-10T10:00:00.123");
+                                        "2024-03-10T10:00:00.123", 
paimonRecord.twoTime.toString());
                                 Assertions.assertEquals(
-                                        paimonRecord.threeTime.toString(),
-                                        "2024-03-10T10:00:00.123456");
+                                        "2024-03-10T10:00:00.123456",
+                                        paimonRecord.threeTime.toString());
                                 Assertions.assertEquals(
-                                        paimonRecord.fourTime.toString(),
-                                        "2024-03-10T10:00:00.123456789");
+                                        "2024-03-10T10:00:00.123456789",
+                                        paimonRecord.fourTime.toString());
                             }
                         });
 
@@ -425,8 +409,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             FileStoreTable table =
                                     (FileStoreTable) 
getTable("seatunnel_namespace8", TARGET_TABLE);
                             List<DataField> fields = table.schema().fields();
@@ -511,8 +493,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             List<PaimonRecord> paimonRecords =
                                     loadPaimonData("seatunnel_namespace10", 
TARGET_TABLE);
                             Assertions.assertEquals(2, paimonRecords.size());
@@ -540,7 +520,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
         Container.ExecResult writeResult =
                 
container.executeJob("/changelog_fake_cdc_sink_paimon_case1_ddl.conf");
         Assertions.assertEquals(0, writeResult.getExitCode());
-        TimeUnit.SECONDS.sleep(120);
         String[] jobIds =
                 new String[] {
                     String.valueOf(JobIdGenerator.newJobId()),
@@ -585,16 +564,13 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
                                 throw new SeaTunnelException(e);
                             }
                         }));
-        // stream job running 30 seconds
-        TimeUnit.SECONDS.sleep(120);
+        // stream job running 60 seconds
+        TimeUnit.SECONDS.sleep(60);
         // cancel stream job
         container.cancelJob(jobIds[1]);
         container.cancelJob(jobIds[2]);
         container.cancelJob(jobIds[0]);
         changeLogEnabled = true;
-        TimeUnit.SECONDS.sleep(10);
-        // copy paimon to local
-        container.executeExtraCommands(containerExtendedFactory);
         List<PaimonRecord> paimonRecords1 = 
loadPaimonData("seatunnel_namespace", "st_test_sink");
         List<String> actual1 =
                 paimonRecords1.stream()
@@ -646,8 +622,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
         // cancel stream job
         container.cancelJob(String.valueOf(jobId));
         TimeUnit.SECONDS.sleep(5);
-        // copy paimon to local
-        container.executeExtraCommands(containerExtendedFactory);
         List<PaimonRecord> paimonRecords = 
loadPaimonData("seatunnel_namespace", "st_test_full");
         List<String> actual =
                 paimonRecords.stream()
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index b44da835ee..4b74575da8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -18,19 +18,15 @@
 package org.apache.seatunnel.e2e.connector.paimon;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
-import org.apache.seatunnel.core.starter.utils.CompressionUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
-import org.apache.commons.compress.archivers.ArchiveException;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -50,16 +46,15 @@ import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.TimestampType;
 
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -70,6 +65,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.e2e.common.container.AbstractTestContainer.HOST_VOLUME_MOUNT_PATH;
 import static org.awaitility.Awaitility.given;
 
 @DisabledOnContainer(
@@ -80,23 +76,16 @@ import static org.awaitility.Awaitility.given;
 @Slf4j
 public class PaimonSinkDynamicBucketIT extends TestSuiteBase implements 
TestResource {
 
-    private static String CATALOG_ROOT_DIR = "/tmp/";
-    private static final String NAMESPACE = "paimon";
-    private static final String NAMESPACE_TAR = "paimon.tar.gz";
-    private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + 
"/";
-    private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
-    private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
     private boolean isWindows;
+    private static final String NAMESPACE = "paimon";
 
     private Map<String, Object> PAIMON_SINK_PROPERTIES;
 
-    @BeforeAll
+    @BeforeEach
     @Override
     public void startUp() throws Exception {
         this.isWindows =
                 
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
-        CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN + 
System.getProperty("user.name") + "/tmp/";
-        CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
         Map<String, Object> map = new HashMap<>();
         map.put("warehouse", "hdfs:///tmp/paimon");
         map.put("database", "default");
@@ -115,7 +104,7 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
         this.PAIMON_SINK_PROPERTIES = map;
     }
 
-    @AfterAll
+    @AfterEach
     @Override
     public void tearDown() throws Exception {}
 
@@ -130,18 +119,6 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
         Container.ExecResult readProjectionResult =
                 container.executeJob("/paimon_projection_to_assert.conf");
         Assertions.assertEquals(0, readProjectionResult.getExitCode());
-        deleteTable();
-    }
-
-    private void deleteTable() {
-        Options options = new Options();
-        options.set("warehouse", "file://" + "/opt/seatunnel_mounts/paimon");
-        try {
-            CatalogFactory.createCatalog(CatalogContext.create(options))
-                    .dropTable(Identifier.create("default", "st_test"), true);
-        } catch (Catalog.TableNotExistException e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @TestTemplate
@@ -155,8 +132,6 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             FileStoreTable table =
                                     (FileStoreTable) getTable("default", 
"st_test_2");
                             IndexBootstrap indexBootstrap = new 
IndexBootstrap(table);
@@ -225,8 +200,6 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             FileStoreTable table =
                                     (FileStoreTable) getTable("default", 
"st_test_3");
                             IndexBootstrap indexBootstrap = new 
IndexBootstrap(table);
@@ -262,8 +235,6 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                 .atMost(120L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             FileStoreTable table =
                                     (FileStoreTable) getTable("default", 
"st_test_4");
                             IndexBootstrap indexBootstrap = new 
IndexBootstrap(table);
@@ -303,10 +274,8 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                 .atMost(30L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             FileStoreTable table =
-                                    (FileStoreTable) getTable("default", 
"st_test_3");
+                                    (FileStoreTable) getTable("default", 
"st_test_cdc_write");
                             List<DataField> fields = table.schema().fields();
                             for (DataField field : fields) {
                                 if (field.name().equalsIgnoreCase("one_time")) 
{
@@ -345,15 +314,15 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                             Assertions.assertEquals(2, result.size());
                             for (PaimonRecord paimonRecord : result) {
                                 Assertions.assertEquals(
-                                        paimonRecord.oneTime.toString(), 
"2024-03-10T10:00:12");
+                                        "2024-03-10T10:00:12", 
paimonRecord.oneTime.toString());
                                 Assertions.assertEquals(
-                                        paimonRecord.twoTime.toString(), 
"2024-03-10T10:00:00.123");
+                                        "2024-03-10T10:00:00.123", 
paimonRecord.twoTime.toString());
                                 Assertions.assertEquals(
-                                        paimonRecord.threeTime.toString(),
-                                        "2024-03-10T10:00:00.123456");
+                                        "2024-03-10T10:00:00.123456",
+                                        paimonRecord.threeTime.toString());
                                 Assertions.assertEquals(
-                                        paimonRecord.fourTime.toString(),
-                                        "2024-03-10T10:00:00.123456789");
+                                        "2024-03-10T10:00:00.123456789",
+                                        paimonRecord.fourTime.toString());
                             }
                         });
     }
@@ -371,7 +340,6 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                 .atMost(60L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            
container.executeExtraCommands(containerExtendedFactory);
                             FileStoreTable table =
                                     (FileStoreTable) getTable("full_type", 
"st_test");
                             List<String> primaryKeys = 
table.schema().primaryKeys();
@@ -386,72 +354,12 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
         Assertions.assertEquals(0, writeResult1.getExitCode());
     }
 
-    protected final ContainerExtendedFactory containerExtendedFactory =
-            container -> {
-                if (isWindows) {
-                    FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
-                    FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
-                    FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
-                } else {
-                    FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
-                    FileUtils.createNewDir(CATALOG_DIR);
-                }
-
-                container.execInContainer(
-                        "sh",
-                        "-c",
-                        "cd "
-                                + CATALOG_ROOT_DIR
-                                + " && tar -czvf "
-                                + NAMESPACE_TAR
-                                + " "
-                                + NAMESPACE);
-                container.copyFileFromContainer(
-                        CATALOG_ROOT_DIR + NAMESPACE_TAR,
-                        (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR) 
+ NAMESPACE_TAR);
-                if (isWindows) {
-                    extractFilesWin();
-                } else {
-                    extractFiles();
-                }
-            };
-
-    private void extractFiles() {
-        ProcessBuilder processBuilder = new ProcessBuilder();
-        processBuilder.command(
-                "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " + 
NAMESPACE_TAR);
-        try {
-            Process process = processBuilder.start();
-            // wait command completed
-            int exitCode = process.waitFor();
-            if (exitCode == 0) {
-                log.info("Extract files successful.");
-            } else {
-                log.error("Extract files failed with exit code " + exitCode);
-            }
-        } catch (IOException | InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void extractFilesWin() {
-        try {
-            CompressionUtils.unGzip(
-                    new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new 
File(CATALOG_ROOT_DIR_WIN));
-            CompressionUtils.unTar(
-                    new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new 
File(CATALOG_ROOT_DIR_WIN));
-        } catch (IOException | ArchiveException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     protected Table getTable(String dbName, String tbName) {
         Options options = new Options();
-        if (isWindows) {
-            options.set("warehouse", CATALOG_DIR_WIN);
-        } else {
-            options.set("warehouse", "file://" + CATALOG_DIR);
-        }
+        String warehouse =
+                String.format(
+                        "%s%s/%s", isWindows ? "" : "file://", 
HOST_VOLUME_MOUNT_PATH, NAMESPACE);
+        options.set("warehouse", warehouse);
         try {
             Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(options));
             return catalog.getTable(Identifier.create(dbName, tbName));
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index 5c85b29863..3a81eb5196 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -170,8 +170,6 @@ public class PaimonSinkWithSchemaEvolutionIT extends 
AbstractPaimonIT implements
         await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             Assertions.assertIterableEquals(
                                     queryMysql(String.format(QUERY, 
MYSQL_DATABASE, SOURCE_TABLE)),
                                     queryPaimon(null, 0, Integer.MAX_VALUE));
@@ -335,8 +333,6 @@ public class PaimonSinkWithSchemaEvolutionIT extends 
AbstractPaimonIT implements
         await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            // copy paimon to local
-                            
container.executeExtraCommands(containerExtendedFactory);
                             // 1. Vertify the schema
                             vertifySchema();
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
index 1217ab4d4a..985029a792 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.e2e.connector.paimon;
 
 import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -31,7 +32,9 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 
@@ -52,7 +55,7 @@ import static org.awaitility.Awaitility.given;
         disabledReason =
                 "Spark and Flink engine can not auto create paimon table on 
worker node in local file(e.g flink tm) by savemode feature which can lead 
error")
 @Slf4j
-public class PaimonStreamReadIT extends PaimonSinkCDCIT {
+public class PaimonStreamReadIT extends AbstractPaimonIT implements 
TestResource {
 
     @TestTemplate
     public void testStreamReadPaimon(TestContainer container) throws Exception 
{
@@ -75,7 +78,6 @@ public class PaimonStreamReadIT extends PaimonSinkCDCIT {
                 .atMost(400L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            
container.executeExtraCommands(containerExtendedFactory);
                             List<PaimonRecordWithFullType> paimonSourceRecords 
=
                                     loadPaimonDataWithFullType("full_type", 
"st_test");
                             List<PaimonRecordWithFullType> paimonSinkRecords =
@@ -96,7 +98,6 @@ public class PaimonStreamReadIT extends PaimonSinkCDCIT {
                 .atMost(400L, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            
container.executeExtraCommands(containerExtendedFactory);
                             List<PaimonRecordWithFullType> paimonSourceRecords 
=
                                     loadPaimonDataWithFullType("full_type", 
"st_test");
                             List<PaimonRecordWithFullType> paimonSinkRecords =
@@ -150,4 +151,15 @@ public class PaimonStreamReadIT extends PaimonSinkCDCIT {
         }
         return result;
     }
+
+    @Override
+    @BeforeEach
+    public void startUp() throws Exception {
+        this.isWindows =
+                
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+    }
+
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {}
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
index 6a32727505..4d4b30d7ac 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
@@ -42,12 +42,13 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace"
     table = "st_test_lookup"
     paimon.table.write-props = {
       changelog-producer = lookup
       changelog-tmp-path = "/tmp/paimon/changelog"
+      file-format = parquet
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
index 9b7310177c..3ff070d2d5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
@@ -56,12 +56,13 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace"
     table = "st_test_lookup"
     paimon.table.write-props = {
       changelog-producer = lookup
       changelog-tmp-path = "/tmp/paimon/changelog"
+      file-format = parquet
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
index 271ad20bff..209b2a7e41 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
@@ -60,12 +60,13 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace"
     table = "st_test_lookup"
     paimon.table.write-props = {
       changelog-producer = lookup
       changelog-tmp-path = "/tmp/paimon/changelog"
+      file-format = parquet
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
index f7135e645f..e5660abc55 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
@@ -72,12 +72,13 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace"
     table = "st_test_full"
     paimon.table.write-props = {
       changelog-producer = full-compaction
       changelog-tmp-path = "/tmp/paimon/changelog"
+      file-format = parquet
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
index 7f0798039a..d63810fa19 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -18,12 +18,12 @@
 env {
   parallelism = 1
   job.mode = "Streaming"
-  checkpoint.interval = 2000
+  checkpoint.interval = 5000
 }
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace"
     table = "st_test_lookup"
   }
@@ -38,12 +38,13 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace"
     table = "st_test_sink"
     paimon.table.non-primary-key = true
     paimon.table.write-props = {
       write-only = true
+      file-format = parquet
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
index 50ce13aa68..d2a6929cac 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
@@ -79,7 +79,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace1"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
index 3c2061c55b..8f1c539ad6 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
@@ -51,7 +51,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace9"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
index 70bcedff29..3dd8ea1adb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
@@ -55,7 +55,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace1"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
index ddc9226871..fcb2044ba8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
@@ -26,18 +26,18 @@ env {
 source {
   FakeSource {
     tables_configs = [
-       {
+      {
         schema = {
-            table = "FakeDatabase1.FakeTable1"
-            fields {
-                pk_id = bigint
-                name = string
-                score = int
-            }
-           primaryKey {
-               name = "pk_id"
-               columnNames = [pk_id]
-           }
+          table = "FakeDatabase1.FakeTable1"
+          fields {
+            pk_id = bigint
+            name = string
+            score = int
+          }
+          primaryKey {
+            name = "pk_id"
+            columnNames = [pk_id]
+          }
         }
         rows = [
           {
@@ -77,65 +77,65 @@ source {
             fields = [2, "B", 100]
           }
         ]
-       },
-       {
-               schema = {
-                   table = "FakeDatabase2.FakeTable1"
-                   fields {
-                       pk_id = bigint
-                       name = string
-                   }
-                  primaryKey {
-                      name = "pk_id"
-                      columnNames = [pk_id]
-                  }
-               }
-               rows = [
-                 {
-                   kind = INSERT
-                   fields = [100, "A"]
-                 },
-                 {
-                   kind = INSERT
-                   fields = [200, "B"]
-                 },
-                 {
-                   kind = INSERT
-                   fields = [300, "C"]
-                 },
-                 {
-                   kind = INSERT
-                   fields = [300, "C"]
-                 },
-                 {
-                   kind = INSERT
-                   fields = [300, "C"]
-                 },
-                 {
-                   kind = INSERT
-                   fields = [300, "C"]
-                 }
-                 {
-                   kind = UPDATE_BEFORE
-                   fields = [100, "A"]
-                 },
-                 {
-                   kind = UPDATE_AFTER
-                   fields = [100, "A_100"]
-                 },
-                 {
-                   kind = DELETE
-                   fields = [200, "B"]
-                 }
-               ]
-       }
+      },
+      {
+        schema = {
+          table = "FakeDatabase2.FakeTable1"
+          fields {
+            pk_id = bigint
+            name = string
+          }
+          primaryKey {
+            name = "pk_id"
+            columnNames = [pk_id]
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = [100, "A"]
+          },
+          {
+            kind = INSERT
+            fields = [200, "B"]
+          },
+          {
+            kind = INSERT
+            fields = [300, "C"]
+          },
+          {
+            kind = INSERT
+            fields = [300, "C"]
+          },
+          {
+            kind = INSERT
+            fields = [300, "C"]
+          },
+          {
+            kind = INSERT
+            fields = [300, "C"]
+          }
+          {
+            kind = UPDATE_BEFORE
+            fields = [100, "A"]
+          },
+          {
+            kind = UPDATE_AFTER
+            fields = [100, "A_100"]
+          },
+          {
+            kind = DELETE
+            fields = [200, "B"]
+          }
+        ]
+      }
     ]
   }
 }
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "${database_name}"
     table = "${table_name}"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
index f5db1c8253..2c7ac5c06e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
@@ -83,11 +83,11 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace3"
     table = "st_test"
     paimon.table.write-props = {
-        bucket = 2
+      bucket = 2
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
index 9a287a61b1..42feabe071 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
@@ -79,11 +79,11 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace4"
     table = "st_test"
     paimon.table.write-props = {
-        bucket = 2
+      bucket = 2
     }
     paimon.table.partition-keys = "dt"
     paimon.table.primary-keys = "pk_id,dt"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
index 65df2115f4..ce9bac3130 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
@@ -83,11 +83,11 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace5"
     table = "st_test"
     paimon.table.write-props = {
-        file.format = "parquet"
+      file.format = "parquet"
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
index 102747ef0f..9b3ce7c16a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
@@ -83,11 +83,11 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace6"
     table = "st_test"
     paimon.table.write-props = {
-        file.format = "avro"
+      file.format = "avro"
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
index 6578c72358..037f6cb30e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
@@ -27,46 +27,46 @@ source {
   FakeSource {
     schema = {
       columns = [
-         {
-            name = pk_id
-            type = bigint
-            nullable = false
-            comment = "primary key id"
-         },
-         {
-            name = name
-            type = "string"
-            nullable = true
-            comment = "name"
-         },
-         {
-            name = one_time
-            type = timestamp
-            nullable = false
-            comment = "one time"
-            columnScale = 0
-         },
-          {
-             name = two_time
-             type = timestamp
-             nullable = false
-             comment = "two time"
-             columnScale = 3
-          },
-         {
-            name = three_time
-            type = timestamp
-            nullable = false
-            comment = "three time"
-            columnScale = 6
-         },
-          {
-             name = four_time
-             type = timestamp
-             nullable = false
-             comment = "four time"
-             columnScale = 9
-          }
+        {
+          name = pk_id
+          type = bigint
+          nullable = false
+          comment = "primary key id"
+        },
+        {
+          name = name
+          type = "string"
+          nullable = true
+          comment = "name"
+        },
+        {
+          name = one_time
+          type = timestamp
+          nullable = false
+          comment = "one time"
+          columnScale = 0
+        },
+        {
+          name = two_time
+          type = timestamp
+          nullable = false
+          comment = "two time"
+          columnScale = 3
+        },
+        {
+          name = three_time
+          type = timestamp
+          nullable = false
+          comment = "three time"
+          columnScale = 6
+        },
+        {
+          name = four_time
+          type = timestamp
+          nullable = false
+          comment = "four time"
+          columnScale = 9
+        }
       ]
       primaryKey {
         name = "pk_id"
@@ -120,7 +120,7 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace7"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
index 2fc4910e98..414d048bf2 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
@@ -27,24 +27,24 @@ source {
   FakeSource {
     schema = {
       columns = [
-         {
-            name = pk_id
-            type = bigint
-            nullable = false
-            comment = "primary key id"
-         },
-         {
-            name = name
-            type = "string"
-            nullable = true
-            comment = "name"
-         },
-         {
-            name = one_date
-            type = date
-            nullable = false
-            comment = "one date"
-         }
+        {
+          name = pk_id
+          type = bigint
+          nullable = false
+          comment = "primary key id"
+        },
+        {
+          name = name
+          type = "string"
+          nullable = true
+          comment = "name"
+        },
+        {
+          name = one_date
+          type = date
+          nullable = false
+          comment = "one date"
+        }
       ]
       primaryKey {
         name = "pk_id"
@@ -86,7 +86,7 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace8"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
index 674491f90d..090948f656 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
@@ -67,7 +67,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace9"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
index 7e5fd6da94..ec8c4ce529 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
@@ -80,10 +80,10 @@ source {
 sink {
   Paimon {
     schema_save_mode = "RECREATE_SCHEMA"
-    catalog_name="seatunnel_test"
-    warehouse="hdfs:///tmp/paimon"
-    database="seatunnel_namespace1"
-    table="st_test"
+    catalog_name = "seatunnel_test"
+    warehouse = "hdfs:///tmp/paimon"
+    database = "seatunnel_namespace1"
+    table = "st_test"
     paimon.hadoop.conf = {
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
index 3afdc59701..eb517229a5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
@@ -80,12 +80,12 @@ source {
 sink {
   Paimon {
     schema_save_mode = "RECREATE_SCHEMA"
-    catalog_name="seatunnel_test"
-    catalog_type="hive"
-    catalog_uri="thrift://hadoop04:9083"
-    warehouse="hdfs:///tmp/seatunnel"
-    database="seatunnel_test"
-    table="st_test3"
+    catalog_name = "seatunnel_test"
+    catalog_type = "hive"
+    catalog_uri = "thrift://hadoop04:9083"
+    warehouse = "hdfs:///tmp/seatunnel"
+    database = "seatunnel_test"
+    table = "st_test3"
     paimon.hadoop.conf = {
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
index f9993fe33f..ac40022241 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
@@ -27,46 +27,46 @@ source {
   FakeSource {
     schema = {
       columns = [
-         {
-            name = pk_id
-            type = bigint
-            nullable = false
-            comment = "primary key id"
-         },
-         {
-            name = name
-            type = "string"
-            nullable = true
-            comment = "name"
-         },
-         {
-            name = one_time
-            type = timestamp
-            nullable = false
-            comment = "one time"
-            columnScale = 0
-         },
-          {
-             name = two_time
-             type = timestamp
-             nullable = false
-             comment = "two time"
-             columnScale = 3
-          },
-         {
-            name = three_time
-            type = timestamp
-            nullable = false
-            comment = "three time"
-            columnScale = 6
-         },
-          {
-             name = four_time
-             type = timestamp
-             nullable = false
-             comment = "four time"
-             columnScale = 9
-          }
+        {
+          name = pk_id
+          type = bigint
+          nullable = false
+          comment = "primary key id"
+        },
+        {
+          name = name
+          type = "string"
+          nullable = true
+          comment = "name"
+        },
+        {
+          name = one_time
+          type = timestamp
+          nullable = false
+          comment = "one time"
+          columnScale = 0
+        },
+        {
+          name = two_time
+          type = timestamp
+          nullable = false
+          comment = "two time"
+          columnScale = 3
+        },
+        {
+          name = three_time
+          type = timestamp
+          nullable = false
+          comment = "three time"
+          columnScale = 6
+        },
+        {
+          name = four_time
+          type = timestamp
+          nullable = false
+          comment = "four time"
+          columnScale = 9
+        }
       ]
       primaryKey {
         name = "pk_id"
@@ -120,12 +120,12 @@ transform {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "default"
-    table = "st_test_3"
+    table = "st_test_cdc_write"
     paimon.table.write-props = {
-       bucket = -1
-       dynamic-bucket.target-row-num = 50000
+      bucket = -1
+      dynamic-bucket.target-row-num = 50000
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
index 7f9c453d35..55fec624d9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
@@ -51,7 +51,7 @@ sink {
     warehouse = "hdfs:///tmp/paimon"
     database = "seatunnel_namespace11"
     table = "st_test"
-    data_save_mode=DROP_DATA
+    data_save_mode = DROP_DATA
     paimon.hadoop.conf = {
       hadoop_user_name = "hdfs"
       fs.defaultFS = "hdfs://nameservice1"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
index 26e95870e3..d99d927c7e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
@@ -65,8 +65,8 @@ source {
 sink {
   Paimon {
     warehouse = "hdfs:///tmp/paimon"
-    catalog_type="hive"
-    catalog_uri="thrift://hadoop04:9083"
+    catalog_type = "hive"
+    catalog_uri = "thrift://hadoop04:9083"
     database = "seatunnel_namespace12"
     table = "st_test"
     paimon.hadoop.conf = {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
index ef1e79b86e..37f8afc326 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
@@ -49,11 +49,11 @@ source {
 sink {
   Paimon {
     warehouse = "hdfs:///tmp/paimon"
-    catalog_type="hive"
-    catalog_uri="thrift://hadoop04:9083"
+    catalog_type = "hive"
+    catalog_uri = "thrift://hadoop04:9083"
     database = "seatunnel_namespace12"
     table = "st_test"
-    data_save_mode=DROP_DATA
+    data_save_mode = DROP_DATA
     paimon.hadoop.conf = {
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
index e22474a06d..2e8356879a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
@@ -64,7 +64,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace10"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
index 64cb24bc8e..8b322f0a02 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
@@ -48,9 +48,9 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace10"
     table = "st_test"
-    data_save_mode=DROP_DATA
+    data_save_mode = DROP_DATA
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
index de4e21e3ed..92120c4bc9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
@@ -23,6 +23,8 @@ env {
 source {
   FakeSource {
     row.num = 100000
+    auto.increment.enabled = true
+    auto.increment.start = 1
     schema = {
       fields {
         pk_id = int
@@ -42,6 +44,10 @@ source {
         c_timestamp = timestamp
         c_time = time
       }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
     }
     plugin_output = "fake"
   }
@@ -49,12 +55,12 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///opt/seatunnel_mounts/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "default"
     table = "st_test"
     paimon.table.write-props = {
-       bucket = -1
-       dynamic-bucket.target-row-num = 50000
+      bucket = -1
+      dynamic-bucket.target-row-num = 50000
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
index 338e624d04..993543effd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
@@ -25,6 +25,8 @@ env {
 
 source {
   FakeSource {
+    auto.increment.enabled = true
+    auto.increment.start = 1
     row.num = 100000
     schema = {
       fields {
@@ -42,12 +44,12 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "default"
     table = "st_test_2"
     paimon.table.write-props = {
-       bucket = -1
-       dynamic-bucket.target-row-num = 50000
+      bucket = -1
+      dynamic-bucket.target-row-num = 50000
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
index 21cbc0ef83..48ed12b65f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
@@ -25,6 +25,8 @@ env {
 
 source {
   FakeSource {
+    auto.increment.enabled = true
+    auto.increment.start = 1
     row.num = 100000
     schema = {
       fields {
@@ -42,12 +44,12 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "default"
     table = "st_test_3"
     paimon.table.write-props = {
-       bucket = -1
-       dynamic-bucket.target-row-num = 50000
+      bucket = -1
+      dynamic-bucket.target-row-num = 50000
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
index 9a71171330..0270b7de36 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
@@ -71,12 +71,12 @@ source {
 
 sink {
   Paimon {
-      warehouse = "file:///tmp/paimon"
-      database = "default"
-      table = "st_test_4"
-      paimon.table.write-props = {
-         bucket = -1
-         dynamic-bucket.target-row-num = 5
-      }
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
+    database = "default"
+    table = "st_test_4"
+    paimon.table.write-props = {
+      bucket = -1
+      dynamic-bucket.target-row-num = 5
     }
+  }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
index b1e562c088..92df043c7d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
@@ -25,6 +25,8 @@ env {
 
 source {
   FakeSource {
+    auto.increment.enabled = true
+    auto.increment.start = 1000000
     row.num = 100000
     schema = {
       fields {
@@ -43,10 +45,10 @@ source {
 sink {
   Paimon {
     schema_save_mode = "RECREATE_SCHEMA"
-    catalog_name="seatunnel_test"
-    warehouse="hdfs:///tmp/paimon"
-    database="default"
-    table="st_test_5"
+    catalog_name = "seatunnel_test"
+    warehouse = "hdfs:///tmp/paimon"
+    database = "default"
+    table = "st_test_5"
     paimon.table.write-props = {
       bucket = -1
       dynamic-bucket.target-row-num = 50000
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
index 43fb21dbf8..8c8532b8d8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
@@ -42,7 +42,7 @@ source {
       }
       primaryKey {
         name = "pk"
-        columnNames = [c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp]
+        columnNames = [c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double, c_decimal, c_bytes, c_date, c_timestamp]
       }
     }
     rows = [
@@ -60,7 +60,7 @@ source {
       }
       {
         kind = INSERT
-        fields = [{"a": "f"}, [104], null, false, 118, 15988, 563873951, 7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", "bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58", "23:20:58"]
+        fields = [{"a": "f"}, [104], "", false, 118, 15988, 563873951, 7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", "bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58", "23:20:58"]
       }
       {
         kind = INSERT
@@ -85,11 +85,11 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     paimon.table.write-props = {
-       bucket = -1
+      bucket = -1
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
index f87cd54213..d5960a79df 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
@@ -42,7 +42,7 @@ source {
       }
       primaryKey {
         name = "pk"
-        columnNames = [c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp,c_time]
+        columnNames = [c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double, c_decimal, c_bytes, c_date, c_timestamp, c_time]
       }
     }
     rows = [
@@ -73,7 +73,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     paimon.table.write-props = {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
index e93a7a8653..6f794a0c68 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
@@ -29,9 +29,12 @@ env {
 
 source {
   FakeSource {
+    auto.increment.enabled = true
+    auto.increment.start = 1
     row.num = 100000
     schema = {
       fields {
+        pk_id = bigint
         c_map = "map<string, string>"
         c_array = "array<int>"
         c_string = string
@@ -48,6 +51,10 @@ source {
         c_timestamp = timestamp
         c_time = time
       }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
     }
     plugin_output = "fake"
   }
@@ -55,7 +62,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "/opt/seatunnel_mounts/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "default"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
index e076b521df..d16b54faec 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
@@ -85,7 +85,8 @@ source {
 
 sink {
   Paimon {
-    warehouse = "/tmp/paimon"
+    plugin_input = "fake"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
index 086becf2b8..8efdb8f2cc 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
@@ -73,7 +73,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
index a379a638eb..86137b9e78 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
@@ -85,11 +85,11 @@ sink {
     database = "seatunnel_namespace11"
     table = "st_test"
     paimon.hadoop.conf = {
-        fs.s3a.access-key=minio
-        fs.s3a.secret-key=miniominio
-        fs.s3a.endpoint="http://minio:9000"
-        fs.s3a.path.style.access=true
-        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+      fs.s3a.access-key = minio
+      fs.s3a.secret-key = miniominio
+      fs.s3a.endpoint = "http://minio:9000"
+      fs.s3a.path.style.access = true
+      fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
index dc2585abc9..55d94aa033 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
@@ -53,11 +53,11 @@ sink {
     database = "seatunnel_namespace12"
     table = "st_test"
     paimon.hadoop.conf = {
-        fs.s3a.access-key=minio
-        fs.s3a.secret-key=miniominio
-        fs.s3a.endpoint="http://minio:9000"
-        fs.s3a.path.style.access=true
-        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+      fs.s3a.access-key = minio
+      fs.s3a.secret-key = miniominio
+      fs.s3a.endpoint = "http://minio:9000"
+      fs.s3a.path.style.access = true
+      fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
index a214430dd0..62777ed3d6 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
@@ -23,8 +23,8 @@ env {
   parallelism = 5
   job.mode = "STREAMING"
   checkpoint.interval = 5000
-  read_limit.bytes_per_second=7000000
-  read_limit.rows_per_second=400
+  read_limit.bytes_per_second = 7000000
+  read_limit.rows_per_second = 400
 }
 
 source {
@@ -41,7 +41,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/paimon"
+    warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "mysql_to_paimon"
     table = "products"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
index a45792848e..bf2781b536 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
@@ -29,7 +29,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/opt/seatunnel_mounts/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "default"
     table = "st_test"
     plugin_output = paimon_source
@@ -41,15 +41,15 @@ sink {
   Assert {
     plugin_input = paimon_source
     rules {
-    row_rules = [
+      row_rules = [
         {
           rule_type = MIN_ROW
           rule_value = 100000
         },
         {
-                  rule_type = MAX_ROW
-                  rule_value = 100000
-                }
+          rule_type = MAX_ROW
+          rule_value = 100000
+        }
       ],
       field_rules = [
         {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
index d8d5c9f0d2..9224fef95d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
@@ -29,7 +29,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/opt/seatunnel_mounts/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "default"
     table = "st_test"
     plugin_output = paimon_source
@@ -40,15 +40,15 @@ sink {
   Assert {
     plugin_input = paimon_source
     rules {
-    row_rules = [
+      row_rules = [
         {
           rule_type = MIN_ROW
           rule_value = 100000
         },
         {
-                  rule_type = MAX_ROW
-                  rule_value = 100000
-                }
+          rule_type = MAX_ROW
+          rule_value = 100000
+        }
       ],
       field_rules = [
         {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
index 6c54339442..96e145a89e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_string is not null"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
index c5faa260aa..4dc87f9394 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_string='c_string2'"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
index 27d8c1897e..21baafc1e5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987"
@@ -56,24 +56,24 @@ sink {
           ]
         }
         {
-            field_name = c_tinyint
-            field_type = tinyint
-            field_value = [
-                {
-                    rule_type = MIN
-                    rule_value = 116
-                }
-            ]
+          field_name = c_tinyint
+          field_type = tinyint
+          field_value = [
+            {
+              rule_type = MIN
+              rule_value = 116
+            }
+          ]
         }
         {
-            field_name = c_smallint
-            field_type = smallint
-            field_value = [
-                {
-                    rule_type = NOT_NULL
-                    equals_to = 15987
-                }
-            ]
+          field_name = c_smallint
+          field_type = smallint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = 15987
+            }
+          ]
         }
       ]
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
index 8bcec7150a..aa8c0f6e55 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_date > '2023-04-21' and c_timestamp='2023-04-27 23:20:58'"
@@ -46,24 +46,24 @@ sink {
       ]
       field_rules = [
         {
-            field_name = c_date
-            field_type = date
-            field_value = [
-                {
-                    rule_type = NOT_NULL
-                    equals_to = "2023-04-27"
-                }
-            ]
+          field_name = c_date
+          field_type = date
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "2023-04-27"
+            }
+          ]
         }
         {
-            field_name = c_timestamp
-            field_type = timestamp
-            field_value = [
-                {
-                    rule_type = NOT_NULL
-                    equals_to = "2023-04-27T23:20:58"
-                }
-            ]
+          field_name = c_timestamp
+          field_type = timestamp
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "2023-04-27T23:20:58"
+            }
+          ]
         }
       ]
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
index d1d43ecfd2..01eb77fbb7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_boolean= 'true' and c_smallint = 15987 and c_tinyint between 116 and 120"
@@ -56,29 +56,29 @@ sink {
           ]
         }
         {
-            field_name = c_tinyint
-            field_type = tinyint
-            field_value = [
-                {
-                    rule_type = MIN
-                    rule_value = 116
-                },
-                {
-                    rule_type = MAX
-                    rule_value = 120
-                }
+          field_name = c_tinyint
+          field_type = tinyint
+          field_value = [
+            {
+              rule_type = MIN
+              rule_value = 116
+            },
+            {
+              rule_type = MAX
+              rule_value = 120
+            }
 
-            ]
+          ]
         }
         {
-            field_name = c_smallint
-            field_type = smallint
-            field_value = [
-                {
-                    rule_type = NOT_NULL
-                    equals_to = 15987
-                }
-            ]
+          field_name = c_smallint
+          field_type = smallint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = 15987
+            }
+          ]
         }
       ]
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
index 785f294cf6..5dd29e0f50 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_boolean= 'true' and c_smallint = 15987 and c_tinyint in (117, 118, 119)"
@@ -56,29 +56,29 @@ sink {
           ]
         }
         {
-            field_name = c_tinyint
-            field_type = tinyint
-            field_value = [
-                {
-                    rule_type = MIN
-                    rule_value = 117
-                },
-                {
-                    rule_type = MAX
-                    rule_value = 119
-                }
+          field_name = c_tinyint
+          field_type = tinyint
+          field_value = [
+            {
+              rule_type = MIN
+              rule_value = 117
+            },
+            {
+              rule_type = MAX
+              rule_value = 119
+            }
 
-            ]
+          ]
         }
         {
-            field_name = c_smallint
-            field_type = smallint
-            field_value = [
-                {
-                    rule_type = NOT_NULL
-                    equals_to = 15987
-                }
-            ]
+          field_name = c_smallint
+          field_type = smallint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = 15987
+            }
+          ]
         }
       ]
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
index daa5a1fb1e..698bf73cec 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_boolean= 'true' and c_smallint = 15987 and  c_tinyint not in (116, 120)"
@@ -56,29 +56,29 @@ sink {
           ]
         }
         {
-            field_name = c_tinyint
-            field_type = tinyint
-            field_value = [
-                {
-                    rule_type = MIN
-                    rule_value = 117
-                },
-                {
-                    rule_type = MAX
-                    rule_value = 119
-                }
+          field_name = c_tinyint
+          field_type = tinyint
+          field_value = [
+            {
+              rule_type = MIN
+              rule_value = 117
+            },
+            {
+              rule_type = MAX
+              rule_value = 119
+            }
 
-            ]
+          ]
         }
         {
-            field_name = c_smallint
-            field_type = smallint
-            field_value = [
-                {
-                    rule_type = NOT_NULL
-                    equals_to = 15987
-                }
-            ]
+          field_name = c_smallint
+          field_type = smallint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = 15987
+            }
+          ]
         }
       ]
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
index 1e0947403c..f23f8d15df 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
     query = "select * from st_test where c_string like 'c_string2%'"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
index c9f9136bd3..1de9aa4efb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
@@ -22,12 +22,12 @@ env {
 
 source {
   Paimon {
-    catalog_name="seatunnel_test"
-    catalog_type="hive"
-    catalog_uri="thrift://hadoop04:9083"
-    warehouse="hdfs:///tmp/seatunnel"
-    database="seatunnel_test"
-    table="st_test3"
+    catalog_name = "seatunnel_test"
+    catalog_type = "hive"
+    catalog_uri = "thrift://hadoop04:9083"
+    warehouse = "hdfs:///tmp/seatunnel"
+    database = "seatunnel_test"
+    table = "st_test3"
     paimon.hadoop.conf = {
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
index 68101da6f3..5c3c340058 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "seatunnel_namespace7"
     table = "st_test"
     plugin_output = paimon_source
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
index 50728871af..5daa2b8a68 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
@@ -22,7 +22,7 @@ env {
 
 source {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test"
   }
@@ -30,7 +30,7 @@ source {
 
 sink {
   Paimon {
-    warehouse = "/tmp/paimon"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
     database = "full_type"
     table = "st_test_sink"
     paimon.table.primary-keys = "c_tinyint"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
index 6684b5fa95..489ab781fd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
@@ -31,68 +31,68 @@ source {
     database = "seatunnel_namespace11"
     table = "st_test"
     paimon.hadoop.conf = {
-        fs.s3a.access-key=minio
-        fs.s3a.secret-key=miniominio
-        fs.s3a.endpoint="http://minio:9000"
-        fs.s3a.path.style.access=true
-        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+      fs.s3a.access-key = minio
+      fs.s3a.secret-key = miniominio
+      fs.s3a.endpoint = "http://minio:9000"
+      fs.s3a.path.style.access = true
+      fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
     }
   }
 }
 
 sink {
- Assert {
+  Assert {
     rules {
-        row_rules = [
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        }
+      ],
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
             {
-              rule_type = MIN_ROW
-              rule_value = 2
-            }
-          ],
-          row_rules = [
+              rule_type = NOT_NULL
+            },
             {
-              rule_type = MAX_ROW
-              rule_value = 2
+              rule_type = MIN
+              rule_value = 1
+            },
+            {
+              rule_type = MAX
+              rule_value = 3
             }
-          ],
-          field_rules = [
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
             {
-              field_name = pk_id
-              field_type = bigint
-              field_value = [
-                {
-                  rule_type = NOT_NULL
-                },
-                {
-                  rule_type = MIN
-                  rule_value = 1
-                },
-                {
-                  rule_type = MAX
-                  rule_value = 3
-                }
-              ]
-            },
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
             {
-              field_name = name
-              field_type = string
-              field_value = [
-                {
-                  rule_type = NOT_NULL
-                }
-              ]
-            },
-             {
-               field_name = score
-               field_type = int
-               field_value = [
-                 {
-                   rule_type = NOT_NULL
-                   equals_to = 100
-                 }
-               ]
-             }
+              rule_type = NOT_NULL
+              equals_to = 100
+            }
           ]
         }
+      ]
+    }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
index eff421346f..ae896b18f0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
@@ -22,10 +22,10 @@ env {
 
 source {
   Paimon {
-    catalog_name="seatunnel_test"
-    warehouse="hdfs:///tmp/paimon"
-    database="seatunnel_namespace1"
-    table="st_test"
+    catalog_name = "seatunnel_test"
+    warehouse = "hdfs:///tmp/paimon"
+    database = "seatunnel_namespace1"
+    table = "st_test"
     query = "select * from st_test where pk_id is not null and pk_id < 3"
     paimon.hadoop.conf = {
       fs.defaultFS = "hdfs://nameservice1"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
index 9b0dfa8028..1a9a41d79c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
@@ -1,75 +1,99 @@
 {
-  "id" : 0,
-  "fields" : [ {
-    "id" : 0,
-    "name" : "c_map",
-    "type" : {
-      "type" : "MAP",
-      "key" : "STRING",
-      "value" : "STRING"
+  "id": 0,
+  "fields": [
+    {
+      "id": 0,
+      "name": "pk_id",
+      "type": "BIGINT NOT NULL"
+    },
+    {
+      "id": 1,
+      "name": "c_map",
+      "type": {
+        "type": "MAP",
+        "key": "STRING",
+        "value": "STRING"
+      }
+    },
+    {
+      "id": 2,
+      "name": "c_array",
+      "type": {
+        "type": "ARRAY",
+        "element": "INT"
+      }
+    },
+    {
+      "id": 3,
+      "name": "c_string",
+      "type": "STRING"
+    },
+    {
+      "id": 4,
+      "name": "c_boolean",
+      "type": "BOOLEAN"
+    },
+    {
+      "id": 5,
+      "name": "c_tinyint",
+      "type": "TINYINT"
+    },
+    {
+      "id": 6,
+      "name": "c_smallint",
+      "type": "SMALLINT"
+    },
+    {
+      "id": 7,
+      "name": "c_int",
+      "type": "INT"
+    },
+    {
+      "id": 8,
+      "name": "c_bigint",
+      "type": "BIGINT"
+    },
+    {
+      "id": 9,
+      "name": "c_float",
+      "type": "FLOAT"
+    },
+    {
+      "id": 10,
+      "name": "c_double",
+      "type": "DOUBLE"
+    },
+    {
+      "id": 11,
+      "name": "c_decimal",
+      "type": "DECIMAL(30, 8)"
+    },
+    {
+      "id": 12,
+      "name": "c_bytes",
+      "type": "BYTES"
+    },
+    {
+      "id": 13,
+      "name": "c_date",
+      "type": "DATE"
+    },
+    {
+      "id": 14,
+      "name": "c_timestamp",
+      "type": "TIMESTAMP(6)"
+    },
+    {
+      "id": 15,
+      "name": "c_time",
+      "type": "TIME(0)"
     }
-  }, {
-    "id" : 1,
-    "name" : "c_array",
-    "type" : {
-      "type" : "ARRAY",
-      "element" : "INT"
-    }
-  }, {
-    "id" : 2,
-    "name" : "c_string",
-    "type" : "STRING"
-  }, {
-    "id" : 3,
-    "name" : "c_boolean",
-    "type" : "BOOLEAN"
-  }, {
-    "id" : 4,
-    "name" : "c_tinyint",
-    "type" : "TINYINT"
-  }, {
-    "id" : 5,
-    "name" : "c_smallint",
-    "type" : "SMALLINT"
-  }, {
-    "id" : 6,
-    "name" : "c_int",
-    "type" : "INT"
-  }, {
-    "id" : 7,
-    "name" : "c_bigint",
-    "type" : "BIGINT"
-  }, {
-    "id" : 8,
-    "name" : "c_float",
-    "type" : "FLOAT"
-  }, {
-    "id" : 9,
-    "name" : "c_double",
-    "type" : "DOUBLE"
-  }, {
-    "id" : 10,
-    "name" : "c_decimal",
-    "type" : "DECIMAL(30, 8)"
-  }, {
-    "id" : 11,
-    "name" : "c_bytes",
-    "type" : "BYTES"
-  }, {
-    "id" : 12,
-    "name" : "c_date",
-    "type" : "DATE"
-  }, {
-    "id" : 13,
-    "name" : "c_timestamp",
-    "type" : "TIMESTAMP(6)"
-  }, {
-      "id" : 14,
-      "name" : "c_time",
-      "type" : "TIME"
-    }],
-  "highestFieldId" : 14,
-  "partitionKeys" : [ ],
-  "primaryKeys" : [ ],
-  "options" : { }
-}
+  ],
+  "highestFieldId": 15,
+  "partitionKeys": [],
+  "primaryKeys": [
+    "pk_id"
+  ],
+  "options": {},
+  "timeMillis": 1751613422623
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
index bcf2043d70..9e02db6a7b 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
@@ -43,6 +43,21 @@ public abstract class AbstractTestContainer implements 
TestContainer {
     protected static final String START_ROOT_MODULE_NAME = "seatunnel-core";
 
     public static final String SEATUNNEL_HOME = "/tmp/seatunnel/";
+
+    protected static final boolean isWindows =
+            System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+
+    protected static String hostName = System.getProperty("user.name");
+    protected Integer hostUid = Integer.parseInt(System.getProperty("user.id", "1000"));
+    protected Integer hostGid = Integer.parseInt(System.getProperty("user.gid", "1000"));
+
+    protected static final String CONTAINER_VOLUME_MOUNT_PATH = "/tmp/seatunnel_mnt";
+
+    public static final String HOST_VOLUME_MOUNT_PATH =
+            isWindows
+                    ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName)
+                    : CONTAINER_VOLUME_MOUNT_PATH;
+
     protected final String startModuleName;
 
     protected final String startModuleFullPath;
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 1bfe8e8c3c..7d2dec4360 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.common.container.flink;
 
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
+import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -61,8 +62,6 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
 
     protected static final String DEFAULT_DOCKER_IMAGE = 
"flink:1.13.6-scala_2.11";
 
-    protected static final String MOUNTS_PATH = "/opt/seatunnel_mounts";
-
     protected GenericContainer<?> jobManager;
     protected GenericContainer<?> taskManager;
 
@@ -73,6 +72,7 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
 
     @Override
     public void startUp() throws Exception {
+        FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
         final String dockerImage = getDockerImage();
         final String properties = String.join("\n", getFlinkProperties());
         jobManager =
@@ -89,10 +89,12 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
                                 new LogMessageWaitStrategy()
                                         .withRegEx(".*Starting the resource 
manager.*")
                                         
.withStartupTimeout(Duration.ofMinutes(2)))
-                        .withFileSystemBind(MOUNTS_PATH, MOUNTS_PATH, 
BindMode.READ_WRITE);
+                        .withFileSystemBind(
+                                HOST_VOLUME_MOUNT_PATH,
+                                CONTAINER_VOLUME_MOUNT_PATH,
+                                BindMode.READ_WRITE);
         copySeaTunnelStarterToContainer(jobManager);
         copySeaTunnelStarterLoggingToContainer(jobManager);
-
         jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
8081, 8081)));
 
         taskManager =
@@ -111,11 +113,13 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
                                         .withRegEx(
                                                 ".*Successful registration at 
resource manager.*")
                                         
.withStartupTimeout(Duration.ofMinutes(2)))
-                        .withFileSystemBind(MOUNTS_PATH, MOUNTS_PATH, 
BindMode.READ_WRITE);
+                        .withFileSystemBind(
+                                HOST_VOLUME_MOUNT_PATH,
+                                CONTAINER_VOLUME_MOUNT_PATH,
+                                BindMode.READ_WRITE);
 
         Startables.deepStart(Stream.of(jobManager)).join();
         Startables.deepStart(Stream.of(taskManager)).join();
-        // execute extra commands
         executeExtraCommands(jobManager);
     }
 
@@ -126,11 +130,16 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
     @Override
     public void tearDown() throws Exception {
         if (taskManager != null) {
+            // delete the volume
+            taskManager.execInContainer("rm", "-rf", CONTAINER_VOLUME_MOUNT_PATH);
             taskManager.stop();
         }
         if (jobManager != null) {
+            // delete the volume
+            jobManager.execInContainer("rm", "-rf", CONTAINER_VOLUME_MOUNT_PATH);
             jobManager.stop();
         }
+        FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 2e225b2745..6397773294 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -39,6 +39,7 @@ import org.apache.http.util.EntityUtils;
 
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
@@ -84,6 +85,7 @@ public class SeaTunnelContainer extends AbstractTestContainer 
{
 
     @Override
     public void startUp() throws Exception {
+        FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
         server = createSeaTunnelServer();
     }
 
@@ -114,6 +116,10 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                                 new Slf4jLogConsumer(
                                         DockerLoggerFactory.getLogger(
                                                 "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
+                        .withFileSystemBind(
+                                HOST_VOLUME_MOUNT_PATH,
+                                CONTAINER_VOLUME_MOUNT_PATH,
+                                BindMode.READ_WRITE)
                         .waitingFor(Wait.forLogMessage(".*received new worker 
register:.*", 1));
         copySeaTunnelStarterToContainer(server);
         server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
@@ -213,8 +219,11 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
     @Override
     public void tearDown() throws Exception {
         if (server != null) {
+            // delete the volume
+            server.execInContainer("rm", "-rf", CONTAINER_VOLUME_MOUNT_PATH);
             server.close();
         }
+        FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index d6c08f1231..ac201f2ae0 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -17,10 +17,12 @@
 
 package org.apache.seatunnel.e2e.common.container.spark;
 
+import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -52,6 +54,7 @@ public abstract class AbstractTestSparkContainer extends 
AbstractTestContainer {
 
     @Override
     public void startUp() throws Exception {
+        FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
         master =
                 new GenericContainer<>(getDockerImage())
                         .withNetwork(NETWORK)
@@ -62,6 +65,10 @@ public abstract class AbstractTestSparkContainer extends 
AbstractTestContainer {
                                 new Slf4jLogConsumer(
                                         
DockerLoggerFactory.getLogger(getDockerImage())))
                         .withCreateContainerCmdModifier(cmd -> 
cmd.withUser("root"))
+                        .withFileSystemBind(
+                                HOST_VOLUME_MOUNT_PATH,
+                                CONTAINER_VOLUME_MOUNT_PATH,
+                                BindMode.READ_WRITE)
                         .waitingFor(
                                 new LogMessageWaitStrategy()
                                         .withRegEx(".*Master: Starting Spark 
master at.*")
@@ -80,8 +87,11 @@ public abstract class AbstractTestSparkContainer extends 
AbstractTestContainer {
     @Override
     public void tearDown() throws Exception {
         if (master != null) {
+            // delete the volume
+            master.execInContainer("rm", "-rf", CONTAINER_VOLUME_MOUNT_PATH);
             master.stop();
         }
+        FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
     }
 
     @Override

Reply via email to