[ https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17622315#comment-17622315 ]

Yordan Pavlov commented on FLINK-16419:
---------------------------------------

Again, thanks for your input [~martijnvisser]

> 1. When you're executing a stop-with-savepoint, there is a guarantee that 
> transactions are committed.

Thanks for verifying; that matches our observations. However, once the job is
stopped it then needs to re-fetch the whole savepoint to resume, which can be
very slow in our setup and causes huge latency on the normal flow of data
processing. Is there a way we can stop the job (that is, close all
transactions) and restart it without losing the state?

> 2. For regular savepoints, there is a very small chance that 
> notifyCheckpointCompleted() has been lost. Virtually nil, but theoretically 
> possible.

What you are describing sounds like a rare race condition; that does not match
our observation. On recovery from many of the savepoints, Flink tries to
resume a previous transactional producer and fails with the error:

 
{code:java}
The producer attempted to use a producer id which is not currently assigned to 
its transactional id.{code}
In the previous discussions on this ticket I was left with the impression that
this is expected behavior due to notifyCheckpointCompleted() being lost, and
not a bug. If we establish that this behavior is not expected, I can come back
with a sequence of events that reproduces the problem.

 

> 3. If after a savepoint there was at least one following successful 
> checkpoint, then the chance that the same subtask lost two 
> notifyCheckpointCompleted() calls in a row is nil^2

In our case there are thousands of checkpoints (and savepoints) after the
savepoint we recover from, and all of those transactions are closed: we see
the data in Kafka while reading only committed records. Still, on recovery
from the old savepoint some replay takes place, which causes the problem.
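
For completeness, this is roughly how we verify that only committed
transactions are visible; a minimal sketch with placeholder broker, group, and
topic names:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "txn-visibility-check");    // placeholder group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Only records from committed transactions become visible.
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
{code}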

> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16419
>                 URL: https://issues.apache.org/jira/browse/FLINK-16419
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>            Reporter: Jun Qin
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returned OK). This may lead to recovery failures when recovering from a very 
> old snapshot, because the transactional IDs in that snapshot may have expired 
> and been removed from Kafka. For example, consider the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata             - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer              - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task       
>              - Source: Custom Source -> Sink: Unnamed (1/1) 
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}} (see the sketch after 
> this description). This is a bit risky, as it may hide real transaction 
> timeout errors.
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it successfully notifies all operators of 
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Those transactions would then not need to be recommitted upon 
> recovery.
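
For reference, the workaround mentioned above is applied on the sink before it
is added to the job. A minimal sketch, assuming the 1.x FlinkKafkaProducer
API; the broker address, topic name, and timeout value are placeholders:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IgnoreExpiredTxnWorkaround {
    public static FlinkKafkaProducer<String> buildSink() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Keep below the broker's transaction.max.timeout.ms (15 min default);
        // Flink's own default of 1h would otherwise be rejected.
        props.put("transaction.timeout.ms", "900000");

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic", // placeholder topic
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>("output-topic",
                                element.getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // Workaround from this ticket: swallow commit failures caused by
        // transactions that expired on the broker. Risky, because it can
        // also hide genuine transaction timeouts (potential data loss).
        producer.ignoreFailuresAfterTransactionTimeout();
        return producer;
    }
}
{code}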
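
And a rough illustration of the proposed direction. This is purely a sketch of
the idea, not an actual Flink API; all names below are made up:

{code:java}
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of the proposal: persist which transactional ids were
 * already committed successfully, and skip them on recovery. None of these
 * names correspond to real Flink classes.
 */
public class SkipKnownCommittedSketch {

    /** Imagined snapshot contents: pending transactions plus a committed set. */
    static final class Snapshot {
        final List<String> preCommittedTxnIds;
        // Written by the JobManager after notifyCheckpointComplete succeeds.
        final Set<String> committedTxnIds;

        Snapshot(List<String> preCommitted, Set<String> committed) {
            this.preCommittedTxnIds = preCommitted;
            this.committedTxnIds = committed;
        }
    }

    interface TxnCommitter {
        void recoverAndCommit(String transactionalId);
    }

    static void recover(Snapshot snapshot, TxnCommitter committer) {
        for (String txnId : snapshot.preCommittedTxnIds) {
            if (snapshot.committedTxnIds.contains(txnId)) {
                // Known committed: recommitting could hit an expired
                // transactional id, so skip it entirely.
                continue;
            }
            committer.recoverAndCommit(txnId);
        }
    }
}
{code}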


