[
https://issues.apache.org/jira/browse/FLINK-23509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387882#comment-17387882
]
Fabian Paul commented on FLINK-23509:
-------------------------------------
Hi [~Matthias Schwalbe],
Unfortunately, we cannot easily set a new epoch and producer id because doing
so might cause data loss. It is important to commit all pending transactions
from either previous checkpoints (which have not been completed successfully
yet) or from other subtasks in case of a scale-in event.
I think the problem you are seeing is related to
https://issues.apache.org/jira/browse/FLINK-16419
> FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE
> during transaction recovery (fails)
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-23509
> URL: https://issues.apache.org/jira/browse/FLINK-23509
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0
> Reporter: Matthias Schwalbe
> Priority: Major
> Attachments: 2021-07-26_16-47-48.png
>
>
> When recovering Kafka transactions from a snapshot,
> FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE
> here:
> [FlinkKafkaInternalProducer#resumeTransaction|https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229]
> and consequently TransactionManager initializes transactions as new
> transactions instead of recovered ones. Here:
> [TransactionManager#initializeTransactions|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L332]
> TransactionManager log (edited for readability):
> {quote}{{[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Overriding the default enable.idempotence to
> true since transactional.id is specified.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Instantiated a transactional producer.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Overriding the default retries config to the
> recommended value of 2147483647 since the idempotent producer is enabled.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Overriding the default acks to all since
> idempotence is enabled.
> [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Transition from state UNINITIALIZED to
> INITIALIZING
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Invoking InitProducerId for the first time in
> order to acquire a producer ID
> [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Enqueuing transactional request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17) dequeued for sending
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional
> request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
> keyType=1)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional
> request InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
> FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
> keyType=1) dequeued for sending
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional
> response FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0,
> errorMessage='NONE', nodeId=3, host='ulxxtkafbrk03.adgr.net', port=9093) for
> request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2',
> keyType=1)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> INFO org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Discovered transaction
> coordinator ulxxtkafbrk03.adgr.net:9093 (id: 3 rack: null)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17) dequeued for sending
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> TRACE org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional
> response InitProducerIdResponseData(throttleTimeMs=0, errorCode=47,
> producerId=-1, producerEpoch=-1) for request
> InitProducerIdRequestData(transactionalId='Sink:
> trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118,
> producerEpoch=17)
> [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2]
> DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -
> [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2,
> transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state
> INITIALIZING to error state FATAL_ERROR
> [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink:
> trxRollupKafkaSink-...8b6-2] Closing the Kafka producer with timeoutMillis =
> 0 ms.
> org.apache.kafka.common.KafkaException: Unexpected error in
> InitProducerIdResponse; Producer attempted an operation with an old epoch.
> Either there is a newer producer with the same transactionalId, or the
> producer's transaction has been expired by the broker.
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
> at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> at java.lang.Thread.run(Thread.java:748)
> }}
> {quote}
> Note that "Invoking InitProducerId for the first time in order to acquire
> a producer ID" indicates a request for a new transaction (-1, -1), but the
> request actually enqueued below is
> InitProducerIdRequestData(transactionalId='Sink: ...', ...,
> producerId=1545118, producerEpoch=17), because ProducerIdAndEpoch#NONE has
> been modified.
>
> TransactionManager#initializeTransactions variables:
> !2021-07-26_16-47-48.png!
> Note the values above, which should be -1, -1.
> Stack trace of TransactionManager#initializeTransactions:
> {quote}initializeTransactions:314, TransactionManager
> (org.apache.kafka.clients.producer.internals)
> initializeTransactions:310, TransactionManager
> (org.apache.kafka.clients.producer.internals)
> initTransactions:591, KafkaProducer (org.apache.kafka.clients.producer)
> initTransactions:88, FlinkKafkaInternalProducer
> (org.apache.flink.streaming.connectors.kafka.internals)
> recoverAndAbort:1060, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> recoverAndAbort:99, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> initializeState:371, TwoPhaseCommitSinkFunction
> (org.apache.flink.streaming.api.functions.sink)
> initializeState:1195, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> tryRestoreFunction:189, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> restoreFunctionState:171, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> initializeState:96, AbstractUdfStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeOperatorState:118, StreamOperatorStateHandler
> (org.apache.flink.streaming.api.operators)
> initializeState:290, AbstractStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeStateAndOpenOperators:436, OperatorChain
> (org.apache.flink.streaming.runtime.tasks)
> restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
> call:-1, 412600778
> (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
> call:55, StreamTaskActionExecutor$1
> (org.apache.flink.streaming.runtime.tasks)
> restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
> doRun:756, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
> {quote}
>
> Stack trace of FlinkKafkaInternalProducer#resumeTransaction when the values
> are overridden:
> {quote}resumeTransaction:204, FlinkKafkaInternalProducer
> (org.apache.flink.streaming.connectors.kafka.internals)
> resumeTransaction:196, KafkaProducerJobSink$$anon$1$$anon$2
> (ch.viseca.flink.connectors.kafka.sinks)
> recoverAndCommit:1029, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> recoverAndCommit:99, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> recoverAndCommitInternal:414, TwoPhaseCommitSinkFunction
> (org.apache.flink.streaming.api.functions.sink)
> initializeState:364, TwoPhaseCommitSinkFunction
> (org.apache.flink.streaming.api.functions.sink)
> initializeState:1195, FlinkKafkaProducer
> (org.apache.flink.streaming.connectors.kafka)
> tryRestoreFunction:189, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> restoreFunctionState:171, StreamingFunctionUtils
> (org.apache.flink.streaming.util.functions)
> initializeState:96, AbstractUdfStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeOperatorState:118, StreamOperatorStateHandler
> (org.apache.flink.streaming.api.operators)
> initializeState:290, AbstractStreamOperator
> (org.apache.flink.streaming.api.operators)
> initializeStateAndOpenOperators:436, OperatorChain
> (org.apache.flink.streaming.runtime.tasks)
> restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
> call:-1, 412600778
> (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
> call:55, StreamTaskActionExecutor$1
> (org.apache.flink.streaming.runtime.tasks)
> restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
> doRun:756, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
> {quote}
>
> Background:
> * we recently upgraded from Flink 1.8.0 to 1.13.0
> * FlinkKafkaInternalProducer#resumeTransaction has not changed between these
> versions, however
> * in Flink 1.8.0 we never observed any resumable transaction as part of a
> checkpoint
> * we could not determine which change actually causes the failure; however,
> * it would probably be much safer to assign a fresh ProducerIdAndEpoch
> directly, instead of assigning new values to an arbitrary
> ProducerIdAndEpoch held by TransactionManager, and thus avoid overriding
> ProducerIdAndEpoch#NONE
> Sample workaround (Scala):
> {quote}val sink = new FlinkKafkaProducer[T](
>   defaultTopic, schema, getProperties, getSemantic, getProducerPoolSize) {
>   override protected def createProducer:
>       FlinkKafkaInternalProducer[Array[Byte], Array[Byte]] = {
>     new FlinkKafkaInternalProducer[Array[Byte], Array[Byte]](
>         this.producerConfig) {
>       override def resumeTransaction(producerId: Long, epoch: Short): Unit = {
>         val transactionManager = FlinkKafkaInternalProducer.getField(
>           kafkaProducer, "transactionManager")
>         // The body of this synchronized block was lost to the
>         // "Unknown macro" rendering and is not reproduced here.
>         transactionManager.synchronized { ... }
>         super.resumeTransaction(producerId, epoch)
>       }
>     }
>   }
> }
> {quote}
>
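The failure mode described in the issue (writing new values into a shared
"unset" sentinel instead of assigning a fresh object) can be illustrated
without Kafka. The following is only a minimal sketch under stated
assumptions: IdAndEpoch, Manager, resumeByMutation and resumeByReplacement are
hypothetical stand-ins invented for this example, not the real
ProducerIdAndEpoch, TransactionManager or FlinkKafkaInternalProducer code. The
values 1545118 and 17 are taken from the log above.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins; these are NOT the real Kafka or Flink classes.
class SentinelMutationDemo {

    /** Simplified (producerId, epoch) pair with a shared "unset" sentinel. */
    static final class IdAndEpoch {
        static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Simplified manager that starts out pointing at the shared sentinel. */
    static final class Manager {
        IdAndEpoch current = IdAndEpoch.NONE;
    }

    /** Risky: writes into whatever object the manager holds, possibly NONE. */
    static void resumeByMutation(Manager m, long producerId, short epoch) {
        setField(m.current, "producerId", producerId);
        setField(m.current, "epoch", epoch);
    }

    /** Safer: gives the manager its own instance and leaves NONE alone. */
    static void resumeByReplacement(Manager m, long producerId, short epoch) {
        m.current = new IdAndEpoch(producerId, epoch);
    }

    /** Reflection helper that can also write final instance fields. */
    private static void setField(Object target, String name, Object value) {
        try {
            Field f = target.getClass().getDeclaredField(name);
            f.setAccessible(true);
            f.set(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Manager safe = new Manager();
        resumeByReplacement(safe, 1545118L, (short) 17);
        System.out.println("after replacement, NONE.producerId = "
                + IdAndEpoch.NONE.producerId);

        Manager risky = new Manager();
        resumeByMutation(risky, 1545118L, (short) 17);
        System.out.println("after mutation, NONE.producerId = "
                + IdAndEpoch.NONE.producerId);

        // A manager created afterwards no longer starts from (-1, -1).
        Manager later = new Manager();
        System.out.println("new manager starts with producerId = "
                + later.current.producerId);
    }
}
```

In this model, any manager created after resumeByMutation has run starts with
the resumed id and epoch rather than (-1, -1), which mirrors the symptom
reported above. Whether replacing the instance is sufficient in the real
connector is not established here.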