zhuming created FLINK-33692:
-------------------------------
Summary: FlinkKafkaProducer could miss super.close
Key: FLINK-33692
URL: https://issues.apache.org/jira/browse/FLINK-33692
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1, 1.13.6
Reporter: zhuming
When a Flink job is restarted, canceled, or fails, it executes the close method.
However, in the following FlinkKafkaProducer source code, if flush or close throws
an InterruptedException, the call to super.close() is skipped.
{code:java}
@Override
public void close() throws FlinkKafkaException {
// First close the producer for current transaction.
try {
final KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction != null) {
// to avoid exceptions on aborting transactions with some pending records
flush(currentTransaction);
// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of
// producer reusing, thus we need to close it manually
switch (semantic) {
case EXACTLY_ONCE:
break;
case AT_LEAST_ONCE:
case NONE:
currentTransaction.producer.flush();
currentTransaction.producer.close(Duration.ofSeconds(0));
break;
}
}
// If flush() or close() was interrupted, super.close() might be missed.
super.close();
} catch (Exception e) {
asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
} finally {
// We may have to close producer of the current transaction in case some exception was
// thrown before the normal close routine finishes.
if (currentTransaction() != null) {
try {
currentTransaction().producer.close(Duration.ofSeconds(0));
} catch (Throwable t) {
LOG.warn("Error closing producer.", t);
}
}
// Make sure all the producers for pending transactions are closed.
pendingTransactions()
.forEach(
transaction -> {
try {
transaction.getValue().producer.close(Duration.ofSeconds(0));
} catch (Throwable t) {
LOG.warn("Error closing producer.", t);
}
});
// make sure we propagate pending errors
checkErroneous();
}
} {code}
The super.close() method executes the following code, which ensures that
'{*}KafkaTransactionState{*}' is released correctly.
{code:java}
@Override
public void close() throws Exception {
super.close();
if (currentTransactionHolder != null) {
abort(currentTransactionHolder.handle);
currentTransactionHolder = null;
}
} {code}
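The control flow described above can be reproduced outside Flink. The following is only a minimal sketch, not the connector's code or an agreed fix: BaseSink, CloseInsideTry, CloseInFinally and failingFlush are hypothetical stand-ins, and the interrupt is simulated. It shows that an exception thrown before super.close() inside the try block skips that call, and that moving the call into the finally block is one possible way to make sure it runs.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {

    /** Hypothetical stand-in for the parent sink; records that close() ran. */
    static class BaseSink {
        final List<String> events = new ArrayList<>();

        public void close() throws Exception {
            events.add("super.close");
        }
    }

    /** Simulates a flush that is interrupted (assumption for illustration). */
    static void failingFlush() throws Exception {
        throw new InterruptedException("simulated interrupt during flush");
    }

    /** Mirrors the reported shape: super.close() sits inside the try block. */
    static class CloseInsideTry extends BaseSink {
        @Override
        public void close() {
            try {
                failingFlush();
                super.close(); // never reached when failingFlush() throws
            } catch (Exception e) {
                events.add("caught " + e.getClass().getSimpleName());
            } finally {
                events.add("producers closed");
            }
        }
    }

    /** One possible remedy: call super.close() from the finally block. */
    static class CloseInFinally extends BaseSink {
        @Override
        public void close() {
            try {
                failingFlush();
            } catch (Exception e) {
                events.add("caught " + e.getClass().getSimpleName());
            } finally {
                try {
                    super.close(); // runs even though failingFlush() threw
                } catch (Exception e) {
                    events.add("super.close failed");
                }
                events.add("producers closed");
            }
        }
    }

    public static void main(String[] args) {
        CloseInsideTry reported = new CloseInsideTry();
        reported.close();
        System.out.println("inside try: " + reported.events);

        CloseInFinally remedied = new CloseInFinally();
        remedied.close();
        System.out.println("in finally: " + remedied.events);
    }
}
```

In the first variant the recorded events contain no "super.close" entry, which corresponds to the transaction state never being aborted. Whether the real connector should call super.close() from finally or handle the interrupt differently is a design decision for the maintainers.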