[ 
https://issues.apache.org/jira/browse/FLINK-23509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul updated FLINK-23509:
--------------------------------
    Component/s: Connectors / Kafka

> FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE 
> during transaction recovery (fails)
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23509
>                 URL: https://issues.apache.org/jira/browse/FLINK-23509
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>
> When recovering Kafka transactions from a snapshot, 
> FlinkKafkaInternalProducer overwrites the static final 
> ProducerIdAndEpoch#NONE instance here:
> [FlinkKafkaInternalProducer#resumeTransaction|https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229]
> As a consequence, TransactionManager initializes the transactions as new 
> transactions instead of recovered ones, here:
> [TransactionManager#initializeTransactions|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L332]
> TransactionManager log (edited for readability):
> {{[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
> org.apache.kafka.clients.producer.KafkaProducer - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Overriding the default enable.idempotence to 
> true since transactional.id is specified.
>  [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
> org.apache.kafka.clients.producer.KafkaProducer - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Instantiated a transactional producer.
>  [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
> org.apache.kafka.clients.producer.KafkaProducer - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Overriding the default retries config to the 
> recommended value of 2147483647 since the idempotent producer is enabled.
>  [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
> org.apache.kafka.clients.producer.KafkaProducer - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Overriding the default acks to all since 
> idempotence is enabled.
>  [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG 
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Transition from state UNINITIALIZED to 
> INITIALIZING
>  [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Invoking InitProducerId for the first time in 
> order to acquire a producer ID
>  [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG 
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Enqueuing transactional request 
> InitProducerIdRequestData(transactionalId='Sink: 
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, 
> producerEpoch=17)
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request 
> InitProducerIdRequestData(transactionalId='Sink: 
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, 
> producerEpoch=17) dequeued for sending
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional 
> request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', 
> keyType=1)
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional 
> request InitProducerIdRequestData(transactionalId='Sink: 
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, 
> producerEpoch=17)
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request 
> FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', 
> keyType=1) dequeued for sending
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional 
> response FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, 
> errorMessage='NONE', nodeId=3, host='ulxxtkafbrk03.adgr.net', port=9093) for 
> request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', 
> keyType=1)
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> INFO org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Discovered transaction 
> coordinator ulxxtkafbrk03.adgr.net:9093 (id: 3 rack: null)
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request 
> InitProducerIdRequestData(transactionalId='Sink: 
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, 
> producerEpoch=17) dequeued for sending
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional 
> response InitProducerIdResponseData(throttleTimeMs=0, errorCode=47, 
> producerId=-1, producerEpoch=-1) for request 
> InitProducerIdRequestData(transactionalId='Sink: 
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, 
> producerEpoch=17)
>  [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - 
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state 
> INITIALIZING to error state FATAL_ERROR
>  [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
> org.apache.kafka.clients.producer.KafkaProducer - [Producer 
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
> trxRollupKafkaSink-...8b6-2] Closing the Kafka producer with timeoutMillis = 
> 0 ms.
>  org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; Producer attempted an operation with an old epoch. 
> Either there is a newer producer with the same transactionalId, or the 
> producer's transaction has been expired by the broker.
>  at 
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>  at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>  at java.lang.Thread.run(Thread.java:748)
>  }}
>  
> ...
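
The failure mode described above can be illustrated with a minimal, self-contained sketch. It does not use Kafka or Flink code: IdAndEpoch and Manager are hypothetical stand-ins, and the reference comparison in looksRecovered() is an assumption made for this sketch about how a consumer might tell "new" from "recovered" apart. The sketch only shows why writing fields of a shared static final sentinel through reflection is risky: the sentinel's values change for every user, while any check against the sentinel reference still sees the sentinel.

```java
import java.lang.reflect.Field;

public class SharedSentinelDemo {

    /** Hypothetical stand-in for an (id, epoch) pair with a shared "nothing assigned" sentinel. */
    static final class IdAndEpoch {
        static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);

        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Hypothetical stand-in for a manager that initially points at the sentinel. */
    static final class Manager {
        IdAndEpoch current = IdAndEpoch.NONE;

        /** Assumed check for this sketch: "recovered" means the reference is no longer the sentinel. */
        boolean looksRecovered() {
            return current != IdAndEpoch.NONE;
        }
    }

    /** Problematic pattern: mutate whatever object is referenced, even if it is the shared sentinel. */
    static void overwriteInPlace(IdAndEpoch target, long id, short epoch) {
        try {
            Field idField = IdAndEpoch.class.getDeclaredField("producerId");
            idField.setAccessible(true);
            idField.setLong(target, id);

            Field epochField = IdAndEpoch.class.getDeclaredField("epoch");
            epochField.setAccessible(true);
            epochField.setShort(target, epoch);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflection failed", e);
        }
    }

    /** Safer pattern: leave the sentinel alone and point the manager at a fresh instance. */
    static void replaceReference(Manager manager, long id, short epoch) {
        manager.current = new IdAndEpoch(id, epoch);
    }

    public static void main(String[] args) {
        Manager manager = new Manager();

        // The manager still references NONE, so this call rewrites the shared sentinel itself.
        overwriteInPlace(manager.current, 1545118L, (short) 17);
        System.out.println("NONE.producerId after in-place write: " + IdAndEpoch.NONE.producerId);
        System.out.println("looksRecovered after in-place write:  " + manager.looksRecovered());

        // Swapping the reference makes the identity check see a non-sentinel value.
        replaceReference(manager, 1545118L, (short) 17);
        System.out.println("looksRecovered after replacement:     " + manager.looksRecovered());
    }
}
```

The id and epoch values are taken from the log above purely for illustration. Whether the actual fix should replace the reference or take a different route is not stated in this report.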



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
