This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f22d4ebd4d [Improve][Connector-v2] Support checkpoint in batch mode 
for paimon sink (#8333)
f22d4ebd4d is described below

commit f22d4ebd4da87dfb5572122f12cfe02b8f2f0a8d
Author: dailai <[email protected]>
AuthorDate: Mon Dec 23 20:23:26 2024 +0800

    [Improve][Connector-v2] Support checkpoint in batch mode for paimon sink 
(#8333)
---
 docs/en/connector-v2/sink/Paimon.md                |  5 ++
 docs/zh/connector-v2/sink/Paimon.md                |  5 ++
 .../seatunnel/paimon/sink/PaimonSink.java          | 11 ++-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    | 41 +++-------
 .../sink/commit/PaimonAggregatedCommitter.java     | 51 +++---------
 .../seatunnel/paimon/utils/JobContextUtil.java     | 32 --------
 .../e2e/connector/paimon/PaimonWithS3IT.java       | 12 ++-
 .../resources/fake_2_paimon_with_s3_to_assert.conf | 91 ++++++++++++++++++++++
 .../fake_to_paimon_with_s3_with_checkpoint.conf    | 63 +++++++++++++++
 9 files changed, 203 insertions(+), 108 deletions(-)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index f2a68ae3b8..68aa63ad03 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -47,6 +47,11 @@ libfb303-xxx.jar
 | paimon.hadoop.conf          | Map    | No       | -                          
  | Properties in hadoop conf                                                   
                                                                                
     |
 | paimon.hadoop.conf-path     | String | No       | -                          
  | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files                                                           
            |
 
+## Checkpoint in batch mode
+
+When you set `checkpoint.interval` to a value greater than 0 in batch mode, 
the Paimon sink connector commits the data to the Paimon table each time a 
checkpoint is triggered after a certain number of records have been written. 
At that point, the written data becomes visible in Paimon.
+However, if you do not set `checkpoint.interval` in batch mode, the Paimon 
sink connector commits the data only after all records have been written. The 
written data is not visible in Paimon until the batch task completes.
+
 ## Changelog
 You must configure the `changelog-producer=input` option to enable the 
changelog producer mode of the paimon table. If you use the auto-create table 
function of paimon sink, you can configure this property in 
`paimon.table.write-props`.
 
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index 1faa5dc9b0..157c1fa5e8 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -46,6 +46,11 @@ libfb303-xxx.jar
 | paimon.hadoop.conf          | Map  | 否    | -                            | 
Hadoop配置文件属性信息                                                                  
                      |
 | paimon.hadoop.conf-path     | 字符串  | 否    | -                            | 
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置          
                      |
 
+## 批模式下的checkpoint
+
+当您在批处理模式下将`checkpoint.interval`设置为大于0的值时,在写入一定数量的记录后checkpoint触发时,paimon连接器将把数据提交到paimon表。此时,写入的数据是可见的。
+但是,如果您没有在批处理模式下设置`checkpoint.interval`,则在写入所有记录之后,paimon 
sink连接器将提交数据。到批任务完成之前,写入的数据都是不可见的。
+
 ## 更新日志
 你必须配置`changelog-producer=input`来启用paimon表的changelog产生模式。如果你使用了paimon 
sink的自动建表功能,你可以在`paimon.table.write-props`中指定这个属性。
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index d657810c95..0129438c83 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -67,13 +67,13 @@ public class PaimonSink
 
     private JobContext jobContext;
 
-    private ReadonlyConfig readonlyConfig;
+    private final ReadonlyConfig readonlyConfig;
 
-    private PaimonSinkConfig paimonSinkConfig;
+    private final PaimonSinkConfig paimonSinkConfig;
 
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;
 
-    private PaimonHadoopConfiguration paimonHadoopConfiguration;
+    private final PaimonHadoopConfiguration paimonHadoopConfiguration;
 
     public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         this.readonlyConfig = readonlyConfig;
@@ -102,8 +102,7 @@ public class PaimonSink
     @Override
     public Optional<SinkAggregatedCommitter<PaimonCommitInfo, 
PaimonAggregatedCommitInfo>>
             createAggregatedCommitter() throws IOException {
-        return Optional.of(
-                new PaimonAggregatedCommitter(paimonTable, jobContext, 
paimonHadoopConfiguration));
+        return Optional.of(new PaimonAggregatedCommitter(paimonTable, 
paimonHadoopConfiguration));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index b208a916bb..bc02cc4bf3 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -39,7 +39,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucket
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.handler.AlterPaimonTableSchemaEventHandler;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
-import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
 
 import org.apache.paimon.CoreOptions;
@@ -49,8 +48,6 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
@@ -89,10 +86,6 @@ public class PaimonSinkWriter
 
     private SeaTunnelRowType seaTunnelRowType;
 
-    private final SinkWriter.Context context;
-
-    private final JobContext jobContext;
-
     private org.apache.seatunnel.api.table.catalog.TableSchema 
sourceTableSchema;
 
     private TableSchema sinkPaimonTableSchema;
@@ -133,8 +126,6 @@ public class PaimonSinkWriter
         }
         this.paimonSinkConfig = paimonSinkConfig;
         this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
-        this.context = context;
-        this.jobContext = jobContext;
         this.newTableWrite();
         BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
         this.dynamicBucket =
@@ -147,8 +138,8 @@ public class PaimonSinkWriter
             this.bucketAssigner =
                     new PaimonBucketAssigner(
                             paimonFileStoretable,
-                            this.context.getNumberOfParallelSubtasks(),
-                            this.context.getIndexOfSubtask());
+                            context.getNumberOfParallelSubtasks(),
+                            context.getIndexOfSubtask());
         }
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
@@ -181,14 +172,13 @@ public class PaimonSinkWriter
                             .map(PaimonSinkState::getCommittables)
                             .flatMap(List::stream)
                             .collect(Collectors.toList());
