[ 
https://issues.apache.org/jira/browse/FLINK-33437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33437.
----------------------------------
    Fix Version/s: 1.19.0
       Resolution: Fixed

merged efbd8c40 into master

> Flink 1.17 sink committed legacy Committable state, but it was not removed 
> from state backend
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33437
>                 URL: https://issues.apache.org/jira/browse/FLINK-33437
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Checkpointing
>    Affects Versions: 1.17.1
>         Environment: K8s, Flink 1.17.1
>            Reporter: Yuchi Duan
>            Assignee: Yuchi Duan
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.19.0
>
>
> My Flink job graph: Kafka source -> process -> Kafka sink.
> I used a savepoint to upgrade from Flink 1.14.5 to 1.17.1, and the program 
> ran normally. A month later, I restarted the Flink job from a savepoint, and 
> the job started normally. Unfortunately, the Flink job then failed every time 
> it took a checkpoint. For example, consider the following scenario:
>  
>  # The program uses the Kafka sink
>  # Suspend the Flink job with savepoint A on Flink 1.14.x
>  # Recover the job from savepoint A on Flink 1.17.1
>  # Wait longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Suspend the Flink job with savepoint B on Flink 1.17.1
>  # Recover the job from savepoint B on Flink 1.17.1
>  # Trigger a checkpoint; the Flink job fails with the following error:
> {code:java}
> java.io.IOException: Could not perform checkpoint 1009710 for operator 
> kafka-sink: Committer (2/2)#2.
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:245)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:215)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:177)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)
>     ... 22 more
> Caused by: java.lang.IllegalStateException: Failed to commit 
> KafkaCommittable{producerId=47326303, epoch=0, 
> transactionalId=kafka-sink-xxx-1-882965}
>     at 
> org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
>     at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:177)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:161)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:200)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
>     ... 33 more
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
>  {code}
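
The waiting condition in step 4 of the scenario above can be sketched as a small check. This is only an illustration, not code from Flink or the Kafka connector: the class and method names are hypothetical, and the two constants assume the Kafka broker defaults (7 days for transactional.id.expiration.ms, 1 hour for transaction.remove.expired.transaction.cleanup.interval.ms). If the brokers override these settings, substitute the configured values.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink or Kafka.
public class TransactionExpiryCheck {
    // Assumed broker default for transactional.id.expiration.ms (7 days).
    static final Duration DEFAULT_EXPIRATION = Duration.ofMillis(604_800_000L);
    // Assumed broker default for
    // transaction.remove.expired.transaction.cleanup.interval.ms (1 hour).
    static final Duration DEFAULT_CLEANUP_INTERVAL = Duration.ofMillis(3_600_000L);

    // Returns true when the time since the committable was written exceeds
    // expiration + cleanup interval, i.e. the broker may already have
    // dropped the transactional id and a later commit could be rejected.
    static boolean mayHaveExpired(
            Duration sinceCommittableWritten,
            Duration expiration,
            Duration cleanupInterval) {
        Duration window = expiration.plus(cleanupInterval);
        return sinceCommittableWritten.compareTo(window) > 0;
    }

    public static void main(String[] args) {
        // Roughly the situation in the report: about a month between
        // savepoint A and savepoint B.
        Duration age = Duration.ofDays(30);
        System.out.println(
                mayHaveExpired(age, DEFAULT_EXPIRATION, DEFAULT_CLEANUP_INTERVAL));
    }
}
```

With the assumed defaults, any legacy committable that is still in state more than 7 days and 1 hour after it was written falls outside the window, which matches the month-long gap described in the report.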



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
