[ 
https://issues.apache.org/jira/browse/FLINK-30068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635239#comment-17635239
 ] 

Piotr Nowojski commented on FLINK-30068:
----------------------------------------

A temporary workaround might be to drop the old {{KafkaSink}} and replace it 
with a new one that has a different {{uid}} (recovery would then require 
enabling the {{allowNonRestoredState}} option).
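
The reason the option is needed: Flink matches savepoint state to operators by 
{{uid}}, so a sink with a new {{uid}} leaves the old sink's state unclaimed, and 
a restore with unclaimed state fails unless skipping it is explicitly allowed. 
Below is a simplified model of that matching in plain Java. It is only a 
sketch, not Flink code: the class, method, and uid names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of uid-based state matching on restore; not Flink code. */
final class UidRestoreModel {

    /**
     * Returns the uids whose savepoint state is dropped on restore.
     * Throws if state would be dropped and allowNonRestoredState is false.
     */
    static Set<String> restore(
            Map<String, String> savepointStateByUid,
            Set<String> jobOperatorUids,
            boolean allowNonRestoredState) {
        // State is "unclaimed" when no operator in the new job has its uid.
        Set<String> unclaimed = new HashSet<>(savepointStateByUid.keySet());
        unclaimed.removeAll(jobOperatorUids);
        if (!unclaimed.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Savepoint state has no matching operator: " + unclaimed);
        }
        return unclaimed;
    }

    public static void main(String[] args) {
        // Hypothetical uids; the old sink's state holds its pending transactions.
        Map<String, String> savepoint = Map.of(
                "balances-aggregation", "keyed state",
                "kafka-sink-v1", "pending committables");
        Set<String> newJob = Set.of("balances-aggregation", "kafka-sink-v2");

        // With the flag set, the old sink's state is skipped instead of restored.
        System.out.println("Dropped: " + restore(savepoint, newJob, true));
    }
}
```

In this model, as with the workaround, the old sink's state is discarded 
rather than restored, so the pending transactions recorded there are not 
committed on recovery.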

> Allow users to configure what to do with errors while committing transactions 
> during recovery in KafkaSink
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30068
>                 URL: https://issues.apache.org/jira/browse/FLINK-30068
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.0, 1.17.0, 1.15.2
>            Reporter: Piotr Nowojski
>            Priority: Critical
>             Fix For: 1.17.0
>
>
> Currently it looks like {{KafkaSink}} fails the job on any failure to commit 
> transactions. As [reported by the 
> user|https://lists.apache.org/thread/4f6bb8j6qtvgp888y4dxgj86x3kw2b11], this 
> makes it impossible for jobs to recover from older savepoints.
> {noformat}
> 2022-11-16 10:01:07.168 [flink-akka.actor.default-dispatcher-13] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Balances 
> aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> 
> Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily 
> ETH: Writer -> Save to Kafka daily ETH: Committer) (4/5) 
> (6d4d91ab8657bba830695b9a011f7db6) switched from INITIALIZING to RUNNING.
> 2022-11-16 10:01:37.222 [Checkpoint Timer] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
> checkpoint 65436 (type=CheckpointType{name='Checkpoint', 
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1668592897201 for job 
> 00000000000000000000000000000000.
> 2022-11-16 10:01:39.082 [flink-akka.actor.default-dispatcher-13] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Balances 
> aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> 
> Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily 
> ETH: Writer -> Save to Kafka daily ETH: Committer) (1/5) 
> (cfaca46e7f4dc89629cdcaed5b48c059) switched from RUNNING to FAILED on 
> 10.42.145.181:33297-efc328 @ 
> eth-top-holders-v2-flink-taskmanager-0.eth-top-holders-v2-flink-taskmanager.flink.svc.cluster.local
>  (dataPort=43125).
> java.io.IOException: Could not perform checkpoint 65436 for operator Balances 
> aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> 
> Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily 
> ETH: Writer -> Save to Kafka daily ETH: Committer) (1/5)#0.
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>       at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>       at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:234)
>       at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:204)
>       at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
>       at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:300)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
>       ... 22 common frames omitted
> Caused by: java.lang.IllegalStateException: Failed to commit 
> KafkaCommittable{producerId=6640191, epoch=0, 
> transactionalId=eth_top_holders_daily_v11-0-65435}
>       at 
> org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
>       at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
>       at 
> org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
>       at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
>       at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
>       at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:199)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>       ... 35 common frames omitted
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
