This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new b2cd4c02 Add SeaTunnel kafka sink (#1952)
b2cd4c02 is described below

commit b2cd4c02fc9452c724f09eefa924cc87b4bee16b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 25 23:20:38 2022 +0800

    Add SeaTunnel kafka sink (#1952)
---
 .../apache/seatunnel/api/sink/SinkCommitter.java   |   5 +-
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |  12 +-
 .../seatunnel-connectors-seatunnel-dist/pom.xml    |   5 +
 .../seatunnel/console/sink/ConsoleSinkWriter.java  |  10 +-
 .../connectors/seatunnel/kafka/config/Config.java  |   2 +
 .../seatunnel/kafka/config/KafkaSemantics.java     |  23 ++++
 .../serialize/DefaultSeaTunnelRowSerializer.java   |  19 ++++
 .../kafka/serialize/SeaTunnelRowSerializer.java    |  16 +++
 .../kafka/sink/KafkaNoTransactionSender.java       |  88 +++++++++++++++
 .../seatunnel/kafka/sink/KafkaProduceSender.java   |  63 +++++++++++
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |  86 +++++++++++++++
 .../seatunnel/kafka/sink/KafkaSinkCommitter.java   |  77 +++++++++++++
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 118 ++++++++++++++++++++
 .../kafka/sink/KafkaTransactionSender.java         | 122 +++++++++++++++++++++
 ...kaState.java => KafkaAggregatedCommitInfo.java} |   6 +-
 .../{KafkaState.java => KafkaCommitInfo.java}      |  12 +-
 .../seatunnel/kafka/state/KafkaState.java          |  10 ++
 .../translation/flink/sink/FlinkSinkWriter.java    |   4 +-
 .../spark/sink/SparkDataSourceWriter.java          |  29 +++--
 .../translation/spark/sink/SparkDataWriter.java    |  33 +++++-
 .../spark/sink/SparkWriterCommitMessage.java       |   4 +-
 21 files changed, 723 insertions(+), 21 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 6fff7762..4dad4306 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -38,9 +38,10 @@ public interface SinkCommitter<CommitInfoT> extends 
Serializable {
     List<CommitInfoT> commit(List<CommitInfoT> committables) throws 
IOException;
 
     /**
-     * Close this resource.
+     * Abort the transaction; this method will be called when the commit fails.
      *
+     * @param commitInfos The list of commit messages, used to abort the commit.
      * @throws IOException throw IOException when close failed.
      */
-    void abort() throws IOException;
+    void abort(List<CommitInfoT> commitInfos) throws IOException;
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 456e78de..6ce5a79a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * The sink writer use to write data to third party data receiver. This class 
will run on taskManger/Worker.
@@ -42,11 +43,12 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends 
Serializable {
     void write(T element) throws IOException;
 
     /**
-     * prepare the commit, will be called before {@link #snapshotState()}
+     * prepare the commit, will be called before {@link #snapshotState()}.
+     * If you need to use 2pc, you can return the commit info in this method, 
and receive the commit info in {@link SinkCommitter#commit(List)}.
      *
      * @return the commit info need to commit
      */
-    CommitInfoT prepareCommit() throws IOException;
+    Optional<CommitInfoT> prepareCommit() throws IOException;
 
     /**
      * @return The writer's state.
@@ -56,6 +58,12 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends 
Serializable {
         return Collections.emptyList();
     }
 
+    /**
+     * Used to abort the prepareCommit; if the prepareCommit fails, there is
+     * no CommitInfoT, so the rollback work cannot be done by {@link 
SinkCommitter}.
+     */
+    void abort();
+
     /**
      * call it when SinkWriter close
      *
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index b487fe1a..f0e03b24 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -40,6 +40,11 @@
             <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connectors-seatunnel-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 650aa4c2..b567e6a4 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.Optional;
 
 public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, 
ConsoleCommitInfo, ConsoleState> {
 
@@ -44,8 +45,13 @@ public class ConsoleSinkWriter implements 
SinkWriter<SeaTunnelRow, ConsoleCommit
     }
 
     @Override
-    public ConsoleCommitInfo prepareCommit() {
-        return null;
+    public Optional<ConsoleCommitInfo> prepareCommit() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abort() {
+
     }
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index c5fddd25..1eebcdb5 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -27,4 +27,6 @@ public class Config {
      * The server address of kafka cluster.
      */
     public static final String BOOTSTRAP_SERVER = "bootstrap.server";
+
+    public static final String KAFKA_CONFIG_PREFIX = "kafka.";
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSemantics.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSemantics.java
new file mode 100644
index 00000000..816eadf0
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSemantics.java
@@ -0,0 +1,23 @@
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum KafkaSemantics {
+
+    /**
+     * At this semantics, we will directly send the message to kafka; the data
+     * may be duplicated or lost if the job restarts/retries or a network
+     * error occurs.
+     */
+    NON,
+
+    /**
+     * At this semantics, we will retry sending the message to kafka if the
+     * response is not acked.
+     */
+    AT_LEAST_ONCE,
+
+    /**
+     * At this semantics, we will use 2pc (two-phase commit) to guarantee the
+     * message is sent to kafka exactly once.
+     */
+    EXACTLY_ONCE,
+    ;
+
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 00000000..bb6c14f8
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,19 @@
+package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer<String, SeaTunnelRow> {
+
+    private final String topic;
+
+    public DefaultSeaTunnelRowSerializer(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public ProducerRecord<String, SeaTunnelRow> serializeRow(SeaTunnelRow row) 
{
+        return new ProducerRecord<>(topic, null, row);
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 00000000..ae92e2a9
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,16 @@
+package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public interface SeaTunnelRowSerializer<K, V> {
+
+    /**
+     * Serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}.
+     *
+     * @param row seatunnel row
+     * @return kafka record.
+     */
+    ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
new file mode 100644
index 00000000..c1339e48
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * This sender will send the data to Kafka, and will not guarantee the data
+ * is committed to Kafka exactly-once.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class KafkaNoTransactionSender<K, V> implements KafkaProduceSender<K, 
V> {
+
+    private final KafkaProducer<K, V> kafkaProducer;
+    private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
+
+    public KafkaNoTransactionSender(Properties properties, 
SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer) {
+        this.seaTunnelRowSerializer = seaTunnelRowSerializer;
+        this.kafkaProducer = new KafkaProducer<>(properties);
+    }
+
+    @Override
+    public void send(SeaTunnelRow seaTunnelRow) {
+        ProducerRecord<K, V> producerRecord = 
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+        kafkaProducer.send(producerRecord);
+    }
+
+    @Override
+    public void beginTransaction() {
+        // no-op
+    }
+
+    @Override
+    public Optional<KafkaCommitInfo> prepareCommit() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortTransaction() {
+        // no-op
+    }
+
+    @Override
+    public void abortTransaction(List<KafkaState> kafkaStates) {
+        // no-op
+    }
+
+    @Override
+    public List<KafkaState> snapshotState() {
+        kafkaProducer.flush();
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void close() {
+        kafkaProducer.flush();
+        try (KafkaProducer<?, ?> closedKafkaProducer = kafkaProducer) {
+            // close the producer
+        }
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
new file mode 100644
index 00000000..1444755a
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface KafkaProduceSender<K, V> extends AutoCloseable {
+    /**
+     * Send data to kafka.
+     *
+     * @param seaTunnelRow data to send
+     */
+    void send(SeaTunnelRow seaTunnelRow);
+
+    void beginTransaction();
+
+    /**
+     * Prepare a transaction commit.
+     *
+     * @return commit info, or empty if no commit is needed.
+     */
+    Optional<KafkaCommitInfo> prepareCommit();
+
+    /**
+     * Abort the current transaction.
+     */
+    void abortTransaction();
+
+    /**
+     * Abort the given transaction.
+     *
+     * @param kafkaStates kafka states about the transaction info.
+     */
+    void abortTransaction(List<KafkaState> kafkaStates);
+
+    /**
+     * Get the current kafka state of the sender.
+     *
+     * @return kafka state List, or empty if no state is available.
+     */
+    List<KafkaState> snapshotState();
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
new file mode 100644
index 00000000..c6a96e3f
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Kafka Sink implementation by using SeaTunnel sink API.
+ * This class contains the method to create {@link KafkaSinkWriter} and {@link 
KafkaSinkCommitter}.
+ */
+public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaState, 
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
+
+    private Config pluginConfig;
+    private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState> 
createWriter(SinkWriter.Context context) {
+        return new KafkaSinkWriter(context, seaTunnelRowTypeInfo, 
pluginConfig, Collections.emptyList());
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState> 
restoreWriter(SinkWriter.Context context, List<KafkaState> states) {
+        return new KafkaSinkWriter(context, seaTunnelRowTypeInfo, 
pluginConfig, states);
+    }
+
+    @Override
+    public Optional<Serializer<KafkaState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<SinkCommitter<KafkaCommitInfo>> createCommitter() {
+        return Optional.of(new KafkaSinkCommitter(pluginConfig));
+    }
+
+    @Override
+    public Optional<Serializer<KafkaCommitInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Kafka";
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
new file mode 100644
index 00000000..5d70ff8e
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaSinkCommitter.class);
+
+    private final Config pluginConfig;
+
+    public KafkaSinkCommitter(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
+        if (commitInfos.isEmpty()) {
+            return commitInfos;
+        }
+        for (KafkaCommitInfo commitInfo : commitInfos) {
+            String transactionId = commitInfo.getTransactionId();
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Committing transaction {}", transactionId);
+            }
+            KafkaProducer<?, ?> producer = getProducer(commitInfo);
+            producer.commitTransaction();
+        }
+        return commitInfos;
+    }
+
+    @Override
+    public void abort(List<KafkaCommitInfo> commitInfos) {
+        if (commitInfos.isEmpty()) {
+            return;
+        }
+        for (KafkaCommitInfo commitInfo : commitInfos) {
+            KafkaProducer<?, ?> producer = getProducer(commitInfo);
+            producer.abortTransaction();
+        }
+    }
+
+    private KafkaProducer<?, ?> getProducer(KafkaCommitInfo kafkaCommitInfo) {
+        Properties kafkaProperties = kafkaCommitInfo.getKafkaProperties();
+        kafkaProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
kafkaCommitInfo.getTransactionId());
+        KafkaProducer<?, ?> kafkaProducer = new 
KafkaProducer<>(kafkaCommitInfo.getKafkaProperties());
+        kafkaProducer.initTransactions();
+        return kafkaProducer;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
new file mode 100644
index 00000000..c202c699
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to 
Kafka.
+ */
+public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, 
KafkaCommitInfo, KafkaState> {
+
+    private final SinkWriter.Context context;
+    private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+    private final Config pluginConfig;
+
+    private KafkaProduceSender<?, ?> kafkaProducerSender;
+
+    public KafkaSinkWriter(
+        SinkWriter.Context context,
+        SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
+        Config pluginConfig,
+        List<KafkaState> kafkaStates) {
+        this.context = context;
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.pluginConfig = pluginConfig;
+        if 
(KafkaSemantics.AT_LEAST_ONCE.equals(getKafkaSemantics(pluginConfig))) {
+            // the recover state
+            this.kafkaProducerSender = new KafkaTransactionSender<>(
+                getKafkaProperties(pluginConfig),
+                getSerializer(pluginConfig));
+            this.kafkaProducerSender.abortTransaction(kafkaStates);
+            this.kafkaProducerSender.beginTransaction();
+        } else {
+            this.kafkaProducerSender = new KafkaNoTransactionSender<>(
+                getKafkaProperties(pluginConfig),
+                getSerializer(pluginConfig));
+        }
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) {
+        kafkaProducerSender.send(element);
+    }
+
+    @Override
+    public List<KafkaState> snapshotState() {
+        return kafkaProducerSender.snapshotState();
+    }
+
+    @Override
+    public Optional<KafkaCommitInfo> prepareCommit() {
+        return kafkaProducerSender.prepareCommit();
+    }
+
+    @Override
+    public void abort() {
+        kafkaProducerSender.abortTransaction();
+    }
+
+    @Override
+    public void close() {
+        try (KafkaProduceSender<?, ?> kafkaProduceSender = 
kafkaProducerSender) {
+            // no-op
+        } catch (Exception e) {
+            throw new RuntimeException("Close kafka sink writer error", e);
+        }
+    }
+
+    private Properties getKafkaProperties(Config pluginConfig) {
+        Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
+            
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX,
 true);
+        Properties kafkaProperties = new Properties();
+        kafkaConfig.entrySet().forEach(entry -> {
+            kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
+        });
+        return kafkaProperties;
+    }
+
+    private SeaTunnelRowSerializer<?, ?> getSerializer(Config pluginConfig) {
+        return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString("topic"));
+    }
+
+    private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
+        if (pluginConfig.hasPath("semantics")) {
+            return pluginConfig.getEnum(KafkaSemantics.class, "semantics");
+        }
+        return KafkaSemantics.NON;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
new file mode 100644
index 00000000..2956e361
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * This sender will use kafka transaction to guarantee the data is sent to 
kafka at exactly-once.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaTransactionSender.class);
+
+    private final KafkaProducer<K, V> kafkaProducer;
+    private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
+    private final String transactionId;
+    private final Properties kafkaProperties;
+
+    public KafkaTransactionSender(Properties kafkaProperties,
+                                  SeaTunnelRowSerializer<K, V> 
seaTunnelRowSerializer) {
+        this.kafkaProperties = kafkaProperties;
+        this.seaTunnelRowSerializer = seaTunnelRowSerializer;
+        this.transactionId = getTransactionId();
+        this.kafkaProducer = getTransactionProducer(kafkaProperties, 
transactionId);
+    }
+
+    @Override
+    public void send(SeaTunnelRow seaTunnelRow) {
+        ProducerRecord<K, V> producerRecord = 
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+        kafkaProducer.send(producerRecord);
+    }
+
+    @Override
+    public void beginTransaction() {
+        kafkaProducer.beginTransaction();
+    }
+
+    @Override
+    public Optional<KafkaCommitInfo> prepareCommit() {
+        KafkaCommitInfo kafkaCommitInfo = new KafkaCommitInfo(transactionId, 
kafkaProperties);
+        return Optional.of(kafkaCommitInfo);
+    }
+
+    @Override
+    public void abortTransaction() {
+        kafkaProducer.abortTransaction();
+    }
+
+    @Override
+    public void abortTransaction(List<KafkaState> kafkaStates) {
+        if (kafkaStates.isEmpty()) {
+            return;
+        }
+        for (KafkaState kafkaState : kafkaStates) {
+            // create the transaction producer
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Abort kafka transaction: {}", 
kafkaState.getTransactionId());
+            }
+            KafkaProducer<K, V> historyProducer = 
getTransactionProducer(kafkaProperties, kafkaState.getTransactionId());
+            historyProducer.initTransactions();
+            historyProducer.abortTransaction();
+        }
+    }
+
+    @Override
+    public List<KafkaState> snapshotState() {
+        return Lists.newArrayList(new KafkaState(transactionId, 
kafkaProperties));
+    }
+
+    @Override
+    public void close() {
+        kafkaProducer.flush();
+        try (KafkaProducer<?, ?> closedProducer = kafkaProducer) {
+            // no-op
+        }
+    }
+
+    private KafkaProducer<K, V> getTransactionProducer(Properties properties, 
String transactionId) {
+        Properties transactionProperties = new Properties(properties);
+        transactionProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionId);
+        KafkaProducer<K, V> transactionProducer = new 
KafkaProducer<>(transactionProperties);
+        transactionProducer.initTransactions();
+        return transactionProducer;
+    }
+
+    // todo: use a better way to generate the transaction id
+    private String getTransactionId() {
+        return "SeaTunnel-" + System.currentTimeMillis();
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
similarity index 82%
copy from 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
copy to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
index fa6a5518..c55c3efd 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
@@ -19,5 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.state;
 
 import java.io.Serializable;
 
-public class KafkaState implements Serializable {
+/**
+ * Right now, we don't need aggregated commit in kafka.
+ * Todo: we need to add a default implementation of this state.
+ */
+public class KafkaAggregatedCommitInfo implements Serializable {
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
similarity index 78%
copy from 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
copy to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
index fa6a5518..b1a032f7 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
@@ -17,7 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.state;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
 import java.io.Serializable;
+import java.util.Properties;
+
/**
 * Commit info produced by the kafka sink writer at pre-commit time. It carries
 * the open transaction id plus the producer configuration needed to rebuild a
 * producer and finish (or abort) that transaction on the committer side.
 */
@Data
@AllArgsConstructor
public class KafkaCommitInfo implements Serializable {

    private final String transactionId;
    private final Properties kafkaProperties;
}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
index fa6a5518..7ab11ddf 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
@@ -17,7 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.state;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
 import java.io.Serializable;
+import java.util.Properties;
 
/**
 * Writer state snapshotted at checkpoint time. Records the open transaction id
 * and the producer configuration so a restored writer can abort the leftover
 * transaction (see KafkaTransactionSender#abortTransaction(List)).
 */
@Data
@AllArgsConstructor
public class KafkaState implements Serializable {

    private final String transactionId;
    private final Properties kafkaProperties;
}
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index bfdb2db4..9f4f1a7b 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.InvalidClassException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements 
SinkWriter<InputT, CommT, WriterStateT> {
 
@@ -51,7 +52,8 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> 
implements SinkWriter<
 
     @Override
     public List<CommT> prepareCommit(boolean flush) throws IOException {
-        return Collections.singletonList(sinkWriter.prepareCommit());
+        Optional<CommT> commTOptional = sinkWriter.prepareCommit();
+        return 
commTOptional.map(Collections::singletonList).orElse(Collections.emptyList());
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
index dbf813b8..59360bdf 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
@@ -28,12 +28,14 @@ import 
org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class SparkDataSourceWriter<CommitInfoT, StateT, AggregatedCommitInfoT> 
implements DataSourceWriter {
@@ -64,7 +66,7 @@ public class SparkDataSourceWriter<CommitInfoT, StateT, 
AggregatedCommitInfoT> i
     public void commit(WriterCommitMessage[] messages) {
         if (sinkAggregatedCommitter != null) {
             try {
-                sinkAggregatedCommitter.commit(combineCommitMessage(messages));
+                
sinkAggregatedCommitter.commit(combineCommitMessage(extractCommitInfo(messages)));
             } catch (IOException e) {
                 throw new RuntimeException("commit failed in driver", e);
             }
@@ -73,18 +75,31 @@ public class SparkDataSourceWriter<CommitInfoT, StateT, 
AggregatedCommitInfoT> i
 
     @Override
     public void abort(WriterCommitMessage[] messages) {
+        final List<CommitInfoT> commitInfos = extractCommitInfo(messages);
+        if (sinkCommitter != null) {
+            try {
+                sinkCommitter.abort(commitInfos);
+            } catch (IOException e) {
+                throw new RuntimeException("SinkCommitter abort failed in 
driver", e);
+            }
+        }
         if (sinkAggregatedCommitter != null) {
             try {
-                sinkAggregatedCommitter.abort(combineCommitMessage(messages));
+                
sinkAggregatedCommitter.abort(combineCommitMessage(commitInfos));
             } catch (Exception e) {
-                throw new RuntimeException("abort failed in driver", e);
+                throw new RuntimeException("SinkAggregatedCommitter abort 
failed in driver", e);
             }
         }
     }
 
-    private List<AggregatedCommitInfoT> 
combineCommitMessage(WriterCommitMessage[] messages) {
-        return Collections.singletonList(sinkAggregatedCommitter.combine(
-                Arrays.stream(messages).map(m -> 
((SparkWriterCommitMessage<CommitInfoT>) m).getMessage())
-                        .collect(Collectors.toList())));
+    private @Nonnull List<CommitInfoT> extractCommitInfo(WriterCommitMessage[] 
messages) {
+        return Arrays.stream(messages)
+            .map(m -> ((SparkWriterCommitMessage<CommitInfoT>) m).getMessage())
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+    }
+
+    private @Nonnull List<AggregatedCommitInfoT> 
combineCommitMessage(List<CommitInfoT> commitInfos) {
+        return 
Collections.singletonList(sinkAggregatedCommitter.combine(commitInfos));
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index c79367a5..02272ae0 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Optional;
 
 public class SparkDataWriter<CommitInfoT, StateT> implements 
DataWriter<InternalRow> {
 
@@ -40,6 +41,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements 
DataWriter<Internal
     @Nullable
     private final SinkCommitter<CommitInfoT> sinkCommitter;
     private final RowSerialization<InternalRow> rowSerialization;
+    private CommitInfoT latestCommitInfoT;
 
     SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
                     SinkCommitter<CommitInfoT> sinkCommitter,
@@ -56,17 +58,40 @@ public class SparkDataWriter<CommitInfoT, StateT> 
implements DataWriter<Internal
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-        CommitInfoT commitInfo = sinkWriter.prepareCommit();
+        // We combine the prepareCommit and commit in this method.
+        // If this method fails, we need to rollback the transaction in the 
abort method.
+        // 1. prepareCommit fails:
+        //   1.1. We don't have the commit info, we need to execute the 
sinkWriter#abort to rollback the transaction.
+        // 2. commit fails
+        //   2.1. We have the commit info, we need to execute the 
sinkCommitter#abort to rollback the transaction.
+        Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
+        commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT = 
commitInfoT);
         if (sinkCommitter != null) {
-            sinkCommitter.commit(Collections.singletonList(commitInfo));
+            if (latestCommitInfoT == null) {
+                sinkCommitter.commit(Collections.emptyList());
+            } else {
+                
sinkCommitter.commit(Collections.singletonList(latestCommitInfoT));
+            }
         }
-        return new SparkWriterCommitMessage<>(commitInfo);
+        SparkWriterCommitMessage<CommitInfoT> sparkWriterCommitMessage = new 
SparkWriterCommitMessage<>(latestCommitInfoT);
+        cleanCommitInfo();
+        return sparkWriterCommitMessage;
     }
 
     @Override
     public void abort() throws IOException {
+        sinkWriter.abort();
         if (sinkCommitter != null) {
-            sinkCommitter.abort();
+            if (latestCommitInfoT == null) {
+                sinkCommitter.abort(Collections.emptyList());
+            } else {
+                
sinkCommitter.abort(Collections.singletonList(latestCommitInfoT));
+            }
         }
+        cleanCommitInfo();
+    }
+
    // Forget the pending commit info once it has been handed off or aborted,
    // so a later commit()/abort() does not replay a stale transaction.
    private void cleanCommitInfo() {
        latestCommitInfoT = null;
    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
index b292282a..b6b8a5b6 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
+import javax.annotation.Nullable;
+
 public class SparkWriterCommitMessage<T> implements WriterCommitMessage {
 
-    private T message;
+    private @Nullable T message;
 
     SparkWriterCommitMessage(T message) {
         this.message = message;

Reply via email to