This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c8dcefc3be [Fix] [Connectors-v2-Paimon] Flink table store failed to 
prepare commit (#6057)
c8dcefc3be is described below

commit c8dcefc3be6190e6833acf85771ebe491d0fd91c
Author: 丑西蒙 <[email protected]>
AuthorDate: Fri Dec 22 16:10:46 2023 +0800

    [Fix] [Connectors-v2-Paimon] Flink table store failed to prepare commit 
(#6057)
---
 .../connectors/seatunnel/paimon/sink/PaimonSinkWriter.java     | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index f6255465f5..6d4b0c6147 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -33,7 +33,6 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.TableWrite;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -52,7 +51,7 @@ public class PaimonSinkWriter
 
     private String commitUser = UUID.randomUUID().toString();
 
-    private TableWrite tableWrite = null;
+    private final BatchTableWrite tableWrite;
 
     private long checkpointId = 0;
 
@@ -66,6 +65,7 @@ public class PaimonSinkWriter
 
     public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType 
seaTunnelRowType) {
         this.table = table;
+        this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
     }
@@ -76,6 +76,7 @@ public class PaimonSinkWriter
             SeaTunnelRowType seaTunnelRowType,
             List<PaimonSinkState> states) {
         this.table = table;
+        this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
         if (Objects.isNull(states) || states.isEmpty()) {
@@ -101,9 +102,6 @@ public class PaimonSinkWriter
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
-        if (Objects.isNull(tableWrite)) {
-            tableWrite = table.newBatchWriteBuilder().newWrite();
-        }
         InternalRow rowData = RowConverter.convert(element, seaTunnelRowType);
         try {
             tableWrite.write(rowData);
@@ -118,7 +116,7 @@ public class PaimonSinkWriter
     @Override
     public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
         try {
-            List<CommitMessage> fileCommittables = ((BatchTableWrite) 
tableWrite).prepareCommit();
+            List<CommitMessage> fileCommittables = tableWrite.prepareCommit();
             committables.addAll(fileCommittables);
             return Optional.of(new PaimonCommitInfo(fileCommittables));
         } catch (Exception e) {

Reply via email to