This is an automated email from the ASF dual-hosted git repository.

dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c0f27c2f76 [Feature][Connector-V2] Paimon Sink supports 
changelog-producer lookup and full-compaction modes (#7834)
c0f27c2f76 is described below

commit c0f27c2f76c6562b18a7b4c76d2c7ef4452f4b4a
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Oct 21 10:41:38 2024 +0800

    [Feature][Connector-V2] Paimon Sink supports changelog-producer lookup 
and full-compaction modes (#7834)
---
 docs/en/connector-v2/sink/Paimon.md                |  45 +++++-
 docs/zh/connector-v2/sink/Paimon.md                |  70 +++++++---
 .../seatunnel/paimon/config/PaimonSinkConfig.java  |  33 ++++-
 .../seatunnel/paimon/sink/PaimonSink.java          |  15 +-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    |  48 +++++--
 .../seatunnel/paimon/utils/SchemaUtil.java         |   5 +
 .../e2e/connector/paimon/PaimonRecord.java         |  30 ++++
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      | 152 ++++++++++++++++++++-
 .../changelog_fake_cdc_sink_paimon_case1_ddl.conf  |  53 +++++++
 ...log_fake_cdc_sink_paimon_case1_insert_data.conf |  67 +++++++++
 ...log_fake_cdc_sink_paimon_case1_update_data.conf |  71 ++++++++++
 .../changelog_fake_cdc_sink_paimon_case2.conf      |  83 +++++++++++
 .../test/resources/changelog_paimon_to_paimon.conf |  48 +++++++
 .../common/container/AbstractTestContainer.java    |  24 +++-
 .../e2e/common/container/TestContainer.java        |  13 +-
 .../flink/AbstractTestFlinkContainer.java          |   9 +-
 .../ConnectorPackageServiceContainer.java          |   9 +-
 .../container/seatunnel/SeaTunnelContainer.java    |  27 +++-
 .../spark/AbstractTestSparkContainer.java          |  10 +-
 .../seatunnel/e2e/common/util/JobIdGenerator.java  |  27 ++++
 20 files changed, 786 insertions(+), 53 deletions(-)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 8133b6e836..c9e4b3a9b6 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -31,7 +31,7 @@ libfb303-xxx.jar
 
 ## Options
 
-|            name             |  type  | required |        default value       
  | Description                                                                 
                                                                                
     |
+|            name             | type   | required | default value              
  | Description                                                                 
                                                                                
     |
 
|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | warehouse                   | String | Yes      | -                          
  | Paimon warehouse path                                                       
                                                                                
     |
 | catalog_type                | String | No       | filesystem                 
  | Catalog type of Paimon, support filesystem and hive                         
                                                                                
     |
@@ -43,7 +43,7 @@ libfb303-xxx.jar
 | data_save_mode              | Enum   | No       | APPEND_DATA                
  | The data save mode                                                          
                                                                                
     |
 | paimon.table.primary-keys   | String | No       | -                          
  | Default comma-separated list of columns (primary key) that identify a row 
in tables.(Notice: The partition field needs to be included in the primary key 
fields) |
 | paimon.table.partition-keys | String | No       | -                          
  | Default comma-separated list of partition fields to use when creating 
tables.                                                                         
           |
-| paimon.table.write-props    | Map    | No       | -                          
  | Properties passed through to paimon table initialization, 
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
               |
+| paimon.table.write-props    | Map    | No       | -                          
  | Properties passed through to paimon table initialization, 
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
            |
 | paimon.hadoop.conf          | Map    | No       | -                          
  | Properties in hadoop conf                                                   
                                                                                
     |
 | paimon.hadoop.conf-path     | String | No       | -                          
  | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files                                                           
            |
 
@@ -52,9 +52,14 @@ You must configure the `changelog-producer=input` option to 
enable the changelog
 
 The changelog producer mode of the paimon table has [four 
mode](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/)
 which is `none`、`input`、`lookup` and `full-compaction`.
 
-Currently, we only support the `none` and `input` mode. The default is `none` 
which will not output the changelog file. The `input` mode will output the 
changelog file in paimon table.
+All `changelog-producer` modes are currently supported. The default is `none`.
 
-When you use a streaming mode to read paimon table, these two mode will 
produce [different 
results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
+* 
[`none`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#none)
+* 
[`input`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#input)
+* 
[`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
+* 
[`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
+> Note:
+> When you use streaming mode to read a paimon table, different modes will 
produce [different 
results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
 
 ## Examples
 
@@ -250,6 +255,38 @@ sink {
 }
 ```
 
+#### Write with the `changelog-producer` attribute
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+  base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+  username = "root"
+  password = "******"
+  table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+  catalog_name = "seatunnel_test"
+  warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
+  database = "seatunnel"
+  table = "role"
+  paimon.table.write-props = {
+   changelog-producer = full-compaction
+   changelog-tmp-path = /tmp/paimon/changelog
+  }
+ }
+}
+```
+
 ### Write to dynamic bucket table 
 
 Single dynamic bucket table with write props of paimon,operates on the primary 
key table and bucket is -1.
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index 32d35a5e95..375c8c90ca 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -30,30 +30,35 @@ libfb303-xxx.jar
 
 ## 连接器选项
 
-| 名称                        | 类型  | 是否必须 | 默认值                        | 描述     
                                                                                
           |
-|-----------------------------|-------|----------|------------------------------|---------------------------------------------------------------------------------------------------|
-| warehouse                   | 字符串 | 是       | -                            | 
Paimon warehouse路径                                                              
                  |
-| catalog_type                | 字符串 | 否       | filesystem                   | 
Paimon的catalog类型,目前支持filesystem和hive                                            
                  |
-| catalog_uri                 | 字符串 | 否       | -                            | 
Paimon catalog的uri,仅当catalog_type为hive时需要配置                                     
                  |
-| database                    | 字符串 | 是       | -                            | 
数据库名称                                                                           
                  |
-| table                       | 字符串 | 是       | -                            | 
表名                                                                              
                  |
-| hdfs_site_path              | 字符串 | 否       | -                            | 
hdfs-site.xml文件路径                                                               
                  |
-| schema_save_mode            | 枚举   | 否       | CREATE_SCHEMA_WHEN_NOT_EXIST 
| Schema保存模式                                                                    
                    |
-| data_save_mode              | 枚举   | 否       | APPEND_DATA                  
| 数据保存模式                                                                        
                    |
-| paimon.table.primary-keys   | 字符串 | 否       | -                            | 
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)                                            
                  |
-| paimon.table.partition-keys | 字符串 | 否       | -                            | 
分区字段列表,多字段使用逗号分隔                                                                
                  |
-| paimon.table.write-props    | Map    | 否       | -                           
 | Paimon表初始化指定的属性, 
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
 |
-| paimon.hadoop.conf          | Map    | 否       | -                           
 | Hadoop配置文件属性信息                                                               
                     |
-| paimon.hadoop.conf-path     | 字符串 | 否       | -                            | 
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置          
                  |
+| 名称                          | 类型   | 是否必须 | 默认值                          | 
描述                                                                              
                      |
+|-----------------------------|------|------|------------------------------|-------------------------------------------------------------------------------------------------------|
+| warehouse                   | 字符串  | 是    | -                            | 
Paimon warehouse路径                                                              
                      |
+| catalog_type                | 字符串  | 否    | filesystem                   | 
Paimon的catalog类型,目前支持filesystem和hive                                            
                      |
+| catalog_uri                 | 字符串  | 否    | -                            | 
Paimon catalog的uri,仅当catalog_type为hive时需要配置                                     
                      |
+| database                    | 字符串  | 是    | -                            | 
数据库名称                                                                           
                      |
+| table                       | 字符串  | 是    | -                            | 
表名                                                                              
                      |
+| hdfs_site_path              | 字符串  | 否    | -                            | 
hdfs-site.xml文件路径                                                               
                      |
+| schema_save_mode            | 枚举   | 否    | CREATE_SCHEMA_WHEN_NOT_EXIST | 
Schema保存模式                                                                      
                      |
+| data_save_mode              | 枚举   | 否    | APPEND_DATA                  | 
数据保存模式                                                                          
                      |
+| paimon.table.primary-keys   | 字符串  | 否    | -                            | 
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)                                            
                      |
+| paimon.table.partition-keys | 字符串  | 否    | -                            | 
分区字段列表,多字段使用逗号分隔                                                                
                      |
+| paimon.table.write-props    | Map  | 否    | -                            | 
Paimon表初始化指定的属性, 
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
  |
+| paimon.hadoop.conf          | Map  | 否    | -                            | 
Hadoop配置文件属性信息                                                                  
                      |
+| paimon.hadoop.conf-path     | 字符串  | 否    | -                            | 
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置          
                      |
 
 ## 更新日志
 你必须配置`changelog-producer=input`来启用paimon表的changelog产生模式。如果你使用了paimon 
sink的自动建表功能,你可以在`paimon.table.write-props`中指定这个属性。
 
 
Paimon表的changelog产生模式有[四种](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/),分别是`none`、`input`、`lookup`
 和 `full-compaction`。
 
-目前,我们只支持`none`和`input`模式。默认是`none`,这种模式将不会产生changelog文件。`input`模式将会在Paimon表下产生changelog文件。
+目前支持全部`changelog-producer`模式。默认是`none`模式。
 
-当你使用流模式去读paimon表的数据时,这两种模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。
+* 
[`none`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#none)
+* 
[`input`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#input)
+* 
[`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
+* 
[`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
+> 注意:
+> 当你使用流模式去读paimon表的数据时,不同模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。
 
 ## 示例
 
@@ -248,6 +253,37 @@ sink {
   }
 }
 ```
+#### 使用`changelog-producer`属性写入
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+  base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+  username = "root"
+  password = "******"
+  table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+  catalog_name = "seatunnel_test"
+  warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
+  database = "seatunnel"
+  table = "role"
+  paimon.table.write-props = {
+   changelog-producer = full-compaction
+   changelog-tmp-path = /tmp/paimon/changelog
+  }
+ }
+}
+```
 
 ### 动态分桶paimon单表
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index 9b358a2e8c..87766ff96b 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -23,16 +23,22 @@ import 
org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
+import org.apache.paimon.CoreOptions;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 @Getter
 @Slf4j
 public class PaimonSinkConfig extends PaimonConfig {
+
+    public static final String CHANGELOG_TMP_PATH = "changelog-tmp-path";
+
     public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
             Options.key("schema_save_mode")
                     .enumType(SchemaSaveMode.class)
@@ -44,7 +50,6 @@ public class PaimonSinkConfig extends PaimonConfig {
                     .enumType(DataSaveMode.class)
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription("data_save_mode");
-
     public static final Option<String> PRIMARY_KEYS =
             Options.key("paimon.table.primary-keys")
                     .stringType()
@@ -66,11 +71,13 @@ public class PaimonSinkConfig extends PaimonConfig {
                     .withDescription(
                             "Properties passed through to paimon table 
initialization, such as 'file.format', 
'bucket'(org.apache.paimon.CoreOptions)");
 
-    private SchemaSaveMode schemaSaveMode;
-    private DataSaveMode dataSaveMode;
-    private List<String> primaryKeys;
-    private List<String> partitionKeys;
-    private Map<String, String> writeProps;
+    private final SchemaSaveMode schemaSaveMode;
+    private final DataSaveMode dataSaveMode;
+    private final CoreOptions.ChangelogProducer changelogProducer;
+    private final String changelogTmpPath;
+    private final List<String> primaryKeys;
+    private final List<String> partitionKeys;
+    private final Map<String, String> writeProps;
 
     public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
         super(readonlyConfig);
@@ -79,6 +86,20 @@ public class PaimonSinkConfig extends PaimonConfig {
         this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
         this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS), 
",");
         this.writeProps = readonlyConfig.get(WRITE_PROPS);
+        this.changelogProducer =
+                Stream.of(CoreOptions.ChangelogProducer.values())
+                        .filter(
+                                cp ->
+                                        cp.toString()
+                                                .equalsIgnoreCase(
+                                                        
writeProps.getOrDefault(
+                                                                
CoreOptions.CHANGELOG_PRODUCER
+                                                                        .key(),
+                                                                "")))
+                        .findFirst()
+                        .orElse(null);
+        this.changelogTmpPath =
+                writeProps.getOrDefault(CHANGELOG_TMP_PATH, 
System.getProperty("java.io.tmpdir"));
         checkConfig();
     }
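
The constructor above resolves the `changelog-producer` write-prop by scanning the enum values with a case-insensitive match and falling back to `null` when the key is absent. A minimal, self-contained sketch of that matching logic follows; it uses a stand-in enum rather than the real `org.apache.paimon.CoreOptions.ChangelogProducer`, assuming (as the `equalsIgnoreCase` in `PaimonSinkConfig` implies) that the enum's `toString()` yields the kebab-case option value:

```java
import java.util.Map;
import java.util.stream.Stream;

public class ChangelogProducerResolver {

    // Stand-in for CoreOptions.ChangelogProducer; the kebab-case toString()
    // values are an assumption for illustration.
    public enum ChangelogProducer {
        NONE("none"), INPUT("input"), LOOKUP("lookup"), FULL_COMPACTION("full-compaction");

        private final String value;

        ChangelogProducer(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /** Returns the matching producer, or null when write-props omit the key. */
    public static ChangelogProducer resolve(Map<String, String> writeProps) {
        String configured = writeProps.getOrDefault("changelog-producer", "");
        return Stream.of(ChangelogProducer.values())
                .filter(cp -> cp.toString().equalsIgnoreCase(configured))
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        // Matches regardless of case, as in the connector's config parsing.
        System.out.println(resolve(Map.of("changelog-producer", "FULL-COMPACTION")));
        System.out.println(resolve(Map.of())); // null: defer to the table's options
    }
}
```

Returning `null` rather than a default lets `PaimonSinkWriter` distinguish "user configured nothing" from "user configured a value that conflicts with the table".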
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 73d2151b89..86828c9a58 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -94,7 +94,12 @@ public class PaimonSink
     @Override
     public PaimonSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         return new PaimonSinkWriter(
-                context, table, seaTunnelRowType, jobContext, 
paimonHadoopConfiguration);
+                context,
+                table,
+                seaTunnelRowType,
+                jobContext,
+                paimonSinkConfig,
+                paimonHadoopConfiguration);
     }
 
     @Override
@@ -108,7 +113,13 @@ public class PaimonSink
     public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> 
restoreWriter(
             SinkWriter.Context context, List<PaimonSinkState> states) throws 
IOException {
         return new PaimonSinkWriter(
-                context, table, seaTunnelRowType, states, jobContext, 
paimonHadoopConfiguration);
+                context,
+                table,
+                seaTunnelRowType,
+                states,
+                jobContext,
+                paimonSinkConfig,
+                paimonHadoopConfiguration);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index ac0b1027d0..e57e62c981 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
@@ -33,7 +34,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkSta
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -58,6 +61,8 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
 @Slf4j
 public class PaimonSinkWriter
         implements SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>,
@@ -65,14 +70,14 @@ public class PaimonSinkWriter
 
     private String commitUser = UUID.randomUUID().toString();
 
+    private final FileStoreTable table;
+
     private final WriteBuilder tableWriteBuilder;
 
     private final TableWrite tableWrite;
 
     private List<CommitMessage> committables = new ArrayList<>();
 
-    private final Table table;
-
     private final SeaTunnelRowType seaTunnelRowType;
 
     private final SinkWriter.Context context;
@@ -90,18 +95,30 @@ public class PaimonSinkWriter
             Table table,
             SeaTunnelRowType seaTunnelRowType,
             JobContext jobContext,
+            PaimonSinkConfig paimonSinkConfig,
             PaimonHadoopConfiguration paimonHadoopConfiguration) {
-        this.table = table;
+        this.table = (FileStoreTable) table;
+        CoreOptions.ChangelogProducer changelogProducer =
+                this.table.coreOptions().changelogProducer();
+        if (Objects.nonNull(paimonSinkConfig.getChangelogProducer())
+                && changelogProducer != 
paimonSinkConfig.getChangelogProducer()) {
+            log.warn(
+                    "The configured 'changelog-producer' property is not 
compatible with the table's existing option, so the table's 
'changelog-producer' will be used.");
+        }
+        String changelogTmpPath = paimonSinkConfig.getChangelogTmpPath();
         this.tableWriteBuilder =
                 JobContextUtil.isBatchJob(jobContext)
                         ? this.table.newBatchWriteBuilder()
                         : this.table.newStreamWriteBuilder();
-        this.tableWrite = tableWriteBuilder.newWrite();
+        this.tableWrite =
+                tableWriteBuilder
+                        .newWrite()
+                        
.withIOManager(IOManager.create(splitPaths(changelogTmpPath)));
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
         this.jobContext = jobContext;
-        this.tableSchema = ((FileStoreTable) table).schema();
-        BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
+        this.tableSchema = this.table.schema();
+        BucketMode bucketMode = this.table.bucketMode();
         this.dynamicBucket =
                 BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC 
== bucketMode;
         int bucket = ((FileStoreTable) table).coreOptions().bucket();
@@ -124,8 +141,15 @@ public class PaimonSinkWriter
             SeaTunnelRowType seaTunnelRowType,
             List<PaimonSinkState> states,
             JobContext jobContext,
+            PaimonSinkConfig paimonSinkConfig,
             PaimonHadoopConfiguration paimonHadoopConfiguration) {
-        this(context, table, seaTunnelRowType, jobContext, 
paimonHadoopConfiguration);
+        this(
+                context,
+                table,
+                seaTunnelRowType,
+                jobContext,
+                paimonSinkConfig,
+                paimonHadoopConfiguration);
         if (Objects.isNull(states) || states.isEmpty()) {
             return;
         }
@@ -186,7 +210,8 @@ public class PaimonSinkWriter
                 fileCommittables = ((BatchTableWrite) 
tableWrite).prepareCommit();
             } else {
                 fileCommittables =
-                        ((StreamTableWrite) tableWrite).prepareCommit(false, 
checkpointId);
+                        ((StreamTableWrite) tableWrite)
+                                .prepareCommit(waitCompaction(), checkpointId);
             }
             committables.addAll(fileCommittables);
             return Optional.of(new PaimonCommitInfo(fileCommittables, 
checkpointId));
@@ -224,4 +249,11 @@ public class PaimonSinkWriter
             committables.clear();
         }
     }
+
+    private boolean waitCompaction() {
+        CoreOptions.ChangelogProducer changelogProducer =
+                this.table.coreOptions().changelogProducer();
+        return changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
+                || changelogProducer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION;
+    }
 }
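
The new `waitCompaction()` hook above passes `true` to `StreamTableWrite#prepareCommit` for the `lookup` and `full-compaction` producers, since those modes generate changelog files during compaction and the writer must wait for compaction before committing. A stand-alone sketch of that gate (stand-in enum, not the real Paimon API):

```java
public class WaitCompactionGate {

    // Stand-in for CoreOptions.ChangelogProducer (assumption for illustration).
    public enum ChangelogProducer { NONE, INPUT, LOOKUP, FULL_COMPACTION }

    // Mirrors PaimonSinkWriter#waitCompaction: only the producers that emit
    // changelog during compaction need prepareCommit(waitCompaction=true).
    public static boolean waitCompaction(ChangelogProducer producer) {
        return producer == ChangelogProducer.LOOKUP
                || producer == ChangelogProducer.FULL_COMPACTION;
    }

    public static void main(String[] args) {
        for (ChangelogProducer p : ChangelogProducer.values()) {
            System.out.println(p + " -> waitCompaction=" + waitCompaction(p));
        }
    }
}
```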
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index fa8ed33820..ca825a269f 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -25,6 +25,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -61,6 +62,10 @@ public class SchemaUtil {
             paiSchemaBuilder.partitionKeys(partitionKeys);
         }
         Map<String, String> writeProps = paimonSinkConfig.getWriteProps();
+        CoreOptions.ChangelogProducer changelogProducer = 
paimonSinkConfig.getChangelogProducer();
+        if (changelogProducer != null) {
+            writeProps.remove(PaimonSinkConfig.CHANGELOG_TMP_PATH);
+        }
         if (!writeProps.isEmpty()) {
             paiSchemaBuilder.options(writeProps);
         }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
index 700bf25f51..c17d8dbc14 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
@@ -21,18 +21,23 @@
 package org.apache.seatunnel.e2e.connector.paimon;
 
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.util.Arrays;
+
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 public class PaimonRecord {
+    public RowKind rowKind;
     public Long pkId;
     public String name;
     public Integer score;
+    public String op;
     public String dt;
     public Timestamp oneTime;
     public Timestamp twoTime;
@@ -45,6 +50,12 @@ public class PaimonRecord {
         this.name = name;
     }
 
+    public PaimonRecord(RowKind rowKind, Long pkId, String name) {
+        this(pkId, name);
+        this.rowKind = rowKind;
+        this.name = name;
+    }
+
     public PaimonRecord(Long pkId, String name, String dt) {
         this(pkId, name);
         this.dt = dt;
@@ -68,4 +79,23 @@ public class PaimonRecord {
         this.threeTime = threeTime;
         this.fourTime = fourTime;
     }
+
+    public String toChangeLogFull() {
+        Object[] objects = new Object[4];
+        objects[0] = rowKind.shortString();
+        objects[1] = pkId;
+        objects[2] = name;
+        objects[3] = score;
+        return Arrays.toString(objects);
+    }
+
+    public String toChangeLogLookUp() {
+        Object[] objects = new Object[5];
+        objects[0] = rowKind.shortString();
+        objects[1] = pkId;
+        objects[2] = name;
+        objects[3] = score;
+        objects[4] = op;
+        return Arrays.toString(objects);
+    }
 }
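
The `toChangeLogFull`/`toChangeLogLookUp` helpers render a record as the `Arrays.toString` of `[rowKind, pkId, name, score(, op)]` so the IT can assert on rows read back from the changelog. A self-contained sketch of the same formatting; the `RowKind` stand-in and its short strings (`+I`, `-U`, `+U`, `-D`, the usual Flink/Paimon convention) are assumptions for illustration, not the real `org.apache.paimon.types.RowKind`:

```java
import java.util.Arrays;

public class ChangelogRow {

    // Stand-in for org.apache.paimon.types.RowKind (short strings assumed).
    public enum RowKind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        private final String shortString;

        RowKind(String shortString) {
            this.shortString = shortString;
        }

        public String shortString() {
            return shortString;
        }
    }

    // Same shape as PaimonRecord#toChangeLogFull: [kind, pkId, name, score].
    public static String toChangeLogFull(RowKind kind, long pkId, String name, int score) {
        Object[] objects = {kind.shortString(), pkId, name, score};
        return Arrays.toString(objects);
    }

    public static void main(String[] args) {
        System.out.println(toChangeLogFull(RowKind.INSERT, 1L, "A_1", 100));
        // -> [+I, 1, A_1, 100]
    }
}
```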
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index dc6bfc9eba..293cf6c76e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.e2e.connector.paimon;
 
 import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.core.starter.utils.CompressionUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
@@ -25,6 +26,7 @@ import 
org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.apache.commons.compress.archivers.ArchiveException;
 import org.apache.commons.lang3.StringUtils;
@@ -58,7 +60,9 @@ import java.io.File;
 import java.io.IOException;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -84,6 +88,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
     private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
     private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
     private boolean isWindows;
+    private boolean changeLogEnabled = false;
 
     @BeforeAll
     @Override
@@ -545,6 +550,129 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
                         });
     }
 
+    @TestTemplate
+    public void testChangelogLookup(TestContainer container) throws Exception {
+        // create Paimon table (changelog-producer=lookup)
+        Container.ExecResult writeResult =
+                container.executeJob("/changelog_fake_cdc_sink_paimon_case1_ddl.conf");
+        Assertions.assertEquals(0, writeResult.getExitCode());
+        TimeUnit.SECONDS.sleep(20);
+        String[] jobIds =
+                new String[] {
+                    JobIdGenerator.newJobId(), JobIdGenerator.newJobId(), JobIdGenerator.newJobId()
+                };
+        log.info("jobIds: {}", Arrays.toString(jobIds));
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        // read changelog and write to append only paimon table
+        futures.add(
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                container.executeJob("/changelog_paimon_to_paimon.conf", jobIds[0]);
+                            } catch (Exception e) {
+                                throw new SeaTunnelException(e);
+                            }
+                        }));
+        TimeUnit.SECONDS.sleep(10);
+        // dml: insert data
+        futures.add(
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                container.executeJob(
+                                        "/changelog_fake_cdc_sink_paimon_case1_insert_data.conf",
+                                        jobIds[1]);
+                            } catch (Exception e) {
+                                throw new SeaTunnelException(e);
+                            }
+                        }));
+        // dml: update and delete data
+        TimeUnit.SECONDS.sleep(10);
+        futures.add(
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                container.executeJob(
+                                        "/changelog_fake_cdc_sink_paimon_case1_update_data.conf",
+                                        jobIds[2]);
+                            } catch (Exception e) {
+                                throw new SeaTunnelException(e);
+                            }
+                        }));
+        // stream job running 30 seconds
+        TimeUnit.SECONDS.sleep(30);
+        // cancel stream job
+        container.cancelJob(jobIds[1]);
+        container.cancelJob(jobIds[2]);
+        container.cancelJob(jobIds[0]);
+        changeLogEnabled = true;
+        TimeUnit.SECONDS.sleep(10);
+        // copy paimon to local
+        container.executeExtraCommands(containerExtendedFactory);
+        List<PaimonRecord> paimonRecords1 = loadPaimonData("seatunnel_namespace", "st_test_sink");
+        List<String> actual1 =
+                paimonRecords1.stream()
+                        .map(PaimonRecord::toChangeLogLookUp)
+                        .collect(Collectors.toList());
+        log.info("paimon records: {}", actual1);
+        Assertions.assertEquals(8, actual1.size());
+        Assertions.assertEquals(
+                Arrays.asList(
+                        "[+I, 1, A, 100, +I]",
+                        "[+I, 2, B, 100, +I]",
+                        "[+I, 3, C, 100, +I]",
+                        "[+I, 1, A, 100, -U]",
+                        "[+I, 1, Aa, 200, +U]",
+                        "[+I, 2, B, 100, -U]",
+                        "[+I, 2, Bb, 90, +U]",
+                        "[+I, 3, C, 100, -D]"),
+                actual1);
+        List<PaimonRecord> paimonRecords2 = loadPaimonData("seatunnel_namespace", "st_test_lookup");
+        List<String> actual2 =
+                paimonRecords2.stream()
+                        .map(PaimonRecord::toChangeLogFull)
+                        .collect(Collectors.toList());
+        log.info("paimon records: {}", actual2);
+        Assertions.assertEquals(2, actual2.size());
+        Assertions.assertEquals(Arrays.asList("[+U, 1, Aa, 200]", "[+I, 2, Bb, 90]"), actual2);
+        changeLogEnabled = false;
+        futures.forEach(future -> future.cancel(true));
+    }
+
+    @TestTemplate
+    public void testChangelogFullCompaction(TestContainer container) throws Exception {
+        String jobId = JobIdGenerator.newJobId();
+        log.info("jobId: {}", jobId);
+        CompletableFuture<Void> voidCompletableFuture =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                container.executeJob(
+                                        "/changelog_fake_cdc_sink_paimon_case2.conf", jobId);
+                            } catch (Exception e) {
+                                throw new SeaTunnelException(e);
+                            }
+                        });
+        // stream job running 20 seconds
+        TimeUnit.SECONDS.sleep(20);
+        changeLogEnabled = true;
+        // cancel stream job
+        container.cancelJob(jobId);
+        TimeUnit.SECONDS.sleep(5);
+        // copy paimon to local
+        container.executeExtraCommands(containerExtendedFactory);
+        List<PaimonRecord> paimonRecords = loadPaimonData("seatunnel_namespace", "st_test_full");
+        List<String> actual =
+                paimonRecords.stream()
+                        .map(PaimonRecord::toChangeLogFull)
+                        .collect(Collectors.toList());
+        log.info("paimon records: {}", actual);
+        Assertions.assertEquals(2, actual.size());
+        Assertions.assertEquals(Arrays.asList("[+U, 1, Aa, 200]", "[+I, 2, Bb, 90]"), actual);
+        changeLogEnabled = false;
+        voidCompletableFuture.cancel(true);
+    }
+
     protected final ContainerExtendedFactory containerExtendedFactory =
             container -> {
                 if (isWindows) {
@@ -619,13 +747,31 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
         try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
             reader.forEachRemaining(
                     row -> {
-                        PaimonRecord paimonRecord =
-                                new PaimonRecord(row.getLong(0), row.getString(1).toString());
+                        PaimonRecord paimonRecord;
+                        if (changeLogEnabled) {
+                            paimonRecord =
+                                    new PaimonRecord(
+                                            row.getRowKind(),
+                                            row.getLong(0),
+                                            row.getString(1).toString());
+                        } else {
+                            paimonRecord =
+                                    new PaimonRecord(row.getLong(0), row.getString(1).toString());
+                        }
                         if (table.schema().fieldNames().contains("score")) {
                             paimonRecord.setScore(row.getInt(2));
                         }
+                        if (table.schema().fieldNames().contains("op")) {
+                            paimonRecord.setOp(row.getString(3).toString());
+                        }
                         result.add(paimonRecord);
-                        log.info("key_id:" + row.getLong(0) + ", name:" + row.getString(1));
+                        log.info(
+                                "rowKind:"
+                                        + row.getRowKind().shortString()
+                                        + ", key_id:"
+                                        + row.getLong(0)
+                                        + ", name:"
+                                        + row.getString(1));
                     });
         }
         log.info(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
new file mode 100644
index 0000000000..6a32727505
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "batch"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = []
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace"
+    table = "st_test_lookup"
+    paimon.table.write-props = {
+      changelog-producer = lookup
+      changelog-tmp-path = "/tmp/paimon/changelog"
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
new file mode 100644
index 0000000000..9b7310177c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "Streaming"
+  checkpoint.interval = 2000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace"
+    table = "st_test_lookup"
+    paimon.table.write-props = {
+      changelog-producer = lookup
+      changelog-tmp-path = "/tmp/paimon/changelog"
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
new file mode 100644
index 0000000000..271ad20bff
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "Streaming"
+  checkpoint.interval = 2000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "Aa", 200]
+      },
+      {
+        kind = INSERT
+        fields = [2, "Bb", 90]
+      },
+      {
+        kind = DELETE
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace"
+    table = "st_test_lookup"
+    paimon.table.write-props = {
+      changelog-producer = lookup
+      changelog-tmp-path = "/tmp/paimon/changelog"
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
new file mode 100644
index 0000000000..f7135e645f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "Streaming"
+  checkpoint.interval = 2000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "Aa", 200]
+      },
+      {
+        kind = INSERT
+        fields = [2, "Bb", 90]
+      },
+      {
+        kind = DELETE
+        fields = [3, "C", 100]
+      },
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace"
+    table = "st_test_full"
+    paimon.table.write-props = {
+      changelog-producer = full-compaction
+      changelog-tmp-path = "/tmp/paimon/changelog"
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
new file mode 100644
index 0000000000..d23d11d9e0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "Streaming"
+  checkpoint.interval = 2000
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "seatunnel_namespace"
+    table = "st_test_lookup"
+  }
+}
+
+transform {
+  RowKindExtractor {
+    custom_field_name = op
+    transform_type = SHORT
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "seatunnel_namespace"
+    table = "st_test_sink"
+    paimon.table.write-props = {
+      write-only = true
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
index d7bd0f4d74..10d5685c6d 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.e2e.common.container;
 
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
@@ -68,6 +70,8 @@ public abstract class AbstractTestContainer implements TestContainer {
 
     protected abstract String getSavePointCommand();
 
+    protected abstract String getCancelJobCommand();
+
     protected abstract String getRestoreCommand();
 
     protected abstract String getConnectorNamePrefix();
@@ -95,11 +99,11 @@ public abstract class AbstractTestContainer implements TestContainer {
 
     protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile)
             throws IOException, InterruptedException {
-        return executeJob(container, confFile, null);
+        return executeJob(container, confFile, null, null);
     }
 
     protected Container.ExecResult executeJob(
-            GenericContainer<?> container, String confFile, List<String> variables)
+            GenericContainer<?> container, String confFile, String jobId, List<String> variables)
             throws IOException, InterruptedException {
         final String confInContainerPath = copyConfigFileToContainer(container, confFile);
         // copy connectors
@@ -118,6 +122,10 @@ public abstract class AbstractTestContainer implements TestContainer {
         command.add(adaptPathForWin(confInContainerPath));
         command.add("--name");
         command.add(new File(confInContainerPath).getName());
+        if (StringUtils.isNoneEmpty(jobId)) {
+            command.add("--set-job-id");
+            command.add(jobId);
+        }
         List<String> extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands());
         if (variables != null && !variables.isEmpty()) {
             variables.forEach(
@@ -142,6 +150,18 @@ public abstract class AbstractTestContainer implements TestContainer {
         return executeCommand(container, command);
     }
 
+    protected Container.ExecResult cancelJob(GenericContainer<?> container, String jobId)
+            throws IOException, InterruptedException {
+        final List<String> command = new ArrayList<>();
+        String binPath = Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString();
+        // base command
+        command.add(adaptPathForWin(binPath));
+        command.add(getCancelJobCommand());
+        command.add(jobId);
+        command.addAll(getExtraStartShellCommands());
+        return executeCommand(container, command);
+    }
+
     protected Container.ExecResult restoreJob(
             GenericContainer<?> container, String confFile, String jobId)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index e83a3635e8..72584158f6 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -39,15 +39,20 @@ public interface TestContainer extends TestResource {
     Container.ExecResult executeJob(String confFile, List<String> variables)
             throws IOException, InterruptedException;
 
+    default Container.ExecResult executeJob(String confFile, String jobId)
+            throws IOException, InterruptedException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
     default Container.ExecResult executeConnectorCheck(String[] args)
             throws IOException, InterruptedException {
         throw new UnsupportedOperationException("Not implemented");
-    };
+    }
 
     default Container.ExecResult executeBaseCommand(String[] args)
             throws IOException, InterruptedException {
         throw new UnsupportedOperationException("Not implemented");
-    };
+    }
 
     default Container.ExecResult savepointJob(String jobId)
             throws IOException, InterruptedException {
@@ -59,6 +64,10 @@ public interface TestContainer extends TestResource {
         throw new UnsupportedOperationException("Not implemented");
     }
 
+    default Container.ExecResult cancelJob(String jobId) throws IOException, InterruptedException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
     String getServerLogs();
 
     void copyFileToContainer(String path, String targetPath);
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index ff16c0c754..47b3de5ff5 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -131,6 +131,11 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
         throw new UnsupportedOperationException("Not implemented");
     }
 
+    @Override
+    protected String getCancelJobCommand() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
     @Override
     protected String getRestoreCommand() {
         throw new UnsupportedOperationException("Not implemented");
@@ -150,14 +155,14 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
     @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
-        return executeJob(confFile, null);
+        return executeJob(confFile, Collections.emptyList());
     }
 
     @Override
     public Container.ExecResult executeJob(String confFile, List<String> variables)
             throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
-        return executeJob(jobManager, confFile, variables);
+        return executeJob(jobManager, confFile, null, variables);
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
index 3a27d78d42..ea8bcd8788 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
@@ -189,6 +189,11 @@ public class ConnectorPackageServiceContainer extends AbstractTestContainer {
         return "-s";
     }
 
+    @Override
+    protected String getCancelJobCommand() {
+        return "-can";
+    }
+
     @Override
     protected String getRestoreCommand() {
         return "-r";
@@ -220,14 +225,14 @@ public class ConnectorPackageServiceContainer extends AbstractTestContainer {
     @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
-        return executeJob(confFile, null);
+        return executeJob(confFile, Collections.emptyList());
     }
 
     @Override
     public Container.ExecResult executeJob(String confFile, List<String> variables)
             throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
-        return executeJob(server1, confFile, variables);
+        return executeJob(server1, confFile, null, variables);
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 96e5162d7a..14d89571b9 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.apache.commons.compress.utils.Lists;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -241,6 +242,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
         return "-s";
     }
 
+    @Override
+    protected String getCancelJobCommand() {
+        return "-can";
+    }
+
     @Override
     protected String getRestoreCommand() {
         return "-r";
@@ -281,16 +287,27 @@ public class SeaTunnelContainer extends AbstractTestContainer {
     @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
-        return executeJob(confFile, null);
+        return executeJob(confFile, Lists.newArrayList());
     }
 
     @Override
     public Container.ExecResult executeJob(String confFile, List<String> variables)
             throws IOException, InterruptedException {
+        return executeJob(confFile, null, variables);
+    }
+
+    @Override
+    public Container.ExecResult executeJob(String confFile, String jobId)
+            throws IOException, InterruptedException {
+        return executeJob(confFile, jobId, null);
+    }
+
+    private Container.ExecResult executeJob(String confFile, String jobId, List<String> variables)
+            throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
         List<String> beforeThreads = ContainerUtil.getJVMThreadNames(server);
         runningCount.incrementAndGet();
-        Container.ExecResult result = executeJob(server, confFile, variables);
+        Container.ExecResult result = executeJob(server, confFile, jobId, variables);
         if (runningCount.decrementAndGet() > 0) {
             // only check thread when job all finished.
             return result;
@@ -322,7 +339,6 @@ public class SeaTunnelContainer extends AbstractTestContainer {
                                                         .collect(Collectors.joining()));
                             });
         }
-        //        classLoaderObjectCheck(1);
         return result;
     }
 
@@ -467,6 +483,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
         return result;
     }
 
+    @Override
+    public Container.ExecResult cancelJob(String jobId) throws IOException, InterruptedException {
+        return cancelJob(server, jobId);
+    }
+
     @Override
     public String getServerLogs() {
         return server.getLogs();
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index 9970ffb3aa..b13851582c 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -87,6 +88,11 @@ public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
         throw new UnsupportedOperationException("Not implemented");
     }
 
+    @Override
+    protected String getCancelJobCommand() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
     @Override
     protected String getRestoreCommand() {
         throw new UnsupportedOperationException("Not implemented");
@@ -105,14 +111,14 @@ public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
     @Override
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
-        return executeJob(confFile, null);
+        return executeJob(confFile, Collections.emptyList());
     }
 
     @Override
    public Container.ExecResult executeJob(String confFile, List<String> variables)
             throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
-        return executeJob(master, confFile, variables);
+        return executeJob(master, confFile, null, variables);
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
new file mode 100644
index 0000000000..6904593b24
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class JobIdGenerator {
+
+    public static String newJobId() {
+        return String.valueOf(Math.abs(ThreadLocalRandom.current().nextLong()));
+    }
+}
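The new `JobIdGenerator` produces a numeric string id. One caveat worth noting: in Java, `Math.abs(Long.MIN_VALUE)` overflows back to `Long.MIN_VALUE`, so in exactly one of the 2^64 possible draws the id would carry a minus sign. A standalone sketch (class name is illustrative) confirming the id always parses back to a `long`:

```java
import java.util.concurrent.ThreadLocalRandom;

public class JobIdSketch {
    // Mirrors the committed JobIdGenerator.newJobId().
    static String newJobId() {
        return String.valueOf(Math.abs(ThreadLocalRandom.current().nextLong()));
    }

    public static void main(String[] args) {
        String jobId = newJobId();
        long parsed = Long.parseLong(jobId); // always valid numeric text
        // Non-negative except the astronomically unlikely Long.MIN_VALUE draw.
        System.out.println(parsed >= 0 || parsed == Long.MIN_VALUE);
    }
}
```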
