This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b32df930e9 [Feature][Connectors-v2-Paimon] Adaptation Paimon 0.6 Version (#6061)
b32df930e9 is described below
commit b32df930e9edacdbedb68ef36574137f719dedb2
Author: 丑西蒙 <[email protected]>
AuthorDate: Tue Dec 26 21:30:57 2023 +0800
[Feature][Connectors-v2-Paimon] Adaptation Paimon 0.6 Version (#6061)
---
seatunnel-connectors-v2/connector-paimon/pom.xml | 2 +-
.../connectors/seatunnel/paimon/sink/PaimonSink.java | 6 ++++++
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 15 ++++++++-------
.../paimon/sink/commit/PaimonAggregatedCommitter.java | 4 +---
4 files changed, 16 insertions(+), 11 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml
b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 8f6fe7c882..8bcb1c3507 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -30,7 +30,7 @@
<name>SeaTunnel : Connectors V2 : Paimon</name>
<properties>
- <paimon.version>0.4.0-incubating</paimon.version>
+ <paimon.version>0.6.0-incubating</paimon.version>
</properties>
<dependencies>
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index fec1a7d7ce..ac1a0b97ed 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -123,6 +124,11 @@ public class PaimonSink
this.seaTunnelRowType = seaTunnelRowType;
}
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
@Override
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 6d4b0c6147..930f62045f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -27,12 +27,11 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkSta
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.InnerTableCommit;
import lombok.extern.slf4j.Slf4j;
@@ -51,6 +50,8 @@ public class PaimonSinkWriter
private String commitUser = UUID.randomUUID().toString();
+ private final BatchWriteBuilder tableWriteBuilder;
+
private final BatchTableWrite tableWrite;
private long checkpointId = 0;
@@ -65,7 +66,8 @@ public class PaimonSinkWriter
public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType
seaTunnelRowType) {
this.table = table;
- this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
+ this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
+ this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
}
@@ -76,7 +78,8 @@ public class PaimonSinkWriter
SeaTunnelRowType seaTunnelRowType,
List<PaimonSinkState> states) {
this.table = table;
- this.tableWrite = this.table.newBatchWriteBuilder().newWrite();
+ this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
+ this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
if (Objects.isNull(states) || states.isEmpty()) {
@@ -84,9 +87,7 @@ public class PaimonSinkWriter
}
this.commitUser = states.get(0).getCommitUser();
this.checkpointId = states.get(0).getCheckpointId();
- try (BatchTableCommit tableCommit =
- ((InnerTableCommit) table.newBatchWriteBuilder().newCommit())
- .withLock(Lock.emptyFactory().create())) {
+ try (BatchTableCommit tableCommit = tableWriteBuilder.newCommit()) {
List<CommitMessage> commitables =
states.stream()
.map(PaimonSinkState::getCommittables)
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 96cdd93fc6..987d8fbb80 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -25,7 +25,6 @@ import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.InnerTableCommit;
import lombok.extern.slf4j.Slf4j;
@@ -54,8 +53,7 @@ public class PaimonAggregatedCommitter
public List<PaimonAggregatedCommitInfo> commit(
List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws
IOException {
try (BatchTableCommit tableCommit =
- ((InnerTableCommit) table.newBatchWriteBuilder().newCommit())
- .withLock(localFactory.create())) {
+ table.newBatchWriteBuilder().withOverwrite().newCommit()) {
List<CommitMessage> fileCommittables =
aggregatedCommitInfo.stream()
.map(PaimonAggregatedCommitInfo::getCommittables)