[ https://issues.apache.org/jira/browse/FLINK-23509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Schwalbe updated FLINK-23509:
--------------------------------------
    Description: 
When recovering Kafka transactions from a snapshot, FlinkKafkaInternalProducer 
overrides the static final ProducerIdAndEpoch#NONE here:

[FlinkKafkaInternalProducer#resumeTransaction|https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229]

and consequently TransactionManager initializes transactions as new ones 
instead of recovered ones, here:

[TransactionManager#initializeTransactions|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L332]
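
For illustration only, the effect can be reproduced in isolation with stand-in classes (this is a simplified sketch, not the actual Flink or Kafka code): resumeTransaction looks up whatever ProducerIdAndEpoch instance the TransactionManager currently references — initially the shared static NONE — and overwrites its fields in place via reflection, so the constant itself ends up carrying the recovered values.
{code:java}
import java.lang.reflect.Field;

// Sketch with stand-in classes (not the real Flink/Kafka types) of how mutating the
// instance currently held by the TransactionManager corrupts the shared static NONE.
public class ProducerIdAndEpochNoneSketch {

    // Stand-in for org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch.
    static class ProducerIdAndEpoch {
        static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(-1L, (short) -1);
        long producerId;
        short epoch;

        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Stand-in for TransactionManager: the field initially references the shared NONE.
    static class TransactionManager {
        ProducerIdAndEpoch producerIdAndEpoch = ProducerIdAndEpoch.NONE;
    }

    public static void main(String[] args) throws Exception {
        TransactionManager tm = new TransactionManager();

        // What the resumeTransaction pattern effectively does: fetch the instance that is
        // currently referenced and overwrite its fields in place via reflection.
        Field field = TransactionManager.class.getDeclaredField("producerIdAndEpoch");
        field.setAccessible(true);
        ProducerIdAndEpoch current = (ProducerIdAndEpoch) field.get(tm);
        current.producerId = 1545118L; // values recovered from the snapshot
        current.epoch = (short) 17;

        // The shared constant is now corrupted for every later producer in this JVM.
        System.out.println(ProducerIdAndEpoch.NONE.producerId); // prints 1545118, expected -1
        System.out.println(ProducerIdAndEpoch.NONE.epoch);      // prints 17, expected -1
    }
}
{code}
Once NONE has been mutated this way, any later code path that reads it — such as the initializeTransactions call linked above — sees the stale producerId/epoch pair instead of (-1, -1), which is consistent with the log below.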

TransactionManager log (edited for readability):
{quote}{{[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Overriding the default enable.idempotence to true 
since transactional.id is specified.
 [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Instantiated a transactional producer.
 [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Overriding the default retries config to the 
recommended value of 2147483647 since the idempotent producer is enabled.
 [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Overriding the default acks to all since 
idempotence is enabled.
 [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Transition from state UNINITIALIZED to INITIALIZING
 [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Invoking InitProducerId for the first time in 
order to acquire a producer ID
 [Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Enqueuing transactional request 
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', 
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request 
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', 
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for 
sending
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional 
request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', 
keyType=1)
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional 
request InitProducerIdRequestData(transactionalId='Sink: 
trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, 
producerEpoch=17)
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request 
FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1) 
dequeued for sending
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional 
response FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, 
errorMessage='NONE', nodeId=3, host='ulxxtkafbrk03.adgr.net', port=9093) for 
request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', 
keyType=1)
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Discovered transaction coordinator 
ulxxtkafbrk03.adgr.net:9093 (id: 3 rack: null)
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request 
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', 
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for 
sending
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
TRACE org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional 
response InitProducerIdResponseData(throttleTimeMs=0, errorCode=47, 
producerId=-1, producerEpoch=-1) for request 
InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', 
transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
 [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] 
DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - 
[Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, 
transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state 
INITIALIZING to error state FATAL_ERROR
 [Sink: trxRollupKafkaSink (1/1)#3|#3] INFO 
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: 
trxRollupKafkaSink-...8b6-2] Closing the Kafka producer with timeoutMillis = 0 
ms.
 org.apache.kafka.common.KafkaException: Unexpected error in 
InitProducerIdResponse; Producer attempted an operation with an old epoch. 
Either there is a newer producer with the same transactionalId, or the 
producer's transaction has been expired by the broker.
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
 at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
 at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
 at java.lang.Thread.run(Thread.java:748)
 }}
{quote}
 Notice that "Invoking InitProducerId for the first time in order to acquire a 
producer ID" announces a request for a brand-new transaction (producerId=-1, 
producerEpoch=-1), yet just below we see "Enqueuing transactional request 
InitProducerIdRequestData(transactionalId='Sink: ...', ..., producerId=1545118, 
producerEpoch=17)", because the changed ProducerIdAndEpoch#NONE supplies those 
stale values. The broker then rejects the request with errorCode=47, the 
"old epoch" error quoted in the exception above.

 

TransactionManager#initializeTransactions variables:

  !2021-07-26_16-47-48.png!

Notice the values above, which should be -1 and -1 but are not.

Stack trace of TransactionManager#initializeTransactions:
{quote}initializeTransactions:314, TransactionManager 
(org.apache.kafka.clients.producer.internals)
 initializeTransactions:310, TransactionManager 
(org.apache.kafka.clients.producer.internals)
 initTransactions:591, KafkaProducer (org.apache.kafka.clients.producer)
 initTransactions:88, FlinkKafkaInternalProducer 
(org.apache.flink.streaming.connectors.kafka.internals)
 recoverAndAbort:1060, FlinkKafkaProducer 
(org.apache.flink.streaming.connectors.kafka)
 recoverAndAbort:99, FlinkKafkaProducer 
(org.apache.flink.streaming.connectors.kafka)
 initializeState:371, TwoPhaseCommitSinkFunction 
(org.apache.flink.streaming.api.functions.sink)
 initializeState:1195, FlinkKafkaProducer 
(org.apache.flink.streaming.connectors.kafka)
 tryRestoreFunction:189, StreamingFunctionUtils 
(org.apache.flink.streaming.util.functions)
 restoreFunctionState:171, StreamingFunctionUtils 
(org.apache.flink.streaming.util.functions)
 initializeState:96, AbstractUdfStreamOperator 
(org.apache.flink.streaming.api.operators)
 initializeOperatorState:118, StreamOperatorStateHandler 
(org.apache.flink.streaming.api.operators)
 initializeState:290, AbstractStreamOperator 
(org.apache.flink.streaming.api.operators)
 initializeStateAndOpenOperators:436, OperatorChain 
(org.apache.flink.streaming.runtime.tasks)
 restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
 call:-1, 412600778 
(org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
 call:55, StreamTaskActionExecutor$1 (org.apache.flink.streaming.runtime.tasks)
 restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
 doRun:756, Task (org.apache.flink.runtime.taskmanager)
 run:563, Task (org.apache.flink.runtime.taskmanager)
 run:748, Thread (java.lang)
{quote}
 

Stack trace of FlinkKafkaInternalProducer#resumeTransaction when the values are 
overridden:
{quote}resumeTransaction:204, FlinkKafkaInternalProducer 
(org.apache.flink.streaming.connectors.kafka.internals)
 resumeTransaction:196, KafkaProducerJobSink$$anon$1$$anon$2 
(ch.viseca.flink.connectors.kafka.sinks)
 recoverAndCommit:1029, FlinkKafkaProducer 
(org.apache.flink.streaming.connectors.kafka)
 recoverAndCommit:99, FlinkKafkaProducer 
(org.apache.flink.streaming.connectors.kafka)
 recoverAndCommitInternal:414, TwoPhaseCommitSinkFunction 
(org.apache.flink.streaming.api.functions.sink)
 initializeState:364, TwoPhaseCommitSinkFunction 
(org.apache.flink.streaming.api.functions.sink)
 initializeState:1195, FlinkKafkaProducer 
(org.apache.flink.streaming.connectors.kafka)
 tryRestoreFunction:189, StreamingFunctionUtils 
(org.apache.flink.streaming.util.functions)
 restoreFunctionState:171, StreamingFunctionUtils 
(org.apache.flink.streaming.util.functions)
 initializeState:96, AbstractUdfStreamOperator 
(org.apache.flink.streaming.api.operators)
 initializeOperatorState:118, StreamOperatorStateHandler 
(org.apache.flink.streaming.api.operators)
 initializeState:290, AbstractStreamOperator 
(org.apache.flink.streaming.api.operators)
 initializeStateAndOpenOperators:436, OperatorChain 
(org.apache.flink.streaming.runtime.tasks)
 restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
 call:-1, 412600778 
(org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
 call:55, StreamTaskActionExecutor$1 (org.apache.flink.streaming.runtime.tasks)
 restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
 doRun:756, Task (org.apache.flink.runtime.taskmanager)
 run:563, Task (org.apache.flink.runtime.taskmanager)
 run:748, Thread (java.lang)
{quote}
 

Background:
 * we recently upgraded from Flink 1.8.0 to 1.13.0
 * FlinkKafkaInternalProducer#resumeTransaction has not changed between these 
versions, however
 * in Flink 1.8.0 we never observed any resumable transaction as part of a 
checkpoint
 * we could not determine which change actually causes the failure, however:
 * it would probably be much safer, instead of assigning new values to whatever 
ProducerIdAndEpoch instance the TransactionManager currently holds, to assign a 
fresh ProducerIdAndEpoch object and thereby avoid overriding 
ProducerIdAndEpoch#NONE (see the sample workaround and the sketch below)

Sample workaround (Scala):
{code:scala}
val sink = new FlinkKafkaProducer[T](
  defaultTopic,
  schema,
  getProperties,
  getSemantic,
  getProducerPoolSize) {

  override protected def createProducer: FlinkKafkaInternalProducer[Array[Byte], Array[Byte]] = {
    new FlinkKafkaInternalProducer[Array[Byte], Array[Byte]](this.producerConfig) {

      override def sendOffsetsToTransaction(
          offsets: util.Map[TopicPartition, OffsetAndMetadata],
          groupMetadata: ConsumerGroupMetadata): Unit = {
        super.sendOffsetsToTransaction(offsets, groupMetadata.groupId())
      }

      override def resumeTransaction(producerId: Long, epoch: Short): Unit = {
        val transactionManager =
          FlinkKafkaInternalProducer.getField(kafkaProducer, "transactionManager")
        transactionManager.synchronized {
          // Replace the ProducerIdAndEpoch currently held by the TransactionManager with a
          // fresh copy, so that super.resumeTransaction no longer mutates the shared
          // static ProducerIdAndEpoch#NONE instance.
          val producerIdAndEpoch = FlinkKafkaInternalProducer
            .getField(transactionManager, "producerIdAndEpoch")
            .asInstanceOf[ProducerIdAndEpoch]
          FlinkKafkaInternalProducer.invoke(
            transactionManager,
            "setProducerIdAndEpoch",
            new ProducerIdAndEpoch(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch))
        }
        super.resumeTransaction(producerId, epoch)
      }
    }
  }
}
{code}
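
For contrast, the same stand-in setup as in the sketch further above, but with the safer assignment strategy (again only a sketch, not the real Flink/Kafka classes): installing a fresh ProducerIdAndEpoch via reflection, as the workaround above does through setProducerIdAndEpoch, leaves the shared NONE constant untouched.
{code:java}
import java.lang.reflect.Field;

// Counterpart to the earlier sketch (stand-in classes, not Flink/Kafka code):
// assigning a fresh ProducerIdAndEpoch instead of mutating the referenced one
// keeps the shared NONE constant at (-1, -1).
public class FreshProducerIdAndEpochSketch {

    static class ProducerIdAndEpoch {
        static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(-1L, (short) -1);
        long producerId;
        short epoch;

        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    static class TransactionManager {
        ProducerIdAndEpoch producerIdAndEpoch = ProducerIdAndEpoch.NONE;
    }

    public static void main(String[] args) throws Exception {
        TransactionManager tm = new TransactionManager();

        Field field = TransactionManager.class.getDeclaredField("producerIdAndEpoch");
        field.setAccessible(true);
        // Install a brand-new instance rather than overwriting the fields of the
        // instance that is currently referenced (which may be the shared NONE).
        field.set(tm, new ProducerIdAndEpoch(1545118L, (short) 17));

        System.out.println(ProducerIdAndEpoch.NONE.producerId); // still -1
        System.out.println(tm.producerIdAndEpoch.producerId);   // 1545118
    }
}
{code}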
 

> FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE 
> during transaction recovery (fails)
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23509
>                 URL: https://issues.apache.org/jira/browse/FLINK-23509
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>         Attachments: 2021-07-26_16-47-48.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
