[ 
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-16419:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor usability  (was: 
auto-deprioritized-major usability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16419
>                 URL: https://issues.apache.org/jira/browse/FLINK-16419
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>            Reporter: Jun Qin
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor, usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returns OK). This may lead to recovery failures when recovering from a very 
> old snapshot because the transactional IDs in that snapshot may have been 
> expired and removed from Kafka.  For example the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>   - Attempting to resume transaction Source: Custom Source -> Sink: 
> Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 
> 1202020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata            
>                 - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>              - [Producer clientId=producer-1, transactionalId=Source: Custom 
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka 
> producer with timeoutMillis = 92233720
> 36854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors. 
> After discussed with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let JobManager, after successfully notifies all operators the 
> completion of a snapshot (via {{notifyCheckpoingComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Then those transactions need not recommit upon recovery.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to