This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c8dcefc3be [Fix] [Connectors-v2-Paimon] Flink table store failed to
prepare commit (#6057)
c8dcefc3be is described below
commit c8dcefc3be6190e6833acf85771ebe491d0fd91c
Author: 丑西蒙 <[email protected]>
AuthorDate: Fri Dec 22 16:10:46 2023 +0800
[Fix] [Connectors-v2-Paimon] Flink table store failed to prepare commit
(#6057)
---
.../connectors/seatunnel/paimon/sink/PaimonSinkWriter.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index f6255465f5..6d4b0c6147 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -33,7 +33,6 @@ import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.TableWrite;
import lombok.extern.slf4j.Slf4j;
@@ -52,7 +51,7 @@ public class PaimonSinkWriter
private String commitUser = UUID.randomUUID().toString();
- private TableWrite tableWrite = null;
+ private final BatchTableWrite tableWrite;
private long checkpointId = 0;
@@ -66,6 +65,7 @@ public class PaimonSinkWriter
public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType seaTunnelRowType) {
this.table = table;
+ this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
}
@@ -76,6 +76,7 @@ public class PaimonSinkWriter
SeaTunnelRowType seaTunnelRowType,
List<PaimonSinkState> states) {
this.table = table;
+ this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
if (Objects.isNull(states) || states.isEmpty()) {
@@ -101,9 +102,6 @@ public class PaimonSinkWriter
@Override
public void write(SeaTunnelRow element) throws IOException {
- if (Objects.isNull(tableWrite)) {
- tableWrite = table.newBatchWriteBuilder().newWrite();
- }
InternalRow rowData = RowConverter.convert(element, seaTunnelRowType);
try {
tableWrite.write(rowData);
@@ -118,7 +116,7 @@ public class PaimonSinkWriter
@Override
public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
try {
- List<CommitMessage> fileCommittables = ((BatchTableWrite) tableWrite).prepareCommit();
+ List<CommitMessage> fileCommittables = tableWrite.prepareCommit();
committables.addAll(fileCommittables);
return Optional.of(new PaimonCommitInfo(fileCommittables));
} catch (Exception e) {