This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new b2cd4c02 Add SeaTunnel kafka sink (#1952)
b2cd4c02 is described below
commit b2cd4c02fc9452c724f09eefa924cc87b4bee16b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 25 23:20:38 2022 +0800
Add SeaTunnel kafka sink (#1952)
---
.../apache/seatunnel/api/sink/SinkCommitter.java | 5 +-
.../org/apache/seatunnel/api/sink/SinkWriter.java | 12 +-
.../seatunnel-connectors-seatunnel-dist/pom.xml | 5 +
.../seatunnel/console/sink/ConsoleSinkWriter.java | 10 +-
.../connectors/seatunnel/kafka/config/Config.java | 2 +
.../seatunnel/kafka/config/KafkaSemantics.java | 23 ++++
.../serialize/DefaultSeaTunnelRowSerializer.java | 19 ++++
.../kafka/serialize/SeaTunnelRowSerializer.java | 16 +++
.../kafka/sink/KafkaNoTransactionSender.java | 88 +++++++++++++++
.../seatunnel/kafka/sink/KafkaProduceSender.java | 63 +++++++++++
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 86 +++++++++++++++
.../seatunnel/kafka/sink/KafkaSinkCommitter.java | 77 +++++++++++++
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 118 ++++++++++++++++++++
.../kafka/sink/KafkaTransactionSender.java | 122 +++++++++++++++++++++
...kaState.java => KafkaAggregatedCommitInfo.java} | 6 +-
.../{KafkaState.java => KafkaCommitInfo.java} | 12 +-
.../seatunnel/kafka/state/KafkaState.java | 10 ++
.../translation/flink/sink/FlinkSinkWriter.java | 4 +-
.../spark/sink/SparkDataSourceWriter.java | 29 +++--
.../translation/spark/sink/SparkDataWriter.java | 33 +++++-
.../spark/sink/SparkWriterCommitMessage.java | 4 +-
21 files changed, 723 insertions(+), 21 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 6fff7762..4dad4306 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -38,9 +38,10 @@ public interface SinkCommitter<CommitInfoT> extends
Serializable {
List<CommitInfoT> commit(List<CommitInfoT> committables) throws
IOException;
/**
- * Close this resource.
+ * Abort the transaction, this method will be called when the commit
fails.
*
+ * @param commitInfos The list of commit message, used to abort the commit.
* @throws IOException throw IOException when close failed.
*/
- void abort() throws IOException;
+ void abort(List<CommitInfoT> commitInfos) throws IOException;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 456e78de..6ce5a79a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* The sink writer use to write data to third party data receiver. This class
will run on taskManger/Worker.
@@ -42,11 +43,12 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends
Serializable {
void write(T element) throws IOException;
/**
- * prepare the commit, will be called before {@link #snapshotState()}
+ * prepare the commit, will be called before {@link #snapshotState()}.
+ * If you need to use 2pc, you can return the commit info in this method,
and receive the commit info in {@link SinkCommitter#commit(List)}.
*
* @return the commit info need to commit
*/
- CommitInfoT prepareCommit() throws IOException;
+ Optional<CommitInfoT> prepareCommit() throws IOException;
/**
* @return The writer's state.
@@ -56,6 +58,12 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends
Serializable {
return Collections.emptyList();
}
+ /**
+ * Used to abort the prepareCommit, if the prepareCommit fails,
+ * there is no CommitInfoT, so the rollback work cannot be done by {@link
SinkCommitter}.
+ */
+ void abort();
+
/**
* call it when SinkWriter close
*
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index b487fe1a..f0e03b24 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -40,6 +40,11 @@
<artifactId>seatunnel-connectors-seatunnel-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 650aa4c2..b567e6a4 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.Optional;
public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow,
ConsoleCommitInfo, ConsoleState> {
@@ -44,8 +45,13 @@ public class ConsoleSinkWriter implements
SinkWriter<SeaTunnelRow, ConsoleCommit
}
@Override
- public ConsoleCommitInfo prepareCommit() {
- return null;
+ public Optional<ConsoleCommitInfo> prepareCommit() {
+ return Optional.empty();
+ }
+
+ @Override
+ public void abort() {
+
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index c5fddd25..1eebcdb5 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -27,4 +27,6 @@ public class Config {
* The server address of kafka cluster.
*/
public static final String BOOTSTRAP_SERVER = "bootstrap.server";
+
+ public static final String KAFKA_CONFIG_PREFIX = "kafka.";
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSemantics.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSemantics.java
new file mode 100644
index 00000000..816eadf0
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSemantics.java
@@ -0,0 +1,23 @@
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum KafkaSemantics {
+
+ /**
+ * At this semantics, we will directly send the message to kafka, the data
may be duplicated/lost
+ * if job restart/retry or network error.
+ */
+ NON,
+
+ /**
+ * At this semantics, we will retry sending the message to kafka, if the
response is not ack.
+ */
+ AT_LEAST_ONCE,
+
+ /**
+ * At this semantics, we will use 2pc to guarantee the message is sent to
kafka exactly once.
+ */
+ EXACTLY_ONCE,
+ ;
+
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 00000000..bb6c14f8
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,19 @@
+package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<String, SeaTunnelRow> {
+
+ private final String topic;
+
+ public DefaultSeaTunnelRowSerializer(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public ProducerRecord<String, SeaTunnelRow> serializeRow(SeaTunnelRow row)
{
+ return new ProducerRecord<>(topic, null, row);
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 00000000..ae92e2a9
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,16 @@
+package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public interface SeaTunnelRowSerializer<K, V> {
+
+ /**
+ * Serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}.
+ *
+ * @param row seatunnel row
+ * @return kafka record.
+ */
+ ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
new file mode 100644
index 00000000..c1339e48
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * This sender will send the data to the Kafka, and will not guarantee the
data is committed to the Kafka exactly-once.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class KafkaNoTransactionSender<K, V> implements KafkaProduceSender<K,
V> {
+
+ private final KafkaProducer<K, V> kafkaProducer;
+ private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
+
+ public KafkaNoTransactionSender(Properties properties,
SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer) {
+ this.seaTunnelRowSerializer = seaTunnelRowSerializer;
+ this.kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+ @Override
+ public void send(SeaTunnelRow seaTunnelRow) {
+ ProducerRecord<K, V> producerRecord =
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+ kafkaProducer.send(producerRecord);
+ }
+
+ @Override
+ public void beginTransaction() {
+ // no-op
+ }
+
+ @Override
+ public Optional<KafkaCommitInfo> prepareCommit() {
+ return Optional.empty();
+ }
+
+ @Override
+ public void abortTransaction() {
+ // no-op
+ }
+
+ @Override
+ public void abortTransaction(List<KafkaState> kafkaStates) {
+ // no-op
+ }
+
+ @Override
+ public List<KafkaState> snapshotState() {
+ kafkaProducer.flush();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void close() {
+ kafkaProducer.flush();
+ try (KafkaProducer<?, ?> closedKafkaProducer = kafkaProducer) {
+ // close the producer
+ }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
new file mode 100644
index 00000000..1444755a
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface KafkaProduceSender<K, V> extends AutoCloseable {
+ /**
+ * Send data to kafka.
+ *
+ * @param seaTunnelRow data to send
+ */
+ void send(SeaTunnelRow seaTunnelRow);
+
+ void beginTransaction();
+
+ /**
+ * Prepare a transaction commit.
+ *
+ * @return commit info, or empty if no commit is needed.
+ */
+ Optional<KafkaCommitInfo> prepareCommit();
+
+ /**
+ * Abort the current transaction.
+ */
+ void abortTransaction();
+
+ /**
+ * Abort the given transaction.
+ *
+ * @param kafkaStates kafka states about the transaction info.
+ */
+ void abortTransaction(List<KafkaState> kafkaStates);
+
+ /**
+ * Get the current kafka state of the sender.
+ *
+ * @return kafka state List, or empty if no state is available.
+ */
+ List<KafkaState> snapshotState();
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
new file mode 100644
index 00000000..c6a96e3f
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Kafka Sink implementation by using SeaTunnel sink API.
+ * This class contains the method to create {@link KafkaSinkWriter} and {@link
KafkaSinkCommitter}.
+ */
+public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaState,
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
+
+ private Config pluginConfig;
+ private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState>
createWriter(SinkWriter.Context context) {
+ return new KafkaSinkWriter(context, seaTunnelRowTypeInfo,
pluginConfig, Collections.emptyList());
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState>
restoreWriter(SinkWriter.Context context, List<KafkaState> states) {
+ return new KafkaSinkWriter(context, seaTunnelRowTypeInfo,
pluginConfig, states);
+ }
+
+ @Override
+ public Optional<Serializer<KafkaState>> getWriterStateSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<SinkCommitter<KafkaCommitInfo>> createCommitter() {
+ return Optional.of(new KafkaSinkCommitter(pluginConfig));
+ }
+
+ @Override
+ public Optional<Serializer<KafkaCommitInfo>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public String getPluginName() {
+ return "Kafka";
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
new file mode 100644
index 00000000..5d70ff8e
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaSinkCommitter.class);
+
+ private final Config pluginConfig;
+
+ public KafkaSinkCommitter(Config pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ }
+
+ @Override
+ public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
+ if (commitInfos.isEmpty()) {
+ return commitInfos;
+ }
+ for (KafkaCommitInfo commitInfo : commitInfos) {
+ String transactionId = commitInfo.getTransactionId();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Committing transaction {}", transactionId);
+ }
+ KafkaProducer<?, ?> producer = getProducer(commitInfo);
+ producer.commitTransaction();
+ }
+ return commitInfos;
+ }
+
+ @Override
+ public void abort(List<KafkaCommitInfo> commitInfos) {
+ if (commitInfos.isEmpty()) {
+ return;
+ }
+ for (KafkaCommitInfo commitInfo : commitInfos) {
+ KafkaProducer<?, ?> producer = getProducer(commitInfo);
+ producer.abortTransaction();
+ }
+ }
+
+ private KafkaProducer<?, ?> getProducer(KafkaCommitInfo kafkaCommitInfo) {
+ Properties kafkaProperties = kafkaCommitInfo.getKafkaProperties();
+ kafkaProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
kafkaCommitInfo.getTransactionId());
+ KafkaProducer<?, ?> kafkaProducer = new
KafkaProducer<>(kafkaCommitInfo.getKafkaProperties());
+ kafkaProducer.initTransactions();
+ return kafkaProducer;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
new file mode 100644
index 00000000..c202c699
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to
Kafka.
+ */
+public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow,
KafkaCommitInfo, KafkaState> {
+
+ private final SinkWriter.Context context;
+ private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+ private final Config pluginConfig;
+
+ private KafkaProduceSender<?, ?> kafkaProducerSender;
+
+ public KafkaSinkWriter(
+ SinkWriter.Context context,
+ SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
+ Config pluginConfig,
+ List<KafkaState> kafkaStates) {
+ this.context = context;
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ this.pluginConfig = pluginConfig;
+ if
(KafkaSemantics.AT_LEAST_ONCE.equals(getKafkaSemantics(pluginConfig))) {
+ // the recover state
+ this.kafkaProducerSender = new KafkaTransactionSender<>(
+ getKafkaProperties(pluginConfig),
+ getSerializer(pluginConfig));
+ this.kafkaProducerSender.abortTransaction(kafkaStates);
+ this.kafkaProducerSender.beginTransaction();
+ } else {
+ this.kafkaProducerSender = new KafkaNoTransactionSender<>(
+ getKafkaProperties(pluginConfig),
+ getSerializer(pluginConfig));
+ }
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) {
+ kafkaProducerSender.send(element);
+ }
+
+ @Override
+ public List<KafkaState> snapshotState() {
+ return kafkaProducerSender.snapshotState();
+ }
+
+ @Override
+ public Optional<KafkaCommitInfo> prepareCommit() {
+ return kafkaProducerSender.prepareCommit();
+ }
+
+ @Override
+ public void abort() {
+ kafkaProducerSender.abortTransaction();
+ }
+
+ @Override
+ public void close() {
+ try (KafkaProduceSender<?, ?> kafkaProduceSender =
kafkaProducerSender) {
+ // no-op
+ } catch (Exception e) {
+ throw new RuntimeException("Close kafka sink writer error", e);
+ }
+ }
+
+ private Properties getKafkaProperties(Config pluginConfig) {
+ Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
+
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX,
true);
+ Properties kafkaProperties = new Properties();
+ kafkaConfig.entrySet().forEach(entry -> {
+ kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
+ });
+ return kafkaProperties;
+ }
+
+ private SeaTunnelRowSerializer<?, ?> getSerializer(Config pluginConfig) {
+ return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString("topic"));
+ }
+
+ private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
+ if (pluginConfig.hasPath("semantics")) {
+ return pluginConfig.getEnum(KafkaSemantics.class, "semantics");
+ }
+ return KafkaSemantics.NON;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
new file mode 100644
index 00000000..2956e361
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * This sender will use kafka transaction to guarantee the data is sent to
kafka at exactly-once.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaTransactionSender.class);
+
+ private final KafkaProducer<K, V> kafkaProducer;
+ private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
+ private final String transactionId;
+ private final Properties kafkaProperties;
+
+ public KafkaTransactionSender(Properties kafkaProperties,
+ SeaTunnelRowSerializer<K, V>
seaTunnelRowSerializer) {
+ this.kafkaProperties = kafkaProperties;
+ this.seaTunnelRowSerializer = seaTunnelRowSerializer;
+ this.transactionId = getTransactionId();
+ this.kafkaProducer = getTransactionProducer(kafkaProperties,
transactionId);
+ }
+
+ @Override
+ public void send(SeaTunnelRow seaTunnelRow) {
+ ProducerRecord<K, V> producerRecord =
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+ kafkaProducer.send(producerRecord);
+ }
+
+ @Override
+ public void beginTransaction() {
+ kafkaProducer.beginTransaction();
+ }
+
+ @Override
+ public Optional<KafkaCommitInfo> prepareCommit() {
+ KafkaCommitInfo kafkaCommitInfo = new KafkaCommitInfo(transactionId,
kafkaProperties);
+ return Optional.of(kafkaCommitInfo);
+ }
+
+ @Override
+ public void abortTransaction() {
+ kafkaProducer.abortTransaction();
+ }
+
+ @Override
+ public void abortTransaction(List<KafkaState> kafkaStates) {
+ if (kafkaStates.isEmpty()) {
+ return;
+ }
+ for (KafkaState kafkaState : kafkaStates) {
+ // create the transaction producer
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Abort kafka transaction: {}",
kafkaState.getTransactionId());
+ }
+ KafkaProducer<K, V> historyProducer =
getTransactionProducer(kafkaProperties, kafkaState.getTransactionId());
+ historyProducer.initTransactions();
+ historyProducer.abortTransaction();
+ }
+ }
+
+ @Override
+ public List<KafkaState> snapshotState() {
+ return Lists.newArrayList(new KafkaState(transactionId,
kafkaProperties));
+ }
+
+ @Override
+ public void close() {
+ kafkaProducer.flush();
+ try (KafkaProducer<?, ?> closedProducer = kafkaProducer) {
+ // no-op
+ }
+ }
+
+ private KafkaProducer<K, V> getTransactionProducer(Properties properties,
String transactionId) {
+ Properties transactionProperties = new Properties(properties);
+ transactionProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
transactionId);
+ KafkaProducer<K, V> transactionProducer = new
KafkaProducer<>(transactionProperties);
+ transactionProducer.initTransactions();
+ return transactionProducer;
+ }
+
+ // todo: use a better way to generate the transaction id
+ private String getTransactionId() {
+ return "SeaTunnel-" + System.currentTimeMillis();
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
similarity index 82%
copy from
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
index fa6a5518..c55c3efd 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
@@ -19,5 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.state;
import java.io.Serializable;
-public class KafkaState implements Serializable {
/**
 * Aggregated commit info for the kafka sink.
 *
 * <p>Right now, we don't need aggregated commit in kafka.
 * Todo: we need to add a default implementation of this state.
 */
public class KafkaAggregatedCommitInfo implements Serializable {

    // Explicit id so serialized checkpoint state stays readable across versions.
    private static final long serialVersionUID = 1L;
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
similarity index 78%
copy from
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
index fa6a5518..b1a032f7 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
@@ -17,7 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.state;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
import java.io.Serializable;
+import java.util.Properties;
+
+@Data
+@AllArgsConstructor
+public class KafkaCommitInfo implements Serializable {
+
+ private final String transactionId;
+ private final Properties kafkaProperties;
-public class KafkaState implements Serializable {
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
index fa6a5518..7ab11ddf 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
@@ -17,7 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.state;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
import java.io.Serializable;
+import java.util.Properties;
+@Data
+@AllArgsConstructor
public class KafkaState implements Serializable {
+
+ private final String transactionId;
+ private final Properties kafkaProperties;
+
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index bfdb2db4..9f4f1a7b 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.InvalidClassException;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements
SinkWriter<InputT, CommT, WriterStateT> {
@@ -51,7 +52,8 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
implements SinkWriter<
@Override
public List<CommT> prepareCommit(boolean flush) throws IOException {
- return Collections.singletonList(sinkWriter.prepareCommit());
+ Optional<CommT> commTOptional = sinkWriter.prepareCommit();
+ return
commTOptional.map(Collections::singletonList).orElse(Collections.emptyList());
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
index dbf813b8..59360bdf 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
@@ -28,12 +28,14 @@ import
org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
public class SparkDataSourceWriter<CommitInfoT, StateT, AggregatedCommitInfoT>
implements DataSourceWriter {
@@ -64,7 +66,7 @@ public class SparkDataSourceWriter<CommitInfoT, StateT,
AggregatedCommitInfoT> i
public void commit(WriterCommitMessage[] messages) {
if (sinkAggregatedCommitter != null) {
try {
- sinkAggregatedCommitter.commit(combineCommitMessage(messages));
+
sinkAggregatedCommitter.commit(combineCommitMessage(extractCommitInfo(messages)));
} catch (IOException e) {
throw new RuntimeException("commit failed in driver", e);
}
@@ -73,18 +75,31 @@ public class SparkDataSourceWriter<CommitInfoT, StateT,
AggregatedCommitInfoT> i
    @Override
    public void abort(WriterCommitMessage[] messages) {
        // Abort at both levels; each committer is optional and each failure is
        // wrapped with a message naming the component that failed.
        final List<CommitInfoT> commitInfos = extractCommitInfo(messages);
        if (sinkCommitter != null) {
            try {
                sinkCommitter.abort(commitInfos);
            } catch (IOException e) {
                throw new RuntimeException("SinkCommitter abort failed in driver", e);
            }
        }
        if (sinkAggregatedCommitter != null) {
            try {
                sinkAggregatedCommitter.abort(combineCommitMessage(commitInfos));
            } catch (Exception e) {
                throw new RuntimeException("SinkAggregatedCommitter abort failed in driver", e);
            }
        }
        // NOTE(review): if the SinkCommitter abort throws, the aggregated abort is
        // skipped — confirm whether both should always be attempted.
    }
    /**
     * Unwrap the Spark commit messages back into the SeaTunnel commit infos they
     * carry, dropping null messages (writers that produced no commit info).
     */
    private @Nonnull List<CommitInfoT> extractCommitInfo(WriterCommitMessage[] messages) {
        return Arrays.stream(messages)
            .map(m -> ((SparkWriterCommitMessage<CommitInfoT>) m).getMessage())
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }
+
    /**
     * Combine all commit infos into one aggregated commit info, wrapped in a
     * singleton list for the aggregated committer.
     */
    private @Nonnull List<AggregatedCommitInfoT> combineCommitMessage(List<CommitInfoT> commitInfos) {
        return Collections.singletonList(sinkAggregatedCommitter.combine(commitInfos));
    }
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index c79367a5..02272ae0 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
+import java.util.Optional;
public class SparkDataWriter<CommitInfoT, StateT> implements
DataWriter<InternalRow> {
@@ -40,6 +41,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements
DataWriter<Internal
@Nullable
private final SinkCommitter<CommitInfoT> sinkCommitter;
private final RowSerialization<InternalRow> rowSerialization;
+ private CommitInfoT latestCommitInfoT;
SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
SinkCommitter<CommitInfoT> sinkCommitter,
@@ -56,17 +58,40 @@ public class SparkDataWriter<CommitInfoT, StateT>
implements DataWriter<Internal
    @Override
    public WriterCommitMessage commit() throws IOException {
        // We combine the prepareCommit and commit in this method.
        // If this method fails, we need to rollback the transaction in the abort method:
        // 1. prepareCommit fails:
        //    1.1. We don't have the commit info, so abort() must execute
        //         sinkWriter#abort to rollback the transaction.
        // 2. commit fails:
        //    2.1. We have the commit info (kept in latestCommitInfoT), so abort()
        //         must execute sinkCommitter#abort to rollback the transaction.
        Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
        commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT = commitInfoT);
        if (sinkCommitter != null) {
            if (latestCommitInfoT == null) {
                sinkCommitter.commit(Collections.emptyList());
            } else {
                sinkCommitter.commit(Collections.singletonList(latestCommitInfoT));
            }
        }
        SparkWriterCommitMessage<CommitInfoT> sparkWriterCommitMessage =
            new SparkWriterCommitMessage<>(latestCommitInfoT);
        // Clear only after a fully successful commit, so abort() can still see
        // the commit info if anything above throws.
        cleanCommitInfo();
        return sparkWriterCommitMessage;
    }
    @Override
    public void abort() throws IOException {
        // Roll back both levels: the writer aborts its in-flight work, then the
        // committer aborts any transaction whose commit info commit() had already
        // cached in latestCommitInfoT.
        sinkWriter.abort();
        if (sinkCommitter != null) {
            if (latestCommitInfoT == null) {
                sinkCommitter.abort(Collections.emptyList());
            } else {
                sinkCommitter.abort(Collections.singletonList(latestCommitInfoT));
            }
        }
        cleanCommitInfo();
    }
+
    // Reset the cached commit info once commit() or abort() has consumed it.
    private void cleanCommitInfo() {
        latestCommitInfoT = null;
    }
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
index b292282a..b6b8a5b6 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import javax.annotation.Nullable;
+
public class SparkWriterCommitMessage<T> implements WriterCommitMessage {
- private T message;
+ private @Nullable T message;
SparkWriterCommitMessage(T message) {
this.message = message;