Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5030#discussion_r151940978
--- Diff:
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
---
@@ -720,11 +719,10 @@ protected void abort(KafkaTransactionState
transaction) {
protected void recoverAndAbort(KafkaTransactionState transaction) {
switch (semantic) {
case EXACTLY_ONCE:
- FlinkKafkaProducer<byte[], byte[]> producer =
-
initTransactionalProducer(transaction.transactionalId, false);
-
producer.resumeTransaction(transaction.producerId, transaction.epoch);
- producer.abortTransaction();
- producer.close();
+ try (FlinkKafkaProducer<byte[], byte[]>
producer =
+
initTransactionalProducer(transaction.transactionalId, false)) {
+ producer.initTransactions();
--- End diff --
`initTransaction()` does the same thing as resuming and aborting previous
transactions, but in a safer way, since `resumeTransaction()` is part of our
`KafkaProducer`s extension.
---