[
https://issues.apache.org/jira/browse/FLINK-37460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gabor Somogyi resolved FLINK-37460.
-----------------------------------
Fix Version/s: 1.20.2
2.0.1
Resolution: Fixed
> Using State Processor API and Kafka Sink with Exactly once delivery leads to
> org.apache.kafka.common.errors.InvalidPidMappingException
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-37460
> URL: https://issues.apache.org/jira/browse/FLINK-37460
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.20.1
> Reporter: Grzegorz Liter
> Assignee: Gabor Somogyi
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.2, 2.0.1
>
>
> Setup:
> # Job with KafkaSink with `DeliveryGuarantee.EXACTLY_ONCE` (see the sketch below)
> # Kafka cluster with a maximum transaction timeout of e.g. 24h
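> A minimal sketch of such a sink (topic, bootstrap servers, transactional id prefix
> and the 24h transaction timeout are placeholder values, not taken from the affected job):
> ```
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
>
> public class ExactlyOnceSinkExample {
>     static KafkaSink<String> buildSink() {
>         return KafkaSink.<String>builder()
>                 .setBootstrapServers("broker:9092")
>                 .setRecordSerializer(
>                         KafkaRecordSerializationSchema.builder()
>                                 .setTopic("output-topic")
>                                 .setValueSerializationSchema(new SimpleStringSchema())
>                                 .build())
>                 // Exactly-once delivery makes the sink use Kafka transactions and store
>                 // KafkaCommittables (transactionalId + producerId) in checkpoints.
>                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                 .setTransactionalIdPrefix("my-job")
>                 // Must not exceed the broker-side transaction.max.timeout.ms (24h here).
>                 .setProperty("transaction.timeout.ms", "86400000")
>                 .build();
>     }
> }
> ```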
> Steps to reproduce:
> # Run the job for at least 24h
> # Stop the job with a savepoint
> # Use the State Processor API to rewrite the savepoint (see the sketch after this list)
> # Start the job from the rewritten savepoint
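> A minimal sketch of step 3 (paths and state backend are placeholders; a real rewrite
> would also apply `withOperator(...)` transformations, which are omitted here):
> ```
> import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
> import org.apache.flink.state.api.SavepointWriter;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class RewriteSavepoint {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Read the savepoint taken in step 2 and write it back out to a new location.
>         // As described in the explanation below, the rewritten savepoint starts
>         // counting checkpointIds from 0 again.
>         SavepointWriter
>                 .fromExistingSavepoint(env, "file:///savepoints/savepoint-abc", new HashMapStateBackend())
>                 .write("file:///savepoints/rewritten");
>
>         env.execute("rewrite-savepoint");
>     }
> }
> ```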
> Actual:
> The job fails after about 24h with:
> ```
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) ~[?:?]
>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:134) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:190) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:174) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:135) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> ```
> Explanation:
> When using EXACTLY_ONCE, KafkaSink writes a KafkaCommittable to the checkpoint,
> which contains the current transactionalId together with the current producerId.
> After the checkpoint is completed, the transaction is committed.
> When Flink starts the job from a checkpoint/savepoint, during state initialization
> (`AbstractStreamOperator.initializeState`) it will, in
> `CommitterOperator.commitAndEmitCheckpoints`, collect all checkpoint committables
> up to the last completed checkpoint
> (`committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)`)
> and try to commit them.
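> To illustrate the "up to" semantics, a toy stand-in for the committable collector
> (hypothetical classes, not the real implementation in
> `org.apache.flink.streaming.runtime.operators.sink.committables`):
> ```
> import java.util.ArrayList;
> import java.util.List;
> import java.util.TreeMap;
>
> // Committables are keyed by the checkpoint id they were produced for; on restore,
> // everything up to (and including) the last completed checkpoint id is committed again.
> class ToyCommittableCollector {
>     private final TreeMap<Long, List<String>> byCheckpointId = new TreeMap<>();
>
>     void add(long checkpointId, String committable) {
>         byCheckpointId.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
>     }
>
>     List<String> getCheckpointCommittablesUpTo(long lastCompletedCheckpointId) {
>         List<String> result = new ArrayList<>();
>         byCheckpointId.headMap(lastCompletedCheckpointId, true).values().forEach(result::addAll);
>         return result;
>     }
> }
> ```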
> The problem starts when we use the State Processor API, which resets the
> checkpointId to 0. Initially there is no problem, but eventually the new job's
> checkpointId reaches the checkpointId of the savepoint we migrated. At that point
> `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)`
> loads not only the latest transaction details but also the last transaction that
> happened before the migration. If the migration happened more than the Kafka
> transaction expiration time ago, the KafkaCommitter gets
> `org.apache.kafka.common.errors.InvalidPidMappingException`.
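> With the toy collector above, the collision looks like this (checkpoint ids are made up):
> ```
> class ToyCollisionDemo {
>     public static void main(String[] args) {
>         ToyCommittableCollector collector = new ToyCommittableCollector();
>
>         // Restored state still carries the committable of the old run's savepoint,
>         // e.g. checkpointId 1000, whose Kafka transaction has long expired.
>         collector.add(1000, "stale committable (producerId from before the migration)");
>
>         // The rewritten savepoint starts counting from 0 again, so early checkpoints
>         // of the new run do not touch the stale entry.
>         System.out.println(collector.getCheckpointCommittablesUpTo(5));    // []
>
>         // Once the new run's checkpoint id catches up with the old savepoint's id, the
>         // stale committable is picked up and committed again; the broker then answers
>         // with InvalidPidMappingException because the producer id mapping is gone.
>         System.out.println(collector.getCheckpointCommittablesUpTo(1000)); // [stale committable ...]
>     }
> }
> ```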
--
This message was sent by Atlassian Jira
(v8.20.10#820010)