Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5030#discussion_r151976605
--- Diff:
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
---
@@ -720,11 +719,10 @@ protected void abort(KafkaTransactionState
transaction) {
protected void recoverAndAbort(KafkaTransactionState transaction) {
switch (semantic) {
case EXACTLY_ONCE:
- FlinkKafkaProducer<byte[], byte[]> producer =
-
initTransactionalProducer(transaction.transactionalId, false);
-
producer.resumeTransaction(transaction.producerId, transaction.epoch);
- producer.abortTransaction();
- producer.close();
+ try (FlinkKafkaProducer<byte[], byte[]>
producer =
+
initTransactionalProducer(transaction.transactionalId, false)) {
+ producer.initTransactions();
--- End diff --
I see, thanks! I will merge this now. ð
---