[ 
https://issues.apache.org/jira/browse/FLINK-26380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500091#comment-17500091
 ] 

Fabian Paul commented on FLINK-26380:
-------------------------------------

[~liufangliang] the exception you are seeing happens because the KafkaProducer 
could not send all outstanding messages to the Kafka broker. The 
FlinkKafkaProducer internally tracks the number of records that were sent to 
the broker and are not acknowledged by the broker. 

If exactly-once delivery is enabled the FlinkKafkaProducer needs to ensure that 
all messages have been received by the Kafka broker so it flushes all messages 
before a checkpoint. In your case, it seems that not all messages have been 
acknowledged and the job cannot guarantee exactly-once delivery anymore and 
throws an exception.

 

Can you maybe also share the configurations of your job and tell us about your 
scenario to better investigate the problem?

> java.lang.IllegalStateException:Pending record count must be zero at this 
> point: 5592
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-26380
>                 URL: https://issues.apache.org/jira/browse/FLINK-26380
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.2
>            Reporter: Fangliang Liu
>            Priority: Major
>
> Hi,[~lzljs3620320], [~becket_qin] ,[~jark] .
> Help introduce developers who are familiar with checkpoint and kafka 
> connector.
> The _exactly-once_ of kafka connector and checkpoint is turned on, and the 
> reason for the following exception occurs ?
> Main exception information:
> {code:java}
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to 
> send data to Kafka: Pending record count must be zero at this point: 5592   
> {code}
> Complete exception information: 
> {code:java}
> java.io.IOException: Could not perform checkpoint 2273 for operator 
> filterRuleProcess -> Sink: data_filter_sink (1/1)#1477.    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)    at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)    at 
> java.lang.Thread.run(Thread.java:748)    Suppressed: 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to 
> send data to Kafka: Pending record count must be zero at this point: 5592     
>    at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
>         at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>         at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
>         ... 4 more    Caused by: java.lang.IllegalStateException: Pending 
> record count must be zero at this point: 5592        at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
>         ... 10 moreCaused by: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
> snapshot 2273 for operator filterRuleProcess -> Sink: data_filter_sink 
> (1/1)#1477. Failure reason: Checkpoint was declined.    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
>     ... 19 moreCaused by: java.lang.IllegalStateException: Pending record 
> count must be zero at this point: 5592    at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
>     at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
>     at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
>     ... 29 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to