This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c7a384af2b [Improve][Connector-v2] Use checkpointId as the commit's identifier instead of the hash for streaming write of paimon sink (#7835)
c7a384af2b is described below

commit c7a384af2b8bff37861ec249bff6670100bda9b2
Author: dailai <[email protected]>
AuthorDate: Tue Oct 15 17:26:51 2024 +0800

    [Improve][Connector-v2] Use checkpointId as the commit's identifier instead of the hash for streaming write of paimon sink (#7835)
---
 .../org/apache/seatunnel/api/sink/SinkWriter.java  | 15 ++++++++
 .../sink/multitablesink/MultiTableSinkWriter.java  |  9 ++++-
 .../multitablesink/MultiTableSinkWriterTest.java   |  2 +-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    | 12 +++++--
 .../sink/commit/PaimonAggregatedCommitInfo.java    |  4 ++-
 .../sink/commit/PaimonAggregatedCommitter.java     | 42 +++++++++++++++-------
 .../paimon/sink/commit/PaimonCommitInfo.java       |  2 ++
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  2 +-
 .../translation/flink/sink/FlinkSinkWriter.java    |  2 +-
 .../spark/sink/writer/SparkDataWriter.java         |  2 +-
 .../spark/sink/write/SeaTunnelSparkDataWriter.java |  2 +-
 11 files changed, 72 insertions(+), 22 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 4567e98cbf..330580b980 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -62,8 +62,23 @@ public interface SinkWriter<T, CommitInfoT, StateT> {
      *
      * @return the commit info need to commit
      */
+    @Deprecated
     Optional<CommitInfoT> prepareCommit() throws IOException;
 
+    /**
+     * prepare the commit, will be called before {@link #snapshotState(long 
checkpointId)}. If you
+     * need to use 2pc, you can return the commit info in this method, and 
receive the commit info
+     * in {@link SinkCommitter#commit(List)}. If this method failed (by throw 
exception), **Only**
+     * Spark engine will call {@link #abortPrepare()}
+     *
+     * @param checkpointId checkpointId
+     * @return the commit info need to commit
+     * @throws IOException If fail to prepareCommit
+     */
+    default Optional<CommitInfoT> prepareCommit(long checkpointId) throws 
IOException {
+        return prepareCommit();
+    }
+
     /**
      * @return The writer's state.
      * @throws IOException if fail to snapshot writer's state.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index f01c3d65dc..f5b30be537 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -220,6 +220,11 @@ public class MultiTableSinkWriter
 
     @Override
     public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<MultiTableCommitInfo> prepareCommit(long checkpointId) 
throws IOException {
         checkQueueRemain();
         subSinkErrorCheck();
         MultiTableCommitInfo multiTableCommitInfo =
@@ -238,7 +243,9 @@ public class MultiTableSinkWriter
                                                             .entrySet()) {
                                         Optional<?> commit;
                                         try {
-                                            commit = 
sinkWriterEntry.getValue().prepareCommit();
+                                            SinkWriter<SeaTunnelRow, ?, ?> 
sinkWriter =
+                                                    sinkWriterEntry.getValue();
+                                            commit = 
sinkWriter.prepareCommit(checkpointId);
                                         } catch (IOException e) {
                                             throw new RuntimeException(e);
                                         }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
index 66e0ff0d4e..86722eb246 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
@@ -57,7 +57,7 @@ public class MultiTableSinkWriterTest {
         DefaultSerializer<Serializable> defaultSerializer = new 
DefaultSerializer<>();
 
         for (int i = 0; i < 100; i++) {
-            byte[] bytes = 
defaultSerializer.serialize(multiTableSinkWriter.prepareCommit().get());
+            byte[] bytes = 
defaultSerializer.serialize(multiTableSinkWriter.prepareCommit(i).get());
             defaultSerializer.deserialize(bytes);
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 7a3fe6d033..8cc6d0d485 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -130,6 +130,7 @@ public class PaimonSinkWriter
             return;
         }
         this.commitUser = states.get(0).getCommitUser();
+        long checkpointId = states.get(0).getCheckpointId();
         try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
             List<CommitMessage> commitables =
                     states.stream()
@@ -142,7 +143,7 @@ public class PaimonSinkWriter
                 ((BatchTableCommit) tableCommit).commit(commitables);
             } else {
                 log.debug("Trying to recommit states streaming mode");
-                ((StreamTableCommit) 
tableCommit).commit(Objects.hash(commitables), commitables);
+                ((StreamTableCommit) tableCommit).commit(checkpointId, 
commitables);
             }
         } catch (Exception e) {
             throw new PaimonConnectorException(
@@ -174,16 +175,21 @@ public class PaimonSinkWriter
 
     @Override
     public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PaimonCommitInfo> prepareCommit(long checkpointId) throws 
IOException {
         try {
             List<CommitMessage> fileCommittables;
             if (JobContextUtil.isBatchJob(jobContext)) {
                 fileCommittables = ((BatchTableWrite) 
tableWrite).prepareCommit();
             } else {
                 fileCommittables =
-                        ((StreamTableWrite) tableWrite).prepareCommit(false, 
committables.size());
+                        ((StreamTableWrite) tableWrite).prepareCommit(false, 
checkpointId);
             }
             committables.addAll(fileCommittables);
-            return Optional.of(new PaimonCommitInfo(fileCommittables));
+            return Optional.of(new PaimonCommitInfo(fileCommittables, 
checkpointId));
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_PRE_COMMIT_FAILED,
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
index 8a7ad84a2e..83ed71f615 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
@@ -24,6 +24,7 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 /** Paimon connector aggregate commit information class. */
 @Data
@@ -32,5 +33,6 @@ public class PaimonAggregatedCommitInfo implements 
Serializable {
 
     private static final long serialVersionUID = 1;
 
-    private List<List<CommitMessage>> committables;
+    // key: checkpointId value: Paimon commit message List
+    private Map<Long, List<CommitMessage>> committablesMap;
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 5c3f68f336..a3e457907e 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -36,10 +36,11 @@ import org.apache.paimon.table.sink.WriteBuilder;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /** Paimon connector aggregated committer class */
@@ -70,21 +71,32 @@ public class PaimonAggregatedCommitter
     public List<PaimonAggregatedCommitInfo> commit(
             List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws 
IOException {
         try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
-            List<CommitMessage> fileCommittables =
-                    aggregatedCommitInfo.stream()
-                            .map(PaimonAggregatedCommitInfo::getCommittables)
-                            .flatMap(List::stream)
-                            .flatMap(List::stream)
-                            .collect(Collectors.toList());
             PaimonSecurityContext.runSecured(
                     () -> {
                         if (JobContextUtil.isBatchJob(jobContext)) {
                             log.debug("Trying to commit states batch mode");
+                            List<CommitMessage> fileCommittables =
+                                    aggregatedCommitInfo.stream()
+                                            .flatMap(
+                                                    info ->
+                                                            
info.getCommittablesMap().values()
+                                                                    .stream())
+                                            .flatMap(List::stream)
+                                            .collect(Collectors.toList());
                             ((BatchTableCommit) 
tableCommit).commit(fileCommittables);
                         } else {
                             log.debug("Trying to commit states streaming 
mode");
-                            ((StreamTableCommit) tableCommit)
-                                    .commit(Objects.hash(fileCommittables), 
fileCommittables);
+                            aggregatedCommitInfo.stream()
+                                    .flatMap(
+                                            paimonAggregatedCommitInfo ->
+                                                    
paimonAggregatedCommitInfo.getCommittablesMap()
+                                                            
.entrySet().stream())
+                                    .forEach(
+                                            entry ->
+                                                    ((StreamTableCommit) 
tableCommit)
+                                                            .commit(
+                                                                    
entry.getKey(),
+                                                                    
entry.getValue()));
                         }
                         return null;
                     });
@@ -99,8 +111,14 @@ public class PaimonAggregatedCommitter
 
     @Override
     public PaimonAggregatedCommitInfo combine(List<PaimonCommitInfo> 
commitInfos) {
-        List<List<CommitMessage>> committables = new ArrayList<>();
-        commitInfos.forEach(commitInfo -> 
committables.add(commitInfo.getCommittables()));
+        Map<Long, List<CommitMessage>> committables = new HashMap<>();
+        commitInfos.forEach(
+                commitInfo ->
+                        committables
+                                .computeIfAbsent(
+                                        commitInfo.getCheckpointId(),
+                                        id -> new CopyOnWriteArrayList<>())
+                                .addAll(commitInfo.getCommittables()));
         return new PaimonAggregatedCommitInfo(committables);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
index 9927973821..1d9844103f 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
@@ -32,4 +32,6 @@ public class PaimonCommitInfo implements Serializable {
     private static final long serialVersionUID = 1L;
 
     List<CommitMessage> committables;
+
+    Long checkpointId;
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index bce6e9f637..5970d9a745 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -185,7 +185,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                 }
                 if (barrier.snapshot()) {
                     try {
-                        lastCommitInfo = writer.prepareCommit();
+                        lastCommitInfo = writer.prepareCommit(barrier.getId());
                     } catch (Exception e) {
                         writer.abortPrepare();
                         throw e;
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 7a47052c01..3ee6e3533c 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -101,7 +101,7 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
 
     @Override
     public List<CommitWrapper<CommT>> prepareCommit(boolean flush) throws 
IOException {
-        Optional<CommT> commTOptional = sinkWriter.prepareCommit();
+        Optional<CommT> commTOptional = sinkWriter.prepareCommit(checkpointId);
         return commTOptional
                 .map(CommitWrapper::new)
                 .map(Collections::singletonList)
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
index a9eac50062..c34a0783eb 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
@@ -87,7 +87,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements 
DataWriter<Internal
         // 2. commit fails
         //   2.1. We have the commit info, we need to execute the 
sinkCommitter#abort to rollback
         // the transaction.
-        Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
+        Optional<CommitInfoT> commitInfoTOptional = 
sinkWriter.prepareCommit(epochId);
         commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT = 
commitInfoT);
         sinkWriter.snapshotState(epochId++);
         if (sinkCommitter != null) {
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
index c2c24aa914..1a97f6e618 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
@@ -79,7 +79,7 @@ public class SeaTunnelSparkDataWriter<CommitInfoT, StateT> 
implements DataWriter
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-        Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
+        Optional<CommitInfoT> commitInfoTOptional = 
sinkWriter.prepareCommit(epochId);
         commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT = 
commitInfoT);
         sinkWriter.snapshotState(epochId++);
         if (sinkCommitter != null) {

Reply via email to