[
https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624315#comment-17624315
]
Yordan Pavlov commented on FLINK-16419:
---------------------------------------
Let me try to provide some more info on how the error happens in our case. I
have enabled Kafka debug logging and grepped only for the transaction ID that
would eventually fail; attaching the log [^tm0-transaction.log].
What we see inside is:
{noformat}
Attempting to resume transaction eth_stacks_v2_clickhouse-0-1-46 with
producerId 3762812 and epoch 0
...
Sending transactional request
EndTxnRequestData(transactionalId='eth_stacks_v2_clickhouse-0-1-46',
producerId=3762812, producerEpoch=0, committed=true)
...
Transiting to abortable error state due to
org.apache.kafka.common.errors.InvalidPidMappingException: The producer
attempted to use a producer id which is not currently assigned to its
transactional id.
{noformat}
I believe this transaction ID is being recovered from the savepoint.
Listing transactions with the Kafka CLI tools gives:
{noformat}
bin/kafka-transactions.sh --bootstrap-server kafka-hz.stage.san:30911 list
TransactionalId                  Coordinator  ProducerId  TransactionState
eth_stacks_v2_clickhouse-0-1-46  0            7891435     Empty
{noformat}
Describing producers for this topic would give me:
{noformat}
kafka-transactions.sh describe-producers --topic eth_stacks_v2_clickhouse-0 --partition 0
ProducerId  ProducerEpoch  LatestCoordinatorEpoch  LastSequence  LastTimestamp  CurrentTransactionStartOffset
7182745     5              344                     27023         1666768803527  None
7890919     0              350                     297260        1666767581927  None
7892240     0              319                     107588        1666767555718  None
{noformat}
The way I interpret the above is that there is no ongoing transaction for this
topic on the Kafka broker side. Note also that the producerId recovered from
the savepoint (3762812) no longer matches the 7891435 the broker currently maps
to this transactional ID, which would be consistent with the
InvalidPidMappingException. [~martijnvisser] Let me see if I can test something
more.
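The interpretation above can be sketched in plain Java (a hypothetical model of the `describe-producers` rows, not a Kafka API): a producer entry describes an ongoing transaction only if its CurrentTransactionStartOffset is set, so three all-`None` rows mean nothing is open on the broker for this partition.

```java
import java.util.List;

public class OngoingTxnCheck {
    // One row of `kafka-transactions.sh describe-producers` output;
    // a null currentTransactionStartOffset models the "None" column.
    record ProducerEntry(long producerId, int producerEpoch, Long currentTransactionStartOffset) {}

    // The partition has an ongoing transaction iff some producer entry
    // carries a transaction start offset.
    static boolean hasOngoingTransaction(List<ProducerEntry> entries) {
        return entries.stream().anyMatch(e -> e.currentTransactionStartOffset() != null);
    }

    public static void main(String[] args) {
        // The three rows observed above, all with CurrentTransactionStartOffset = None.
        List<ProducerEntry> observed = List.of(
                new ProducerEntry(7182745L, 5, null),
                new ProducerEntry(7890919L, 0, null),
                new ProducerEntry(7892240L, 0, null));
        System.out.println(hasOngoingTransaction(observed)); // prints "false": nothing ongoing
    }
}
```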
> Avoid to recommit transactions which are known committed successfully to
> Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Runtime / Checkpointing
> Reporter: Jun Qin
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor,
> usability
> Attachments: tm0-transaction.log
>
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer
> tries to recommit all pre-committed transactions which are in the snapshot,
> even if those transactions were successfully committed before (i.e., the call
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}}
> returns OK). This may lead to recovery failures when recovering from a very
> old snapshot because the transactional IDs in that snapshot may have been
> expired and removed from Kafka. Consider, for example, the following scenario:
> # Start a Flink job with FlinkKafkaProducer sink with exactly-once
> # Suspend the Flink job with a savepoint A
> # Wait for time longer than {{transactional.id.expiration.ms}} +
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
> # Recover the job with savepoint A.
> # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
> - Attempting to resume transaction Source: Custom Source -> Sink:
> Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO org.apache.kafka.clients.Metadata
> - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO org.apache.kafka.clients.producer.KafkaProducer
> - [Producer clientId=producer-1, transactionalId=Source: Custom
> Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO org.apache.flink.runtime.taskmanager.Task
> - Source: Custom Source -> Sink: Unnamed (1/1)
> (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse:
> The producer attempted to use a producer id which is not currently assigned
> to its transactional id.
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as
> it may hide real transaction timeout errors.
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible
> way is to let the JobManager, after it successfully notifies all operators of
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record that
> success, e.g., write the successful transactional IDs somewhere in the
> snapshot. Those transactions would then not need to be recommitted upon
> recovery.
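The bookkeeping proposed in the description can be sketched in plain Java (all names here are hypothetical illustrations, not Flink API): the snapshot records which pre-committed transactional IDs are already known to have committed successfully, and recovery recommits only the remainder.

```java
import java.util.List;
import java.util.Set;

public class RecoveryRecommitFilter {
    // Hypothetical snapshot state: all pre-committed transactional IDs, plus
    // the subset whose commitTransaction() already returned OK (recorded by
    // the JobManager after notifyCheckpointComplete succeeds).
    record Snapshot(List<String> preCommitted, Set<String> knownCommitted) {}

    // On recovery, only transactions not yet known-committed need a recommit;
    // skipping the rest avoids InvalidPidMappingException on expired IDs.
    static List<String> transactionsToRecommit(Snapshot s) {
        return s.preCommitted().stream()
                .filter(txnId -> !s.knownCommitted().contains(txnId))
                .toList();
    }

    public static void main(String[] args) {
        Snapshot snapshot = new Snapshot(
                List.of("sink-0-1-45", "sink-0-1-46"), // hypothetical transactional IDs
                Set.of("sink-0-1-45"));                // ...-45 already committed
        System.out.println(transactionsToRecommit(snapshot)); // prints "[sink-0-1-46]"
    }
}
```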
--
This message was sent by Atlassian Jira
(v8.20.10#820010)