[
https://issues.apache.org/jira/browse/FLINK-23509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias Schwalbe updated FLINK-23509:
--------------------------------------
Description:
When recovering Kafka transactions from a snapshot, FlinkKafkaInternalProducer
overrides static final ProducerIdAndEpoch#NONE here:
[FlinkKafkaInternalProducer#resumeTransaction|https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229]
and consequently TransactionManager initializes transactions as new
transactions instead of recovered ones. Here:
[TransactionManager#initializeTransactions|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L332]
TransactionManager log (edited for readability):
{quote}{{[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
org.apache.kafka.clients.producer.KafkaProducer - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Overriding the default enable.idempotence to true
since transactional.id is specified.
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
org.apache.kafka.clients.producer.KafkaProducer - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Instantiated a transactional producer.
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
org.apache.kafka.clients.producer.KafkaProducer - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Overriding the default retries config to the
recommended value of 2147483647 since the idempotent producer is enabled.
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
org.apache.kafka.clients.producer.KafkaProducer - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Overriding the default acks to all since
idempotence is enabled.
[Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Transition from state UNINITIALIZED to INITIALIZING
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Invoking InitProducerId for the first time in
order to acquire a producer ID
[Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Enqueuing transactional request
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2',
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2',
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for
sending
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional
request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
keyType=1)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional
request InitProducerIdRequestData(transactionalId='Sink:
trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
producerEpoch=17)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1)
dequeued for sending
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional
response FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0,
errorMessage='NONE', nodeId=3, host='ulxxtkafbrk03.adgr.net', port=9093) for
request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
keyType=1)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Discovered transaction coordinator
ulxxtkafbrk03.adgr.net:9093 (id: 3 rack: null)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2',
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for
sending
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional
response InitProducerIdResponseData(throttleTimeMs=0, errorCode=47,
producerId=-1, producerEpoch=-1) for request
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2',
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state
INITIALIZING to error state FATAL_ERROR
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
org.apache.kafka.clients.producer.KafkaProducer - [Producer
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
trxRollupKafkaSink-...8b6-2] Closing the Kafka producer with timeoutMillis = 0
ms.
org.apache.kafka.common.KafkaException: Unexpected error in
InitProducerIdResponse; Producer attempted an operation with an old epoch.
Either there is a newer producer with the same transactionalId, or the
producer's transaction has been expired by the broker.
at
org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
at
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
at
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at java.lang.Thread.run(Thread.java:748)
}}
{quote}
Note the message "Invoking InitProducerId for the first time in order to
acquire a producer ID": it announces a request for a new transaction, which
should carry (-1, -1). The next log entry, however, reads
"Enqueuing transactional request
InitProducerIdRequestData(transactionalId='Sink: ...', ..., producerId=1545118,
producerEpoch=17)", because ProducerIdAndEpoch#NONE has been changed.
TransactionManager#initializeTransactions variables:
!2021-07-26_16-47-48.png!
Note the values shown above; they should be -1, -1.
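The effect described above can be reproduced in isolation. The following is a minimal sketch, not the Flink or Kafka code: IdAndEpoch and Manager are hypothetical stand-ins for ProducerIdAndEpoch and TransactionManager, and resumeInPlace only imitates a reflective in-place write. It assumes that the manager initially holds the shared NONE instance.

```java
import java.lang.reflect.Field;

public class SharedSentinelDemo {
    /** Hypothetical stand-in for Kafka's ProducerIdAndEpoch; not the real class. */
    static final class IdAndEpoch {
        static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Hypothetical stand-in for a transaction manager that starts out holding NONE. */
    static final class Manager {
        IdAndEpoch current = IdAndEpoch.NONE;
    }

    /** Overwrites the fields of whatever instance the manager holds, in place. */
    static void resumeInPlace(Manager m, long producerId, short epoch)
            throws ReflectiveOperationException {
        Field idField = IdAndEpoch.class.getDeclaredField("producerId");
        Field epochField = IdAndEpoch.class.getDeclaredField("epoch");
        idField.setAccessible(true);
        epochField.setAccessible(true);
        // m.current is still the shared NONE instance, so this write hits NONE itself.
        idField.setLong(m.current, producerId);
        epochField.setShort(m.current, epoch);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Manager recovered = new Manager();
        resumeInPlace(recovered, 1545118L, (short) 17);

        // A manager created afterwards also starts from NONE and sees the leaked values.
        Manager fresh = new Manager();
        System.out.println(fresh.current.producerId + ", " + fresh.current.epoch);
    }
}
```

In this sketch the second manager reports the recovered id and epoch instead of -1, -1, which matches the pattern in the log: the request is announced as a first-time InitProducerId but carries producerId=1545118, producerEpoch=17.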
Stack trace of TransactionManager#initializeTransactions:
{quote}initializeTransactions:314, TransactionManager
(org.apache.kafka.clients.producer.internals)
initializeTransactions:310, TransactionManager
(org.apache.kafka.clients.producer.internals)
initTransactions:591, KafkaProducer (org.apache.kafka.clients.producer)
initTransactions:88, FlinkKafkaInternalProducer
(org.apache.flink.streaming.connectors.kafka.internals)
recoverAndAbort:1060, FlinkKafkaProducer
(org.apache.flink.streaming.connectors.kafka)
recoverAndAbort:99, FlinkKafkaProducer
(org.apache.flink.streaming.connectors.kafka)
initializeState:371, TwoPhaseCommitSinkFunction
(org.apache.flink.streaming.api.functions.sink)
initializeState:1195, FlinkKafkaProducer
(org.apache.flink.streaming.connectors.kafka)
tryRestoreFunction:189, StreamingFunctionUtils
(org.apache.flink.streaming.util.functions)
restoreFunctionState:171, StreamingFunctionUtils
(org.apache.flink.streaming.util.functions)
initializeState:96, AbstractUdfStreamOperator
(org.apache.flink.streaming.api.operators)
initializeOperatorState:118, StreamOperatorStateHandler
(org.apache.flink.streaming.api.operators)
initializeState:290, AbstractStreamOperator
(org.apache.flink.streaming.api.operators)
initializeStateAndOpenOperators:436, OperatorChain
(org.apache.flink.streaming.runtime.tasks)
restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
call:-1, 412600778
(org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
call:55, StreamTaskActionExecutor$1 (org.apache.flink.streaming.runtime.tasks)
restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:756, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
{quote}
Stack trace of FlinkKafkaInternalProducer#resumeTransaction when the values are
overridden:
{quote}resumeTransaction:204, FlinkKafkaInternalProducer
(org.apache.flink.streaming.connectors.kafka.internals)
resumeTransaction:196, KafkaProducerJobSink$$anon$1$$anon$2
(ch.viseca.flink.connectors.kafka.sinks)
recoverAndCommit:1029, FlinkKafkaProducer
(org.apache.flink.streaming.connectors.kafka)
recoverAndCommit:99, FlinkKafkaProducer
(org.apache.flink.streaming.connectors.kafka)
recoverAndCommitInternal:414, TwoPhaseCommitSinkFunction
(org.apache.flink.streaming.api.functions.sink)
initializeState:364, TwoPhaseCommitSinkFunction
(org.apache.flink.streaming.api.functions.sink)
initializeState:1195, FlinkKafkaProducer
(org.apache.flink.streaming.connectors.kafka)
tryRestoreFunction:189, StreamingFunctionUtils
(org.apache.flink.streaming.util.functions)
restoreFunctionState:171, StreamingFunctionUtils
(org.apache.flink.streaming.util.functions)
initializeState:96, AbstractUdfStreamOperator
(org.apache.flink.streaming.api.operators)
initializeOperatorState:118, StreamOperatorStateHandler
(org.apache.flink.streaming.api.operators)
initializeState:290, AbstractStreamOperator
(org.apache.flink.streaming.api.operators)
initializeStateAndOpenOperators:436, OperatorChain
(org.apache.flink.streaming.runtime.tasks)
restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
call:-1, 412600778
(org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
call:55, StreamTaskActionExecutor$1 (org.apache.flink.streaming.runtime.tasks)
restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:756, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
{quote}
Background:
* we recently upgraded from Flink 1.8.0 to 1.13.0
* FlinkKafkaInternalProducer#resumeTransaction has not changed between these
versions; however,
* in Flink 1.8.0 we never observed any resumable transaction as part of a
checkpoint
* we could not determine which change actually causes the failure; however:
* it would probably be much safer to assign a fresh ProducerIdAndEpoch, rather
than assigning new values to whatever ProducerIdAndEpoch the TransactionManager
currently holds, and thereby avoid overriding ProducerIdAndEpoch#NONE
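The last point can be illustrated with a small stand-alone sketch. It is not the proposed patch: IdAndEpoch and Manager are hypothetical stand-ins for ProducerIdAndEpoch and TransactionManager, and resumeWithFreshInstance is an illustrative name. The idea shown is only that replacing the held reference leaves the shared NONE instance untouched.

```java
public class FreshInstanceDemo {
    /** Hypothetical stand-in for Kafka's ProducerIdAndEpoch; not the real class. */
    static final class IdAndEpoch {
        static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Hypothetical stand-in for a transaction manager that starts out holding NONE. */
    static final class Manager {
        IdAndEpoch current = IdAndEpoch.NONE;
    }

    /** Swaps in a new instance instead of writing into the one currently held. */
    static void resumeWithFreshInstance(Manager m, long producerId, short epoch) {
        m.current = new IdAndEpoch(producerId, epoch);
    }

    public static void main(String[] args) {
        Manager recovered = new Manager();
        resumeWithFreshInstance(recovered, 1545118L, (short) 17);

        // The recovered manager carries the restored values ...
        System.out.println(recovered.current.producerId + ", " + recovered.current.epoch);
        // ... while the shared sentinel keeps its original values.
        System.out.println(IdAndEpoch.NONE.producerId + ", " + IdAndEpoch.NONE.epoch);
    }
}
```

With this approach a producer created later still starts from an unmodified NONE, so its first InitProducerId request would carry -1, -1 as expected.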
Sample workaround (scala):
{quote}val sink = new FlinkKafkaProducer[T](defaultTopic, schema, getProperties, getSemantic, getProducerPoolSize) {
  override protected def createProducer: FlinkKafkaInternalProducer[Array[Byte], Array[Byte]] = {
    new FlinkKafkaInternalProducer[Array[Byte], Array[Byte]](this.producerConfig) {
      override def sendOffsetsToTransaction(offsets: util.Map[TopicPartition, OffsetAndMetadata], groupMetadata: ConsumerGroupMetadata): Unit = {
        super.sendOffsetsToTransaction(offsets, groupMetadata.groupId())
      }

      override def resumeTransaction(producerId: Long, epoch: Short): Unit = {
        val transactionManager = FlinkKafkaInternalProducer.getField(kafkaProducer, "transactionManager")
        transactionManager.synchronized {
          // Replace the instance currently held (possibly the shared NONE) with a
          // fresh copy, so that super.resumeTransaction does not write into NONE.
          val producerIdAndEpoch = FlinkKafkaInternalProducer.getField(transactionManager, "producerIdAndEpoch").asInstanceOf[ProducerIdAndEpoch]
          FlinkKafkaInternalProducer.invoke(transactionManager, "setProducerIdAndEpoch", new ProducerIdAndEpoch(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch))
        }
        super.resumeTransaction(producerId, epoch)
      }
    }
  }
}
{quote}
> FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE
> during transaction recovery (fails)
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-23509
> URL: https://issues.apache.org/jira/browse/FLINK-23509
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0
> Reporter: Matthias Schwalbe
> Priority: Major
> Attachments: 2021-07-26_16-47-48.png
>
>
> When recovering Kafka transactions from a snapshot,
> FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE
> here:
> [FlinkKafkaInternalProducer#resumeTransaction|https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229]
> and consequently TransactionManager initializes transactions as new
> transactions instead of recovered ones. Here:
> [TransactionManager#initializeTransactions|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L332]
> TransactionManager log (edited for readability):
> {quote}{{[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Overriding the default enable.idempotence to
> true since transactional.id is specified.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Instantiated a transactional producer.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Overriding the default retries config to the
> recommended value of 2147483647 since the idempotent producer is enabled.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Overriding the default acks to all since
> idempotence is enabled.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Transition from state UNINITIALIZED to
> INITIALIZING
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Invoking InitProducerId for the first time in
> order to acquire a producer ID
> [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Enqueuing transactional request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17) dequeued for sending
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional
> request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
> keyType=1)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional
> request InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
> FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
> keyType=1) dequeued for sending
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional
> response FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0,
> errorMessage='NONE', nodeId=3, host='ulxxtkafbrk03.adgr.net', port=9093) for
> request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
> keyType=1)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> INFO org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Discovered transaction
> coordinator ulxxtkafbrk03.adgr.net:9093 (id: 3 rack: null)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17) dequeued for sending
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional
> response InitProducerIdResponseData(throttleTimeMs=0, errorCode=47,
> producerId=-1, producerEpoch=-1) for request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state
> INITIALIZING to error state FATAL_ERROR
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Closing the Kafka producer with timeoutMillis =
> 0 ms.
> org.apache.kafka.common.KafkaException: Unexpected error in
> InitProducerIdResponse; Producer attempted an operation with an old epoch.
> Either there is a newer producer with the same transactionalId, or the
> producer's transaction has been expired by the broker.
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
> at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> at java.lang.Thread.run(Thread.java:748)
> }}
> {quote}
> Notice the message "Invoking InitProducerId for the first time in order to
> acquire a producer ID" above: this should be a request for a new transaction
> with (producerId, producerEpoch) = (-1, -1). Yet below it we see
> "Enqueuing transactional request
> InitProducerIdRequestData(transactionalId='Sink: ...', ...,
> producerId=1545118, producerEpoch=17)", because ProducerIdAndEpoch#NONE has
> been modified.
>
> TransactionManager#initializeTransactions variables:
> !2021-07-26_16-47-48.png!
> Notice the producerId and epoch values in the image above, which should be
> -1 and -1.
> Stack trace of TransactionManager#initializeTransactions:
> {quote}initializeTransactions:314, TransactionManager
> (org.apache.kafka.clients.producer.internals)
> initializeTransactions:310, TransactionManager
> (org.apache.kafka.clients.producer.internals)
> initTransactions:591, KafkaProducer (org.apache.kafka.clients.producer)
> initTransactions:88, FlinkKafkaInternalProducer
> (org.apache.flink.streaming.connectors.kafka.internals)
> recoverAndAbort:1060, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> recoverAndAbort:99, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> initializeState:371, TwoPhaseCommitSinkFunction
> (org.apache.flink.streaming.api.functions.sink)
> initializeState:1195, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> tryRestoreFunction:189, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> restoreFunctionState:171, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> initializeState:96, AbstractUdfStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeOperatorState:118, StreamOperatorStateHandler
> (org.apache.flink.streaming.api.operators)
> initializeState:290, AbstractStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeStateAndOpenOperators:436, OperatorChain
> (org.apache.flink.streaming.runtime.tasks)
> restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
> call:-1, 412600778
> (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
> call:55, StreamTaskActionExecutor$1
> (org.apache.flink.streaming.runtime.tasks)
> restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
> doRun:756, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
> {quote}
>
> Stack trace of FlinkKafkaInternalProducer#resumeTransaction when the values
> are overridden:
> {quote}resumeTransaction:204, FlinkKafkaInternalProducer
> (org.apache.flink.streaming.connectors.kafka.internals)
> resumeTransaction:196, KafkaProducerJobSink$$anon$1$$anon$2
> (ch.viseca.flink.connectors.kafka.sinks)
> recoverAndCommit:1029, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> recoverAndCommit:99, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> recoverAndCommitInternal:414, TwoPhaseCommitSinkFunction
> (org.apache.flink.streaming.api.functions.sink)
> initializeState:364, TwoPhaseCommitSinkFunction
> (org.apache.flink.streaming.api.functions.sink)
> initializeState:1195, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> tryRestoreFunction:189, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> restoreFunctionState:171, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> initializeState:96, AbstractUdfStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeOperatorState:118, StreamOperatorStateHandler
> (org.apache.flink.streaming.api.operators)
> initializeState:290, AbstractStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeStateAndOpenOperators:436, OperatorChain
> (org.apache.flink.streaming.runtime.tasks)
> restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
> call:-1, 412600778
> (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
> call:55, StreamTaskActionExecutor$1
> (org.apache.flink.streaming.runtime.tasks)
> restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
> doRun:756, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
> {quote}
>
> Background:
> * We recently upgraded from Flink 1.8.0 to 1.13.0.
> * FlinkKafkaInternalProducer#resumeTransaction has not changed between these
> versions; however, in Flink 1.8.0 we never observed any resumable transaction
> as part of a checkpoint.
> * We could not determine which change actually causes the failure. However,
> instead of assigning new values to an arbitrary ProducerIdAndEpoch held by
> TransactionManager, it would probably be much safer to assign a fresh
> ProducerIdAndEpoch and thereby avoid overriding ProducerIdAndEpoch#NONE.
> Sample workaround (Scala):
> {quote}val sink = new FlinkKafkaProducer[T](
>   defaultTopic, schema, getProperties, getSemantic, getProducerPoolSize) {
>   override protected def createProducer: FlinkKafkaInternalProducer[Array[Byte], Array[Byte]] = {
>     new FlinkKafkaInternalProducer[Array[Byte], Array[Byte]](this.producerConfig) {
>       override def sendOffsetsToTransaction(
>           offsets: util.Map[TopicPartition, OffsetAndMetadata],
>           groupMetadata: ConsumerGroupMetadata): Unit = {
>         super.sendOffsetsToTransaction(offsets, groupMetadata.groupId())
>       }
>
>       override def resumeTransaction(producerId: Long, epoch: Short): Unit = {
>         val transactionManager =
>           FlinkKafkaInternalProducer.getField(kafkaProducer, "transactionManager")
>         transactionManager.synchronized {
>           // Replace the (possibly shared) instance with a fresh copy, so that
>           // super.resumeTransaction does not overwrite ProducerIdAndEpoch#NONE.
>           val producerIdAndEpoch = FlinkKafkaInternalProducer
>             .getField(transactionManager, "producerIdAndEpoch")
>             .asInstanceOf[ProducerIdAndEpoch]
>           FlinkKafkaInternalProducer.invoke(
>             transactionManager,
>             "setProducerIdAndEpoch",
>             new ProducerIdAndEpoch(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch))
>         }
>         super.resumeTransaction(producerId, epoch)
>       }
>     }
>   }
> }
> {quote}
>
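The failure mode described above (a shared "none" sentinel being overwritten in place, versus assigning a fresh instance) can be illustrated with a small, self-contained Java sketch. It does not use the Kafka or Flink classes: IdAndEpoch, Manager, setField, resumeInPlace and resumeWithFreshInstance are hypothetical stand-ins invented for this illustration, and the sketch only assumes that the real code mutates the fields of whatever instance the manager currently holds.

```java
import java.lang.reflect.Field;

final class SentinelDemo {
    /** Hypothetical stand-in for ProducerIdAndEpoch, with a shared NONE sentinel. */
    static final class IdAndEpoch {
        static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Hypothetical stand-in for TransactionManager: every new instance starts at NONE. */
    static final class Manager {
        IdAndEpoch current = IdAndEpoch.NONE;

        boolean looksLikeFirstInit() {
            return current.producerId == -1L && current.epoch == -1;
        }
    }

    /** Overwrites an instance field (even a final one) via reflection. */
    static void setField(Object target, String name, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true);
            field.set(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Problematic variant: mutates whichever instance the manager holds, possibly NONE. */
    static void resumeInPlace(Manager manager, long producerId, short epoch) {
        setField(manager.current, "producerId", producerId);
        setField(manager.current, "epoch", epoch);
    }

    /** Safer variant: gives the manager its own instance and leaves NONE untouched. */
    static void resumeWithFreshInstance(Manager manager, long producerId, short epoch) {
        manager.current = new IdAndEpoch(producerId, epoch);
    }

    public static void main(String[] args) {
        Manager fixed = new Manager();
        resumeWithFreshInstance(fixed, 1545118L, (short) 17);
        System.out.println("NONE after fresh-instance resume: " + IdAndEpoch.NONE.producerId);

        Manager broken = new Manager();
        resumeInPlace(broken, 1545118L, (short) 17);
        // A manager created afterwards inherits the corrupted sentinel.
        System.out.println("new manager after in-place resume: " + new Manager().current.producerId);
    }
}
```

In this sketch the first line reports -1, while the second reports 1545118: once the sentinel has been overwritten, every manager created later starts from the recovered values instead of (-1, -1), which matches the symptom seen in the InitProducerIdRequestData log lines.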
--
This message was sent by Atlassian Jira
(v8.3.4#803005)