This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c4b18dbb24 [Feature] Add `init` and `restoreCommit` method in `SinkAggregatedCommitter` (#5598)
c4b18dbb24 is described below

commit c4b18dbb2497966d99caba2b848f82b9fe747550
Author: Jia Fan <[email protected]>
AuthorDate: Tue Oct 10 10:16:03 2023 +0800

    [Feature] Add `init` and `restoreCommit` method in `SinkAggregatedCommitter` (#5598)
---
 .../org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java   | 9 +++++++++
 .../engine/server/task/SinkAggregatedCommitterTask.java          | 8 +++++++-
 .../seatunnel/translation/flink/sink/FlinkGlobalCommitter.java   | 1 +
 .../translation/spark/sink/writer/SparkDataSourceWriter.java     | 3 +++
 4 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index e87482baf2..c5bdc6926d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -32,6 +32,15 @@ import java.util.List;
  */
 public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> extends Serializable {
 
+    /** init sink aggregated committer */
+    default void init() {};
+
+    /** Re-commit message to third party data receiver, The method need to achieve idempotency. */
+    default List<AggregatedCommitInfoT> restoreCommit(
+            List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException {
+        return commit(aggregatedCommitInfo);
+    }
+
     /**
      * Commit message to third party data receiver, The method need to achieve idempotency.
      *
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 014e5b4cd1..0e73964b03 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -114,6 +114,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT>
         this.commitInfoSerializer = sink.getSink().getCommitInfoSerializer().get();
         this.aggregatedCommitInfoSerializer =
                 sink.getSink().getAggregatedCommitInfoSerializer().get();
+        aggregatedCommitter.init();
         log.debug(
                 "starting seatunnel sink aggregated committer task, sink name[{}] ",
                 sink.getName());
@@ -262,7 +263,12 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT>
                                                         aggregatedCommitInfoSerializer.deserialize(
                                                                 bytes)))
                         .collect(Collectors.toList());
-        aggregatedCommitter.commit(aggregatedCommitInfos);
+        List<AggregatedCommitInfoT> commit =
+                aggregatedCommitter.restoreCommit(aggregatedCommitInfos);
+        if (CollectionUtils.isNotEmpty(commit)) {
+            log.error("aggregated committer error: {}", commit.size());
+            throw new CheckpointException(CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
+        }
         restoreComplete.complete(null);
         log.debug("restoreState for sink agg committer [{}] finished", actionStateList);
     }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index 7c95735e17..02a29beeb9 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -46,6 +46,7 @@ public class FlinkGlobalCommitter<CommT, GlobalCommT>
 
     FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
         this.aggregatedCommitter = aggregatedCommitter;
+        aggregatedCommitter.init();
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
index 871e4b58ac..08279e0714 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
@@ -49,6 +49,9 @@ public class SparkDataSourceWriter<StateT, CommitInfoT, AggregatedCommitInfoT>
             throws IOException {
         this.sink = sink;
         this.sinkAggregatedCommitter = sink.createAggregatedCommitter().orElse(null);
+        if (sinkAggregatedCommitter != null) {
+            sinkAggregatedCommitter.init();
+        }
     }
 
     @Override

Reply via email to