This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c4b18dbb24 [Feature] Add `init` and `restoreCommit` method in
`SinkAggregatedCommitter` (#5598)
c4b18dbb24 is described below
commit c4b18dbb2497966d99caba2b848f82b9fe747550
Author: Jia Fan <[email protected]>
AuthorDate: Tue Oct 10 10:16:03 2023 +0800
[Feature] Add `init` and `restoreCommit` method in
`SinkAggregatedCommitter` (#5598)
---
.../org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java | 9 +++++++++
.../engine/server/task/SinkAggregatedCommitterTask.java | 8 +++++++-
.../seatunnel/translation/flink/sink/FlinkGlobalCommitter.java | 1 +
.../translation/spark/sink/writer/SparkDataSourceWriter.java | 3 +++
4 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index e87482baf2..c5bdc6926d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -32,6 +32,15 @@ import java.util.List;
*/
public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>
extends Serializable {
+ /** Initialize the sink aggregated committer. */
+ default void init() {};
+
+ /** Re-commit messages to the third-party data receiver. The method needs to
be idempotent. */
+ default List<AggregatedCommitInfoT> restoreCommit(
+ List<AggregatedCommitInfoT> aggregatedCommitInfo) throws
IOException {
+ return commit(aggregatedCommitInfo);
+ }
+
/**
* Commit message to third party data receiver, The method need to achieve
idempotency.
*
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 014e5b4cd1..0e73964b03 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -114,6 +114,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
this.commitInfoSerializer =
sink.getSink().getCommitInfoSerializer().get();
this.aggregatedCommitInfoSerializer =
sink.getSink().getAggregatedCommitInfoSerializer().get();
+ aggregatedCommitter.init();
log.debug(
"starting seatunnel sink aggregated committer task, sink
name[{}] ",
sink.getName());
@@ -262,7 +263,12 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
aggregatedCommitInfoSerializer.deserialize(
bytes)))
.collect(Collectors.toList());
- aggregatedCommitter.commit(aggregatedCommitInfos);
+ List<AggregatedCommitInfoT> commit =
+ aggregatedCommitter.restoreCommit(aggregatedCommitInfos);
+ if (CollectionUtils.isNotEmpty(commit)) {
+ log.error("aggregated committer error: {}", commit.size());
+ throw new
CheckpointException(CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
+ }
restoreComplete.complete(null);
log.debug("restoreState for sink agg committer [{}] finished",
actionStateList);
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index 7c95735e17..02a29beeb9 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -46,6 +46,7 @@ public class FlinkGlobalCommitter<CommT, GlobalCommT>
FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT>
aggregatedCommitter) {
this.aggregatedCommitter = aggregatedCommitter;
+ aggregatedCommitter.init();
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
index 871e4b58ac..08279e0714 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
@@ -49,6 +49,9 @@ public class SparkDataSourceWriter<StateT, CommitInfoT,
AggregatedCommitInfoT>
throws IOException {
this.sink = sink;
this.sinkAggregatedCommitter =
sink.createAggregatedCommitter().orElse(null);
+ if (sinkAggregatedCommitter != null) {
+ sinkAggregatedCommitter.init();
+ }
}
@Override