This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b32df930e9 [Feature][Connectors-v2-Paimon] Adaptation Paimon 0.6 Version (#6061)
b32df930e9 is described below

commit b32df930e9edacdbedb68ef36574137f719dedb2
Author: 丑西蒙 <[email protected]>
AuthorDate: Tue Dec 26 21:30:57 2023 +0800

    [Feature][Connectors-v2-Paimon] Adaptation Paimon 0.6 Version (#6061)
---
 seatunnel-connectors-v2/connector-paimon/pom.xml          |  2 +-
 .../connectors/seatunnel/paimon/sink/PaimonSink.java      |  6 ++++++
 .../seatunnel/paimon/sink/PaimonSinkWriter.java           | 15 ++++++++-------
 .../paimon/sink/commit/PaimonAggregatedCommitter.java     |  4 +---
 4 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 8f6fe7c882..8bcb1c3507 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -30,7 +30,7 @@
     <name>SeaTunnel : Connectors V2 : Paimon</name>
 
     <properties>
-        <paimon.version>0.4.0-incubating</paimon.version>
+        <paimon.version>0.6.0-incubating</paimon.version>
     </properties>
 
     <dependencies>
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index fec1a7d7ce..ac1a0b97ed 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -123,6 +124,11 @@ public class PaimonSink
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
     @Override
     public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> createWriter(
             SinkWriter.Context context) throws IOException {
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 6d4b0c6147..930f62045f 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -27,12 +27,11 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkSta
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.InnerTableCommit;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -51,6 +50,8 @@ public class PaimonSinkWriter
 
     private String commitUser = UUID.randomUUID().toString();
 
+    private final BatchWriteBuilder tableWriteBuilder;
+
     private final BatchTableWrite tableWrite;
 
     private long checkpointId = 0;
@@ -65,7 +66,8 @@ public class PaimonSinkWriter
 
     public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType seaTunnelRowType) {
         this.table = table;
-        this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
+        this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
+        this.tableWrite = tableWriteBuilder.newWrite();
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
     }
@@ -76,7 +78,8 @@ public class PaimonSinkWriter
             SeaTunnelRowType seaTunnelRowType,
             List<PaimonSinkState> states) {
         this.table = table;
-        this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
+        this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
+        this.tableWrite = tableWriteBuilder.newWrite();
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
         if (Objects.isNull(states) || states.isEmpty()) {
@@ -84,9 +87,7 @@ public class PaimonSinkWriter
         }
         this.commitUser = states.get(0).getCommitUser();
         this.checkpointId = states.get(0).getCheckpointId();
-        try (BatchTableCommit tableCommit =
-                ((InnerTableCommit) table.newBatchWriteBuilder().newCommit())
-                        .withLock(Lock.emptyFactory().create())) {
+        try (BatchTableCommit tableCommit = tableWriteBuilder.newCommit()) {
             List<CommitMessage> commitables =
                     states.stream()
                             .map(PaimonSinkState::getCommittables)
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 96cdd93fc6..987d8fbb80 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -25,7 +25,6 @@ import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.InnerTableCommit;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -54,8 +53,7 @@ public class PaimonAggregatedCommitter
     public List<PaimonAggregatedCommitInfo> commit(
             List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
         try (BatchTableCommit tableCommit =
-                ((InnerTableCommit) table.newBatchWriteBuilder().newCommit())
-                        .withLock(localFactory.create())) {
+                table.newBatchWriteBuilder().withOverwrite().newCommit()) {
             List<CommitMessage> fileCommittables =
                     aggregatedCommitInfo.stream()
                             .map(PaimonAggregatedCommitInfo::getCommittables)

Reply via email to