-            log.info("Trying to recommit states {}", commitables);
-            if (JobContextUtil.isBatchJob(jobContext)) {
-                log.debug("Trying to recommit states batch mode");
-                ((BatchTableCommit) tableCommit).commit(commitables);
-            } else {
-                log.debug("Trying to recommit states streaming mode");
-                ((StreamTableCommit) tableCommit).commit(checkpointId, 
commitables);
+            // batch mode without checkpoint has no state to commit
+            if (commitables.isEmpty()) {
+                return;
             }
+            // streaming mode and batch mode with checkpoint both recommit 
through the stream API
+            log.info("Trying to recommit states {}", commitables);
+            ((StreamTableCommit) tableCommit).commit(checkpointId, 
commitables);
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -238,10 +228,7 @@ public class PaimonSinkWriter
     }
 
     private void newTableWrite() {
-        this.tableWriteBuilder =
-                JobContextUtil.isBatchJob(jobContext)
-                        ? this.paimonFileStoretable.newBatchWriteBuilder()
-                        : this.paimonFileStoretable.newStreamWriteBuilder();
+        this.tableWriteBuilder = 
this.paimonFileStoretable.newStreamWriteBuilder();
         TableWrite oldTableWrite = this.tableWrite;
         this.tableWrite =
                 tableWriteBuilder
@@ -260,14 +247,8 @@ public class PaimonSinkWriter
     @Override
     public Optional<PaimonCommitInfo> prepareCommit(long checkpointId) throws 
IOException {
         try {
-            List<CommitMessage> fileCommittables;
-            if (JobContextUtil.isBatchJob(jobContext)) {
-                fileCommittables = ((BatchTableWrite) 
tableWrite).prepareCommit();
-            } else {
-                fileCommittables =
-                        ((StreamTableWrite) tableWrite)
-                                .prepareCommit(waitCompaction(), checkpointId);
-            }
+            List<CommitMessage> fileCommittables =
+                    ((StreamTableWrite) 
tableWrite).prepareCommit(waitCompaction(), checkpointId);
             committables.addAll(fileCommittables);
             return Optional.of(new PaimonCommitInfo(fileCommittables, 
checkpointId));
         } catch (Exception e) {
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 8009135346..e452b6cf03 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -17,17 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit;
 
-import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
-import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.TableCommit;
@@ -41,7 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.stream.Collectors;
 
 /** Paimon connector aggregated committer class */
 @Slf4j
@@ -53,17 +49,9 @@ public class PaimonAggregatedCommitter
 
     private final WriteBuilder tableWriteBuilder;
 
-    private final JobContext jobContext;
-
     public PaimonAggregatedCommitter(
-            Table table,
-            JobContext jobContext,
-            PaimonHadoopConfiguration paimonHadoopConfiguration) {
-        this.jobContext = jobContext;
-        this.tableWriteBuilder =
-                JobContextUtil.isBatchJob(jobContext)
-                        ? table.newBatchWriteBuilder()
-                        : table.newStreamWriteBuilder();
+            Table table, PaimonHadoopConfiguration paimonHadoopConfiguration) {
+        this.tableWriteBuilder = table.newStreamWriteBuilder();
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
 
@@ -73,31 +61,16 @@ public class PaimonAggregatedCommitter
         try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
             PaimonSecurityContext.runSecured(
                     () -> {
-                        if (JobContextUtil.isBatchJob(jobContext)) {
-                            log.debug("Trying to commit states batch mode");
-                            List<CommitMessage> fileCommittables =
-                                    aggregatedCommitInfo.stream()
-                                            .flatMap(
-                                                    info ->
-                                                            
info.getCommittablesMap().values()
-                                                                    .stream())
-                                            .flatMap(List::stream)
-                                            .collect(Collectors.toList());
-                            ((BatchTableCommit) 
tableCommit).commit(fileCommittables);
-                        } else {
-                            log.debug("Trying to commit states streaming 
mode");
-                            aggregatedCommitInfo.stream()
-                                    .flatMap(
-                                            paimonAggregatedCommitInfo ->
-                                                    
paimonAggregatedCommitInfo.getCommittablesMap()
-                                                            
.entrySet().stream())
-                                    .forEach(
-                                            entry ->
-                                                    ((StreamTableCommit) 
tableCommit)
-                                                            .commit(
-                                                                    
entry.getKey(),
-                                                                    
entry.getValue()));
-                        }
+                        log.debug("Trying to commit states streaming mode");
+                        aggregatedCommitInfo.stream()
+                                .flatMap(
+                                        paimonAggregatedCommitInfo ->
+                                                
paimonAggregatedCommitInfo.getCommittablesMap()
+                                                        .entrySet().stream())
+                                .forEach(
+                                        entry ->
+                                                ((StreamTableCommit) 
tableCommit)
+                                                        
.commit(entry.getKey(), entry.getValue()));
                         return null;
                     });
         } catch (Exception e) {
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
deleted file mode 100644
index 3a4d9b72d4..0000000000
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.constants.JobMode;
-
-import lombok.extern.slf4j.Slf4j;
-
-/** Job env util */
-@Slf4j
-public class JobContextUtil {
-
-    public static boolean isBatchJob(JobContext jobContext) {
-        return jobContext.getJobMode().equals(JobMode.BATCH);
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
index a618aad8b3..a939955cc0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
@@ -122,11 +122,21 @@ public class PaimonWithS3IT extends SeaTunnelContainer {
     }
 
     @Test
-    public void testFaceCDCSinkPaimonWithS3Filesystem() throws Exception {
+    public void testFakeCDCSinkPaimonWithS3Filesystem() throws Exception {
         Container.ExecResult execResult = 
executeJob("/fake_to_paimon_with_s3.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
 
         Container.ExecResult readResult = 
executeJob("/paimon_with_s3_to_assert.conf");
         Assertions.assertEquals(0, readResult.getExitCode());
     }
+
+    @Test
+    public void 
testFakeCDCSinkPaimonWithCheckpointInBatchModeWithS3Filesystem() throws 
Exception {
+        Container.ExecResult execResult =
+                executeJob("/fake_to_paimon_with_s3_with_checkpoint.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Container.ExecResult readResult = 
executeJob("/fake_2_paimon_with_s3_to_assert.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_2_paimon_with_s3_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_2_paimon_with_s3_to_assert.conf
new file mode 100644
index 0000000000..e046f100e0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_2_paimon_with_s3_to_assert.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace12"
+    table = "st_test"
+    paimon.hadoop.conf = {
+        fs.s3a.access-key=minio
+        fs.s3a.secret-key=miniominio
+        fs.s3a.endpoint="http://minio:9000"
+        fs.s3a.path.style.access=true
+        
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}
+
+sink {
+ Assert {
+    rules {
+          row_rules = [
+            {
+              rule_type = MAX_ROW
+              rule_value = 5000
+            }
+          ],
+          field_rules = [
+            {
+              field_name = pk_id
+              field_type = bigint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                },
+                {
+                  rule_type = MIN
+                  rule_value = 1
+                },
+                {
+                  rule_type = MAX
+                  rule_value = 100000
+                }
+              ]
+            },
+            {
+              field_name = name
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+             {
+               field_name = score
+               field_type = int
+               field_value = [
+                 {
+                   rule_type = NOT_NULL
+                 }
+               ]
+             }
+          ]
+        }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
new file mode 100644
index 0000000000..dc2585abc9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+  checkpoint.interval = 5000
+}
+
+source {
+  FakeSource {
+    row.num = 5000
+    split.num = 10
+    split.read-interval = 1000
+    bigint.min = 1
+    bigint.max = 100000
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace12"
+    table = "st_test"
+    paimon.hadoop.conf = {
+        fs.s3a.access-key=minio
+        fs.s3a.secret-key=miniominio
+        fs.s3a.endpoint="http://minio:9000"
+        fs.s3a.path.style.access=true
+        
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}

Reply via email to