[
https://issues.apache.org/jira/browse/FLINK-33692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martijn Visser closed FLINK-33692.
----------------------------------
Resolution: Invalid
See PR; we believe the current code is fine, and we haven't had a response from
the original reporter
> FlinkKafkaProducer could miss super.close
> -----------------------------------------
>
> Key: FLINK-33692
> URL: https://issues.apache.org/jira/browse/FLINK-33692
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.6, 1.17.1
> Reporter: zhuming
> Priority: Major
> Labels: pull-request-available
> Original Estimate: 96h
> Remaining Estimate: 96h
>
>
> When flink job restarted 、canceled or failed. It will execute close method.
> But in following FlinkKafkaProducer source code. If flush or close throw
> InterruptedExecption. super.close method must be missed.
> {code:java}
> @Override
> public void close() throws FlinkKafkaException {
> // First close the producer for current transaction.
> try {
> final KafkaTransactionState currentTransaction = currentTransaction();
> if (currentTransaction != null) {
> // to avoid exceptions on aborting transactions with some pending
> records
> flush(currentTransaction);
> // normal abort for AT_LEAST_ONCE and NONE do not clean up
> resources because of
> // producer reusing, thus
> // we need to close it manually
> switch (semantic) {
> case EXACTLY_ONCE:
> break;
> case AT_LEAST_ONCE:
> case NONE:
> currentTransaction.producer.flush();
> currentTransaction.producer.close(Duration.ofSeconds(0));
> break;
> }
> }
> // If flush() or close() wasinterrupted, super.close might be missed.
> super.close();
> } catch (Exception e) {
> asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
> } finally {
> // We may have to close producer of the current transaction in case
> some exception was
> // thrown before
> // the normal close routine finishes.
> if (currentTransaction() != null) {
> try {
> currentTransaction().producer.close(Duration.ofSeconds(0));
> } catch (Throwable t) {
> LOG.warn("Error closing producer.", t);
> }
> }
> // Make sure all the producers for pending transactions are closed.
> pendingTransactions()
> .forEach(
> transaction -> {
> try {
>
> transaction.getValue().producer.close(Duration.ofSeconds(0));
> } catch (Throwable t) {
> LOG.warn("Error closing producer.", t);
> }
> });
> // make sure we propagate pending errors
> checkErroneous();
> }
> } {code}
> super.close() method is to execute following code. It ensures
> '{*}KafkaTransactionState{*}' released correctlly.
> {code:java}
> @Override
> public void close() throws Exception {
> super.close();
> if (currentTransactionHolder != null) {
> abort(currentTransactionHolder.handle);
> currentTransactionHolder = null;
> }
> } {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)