[
https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856606#comment-17856606
]
Alexis Sarda-Espinosa commented on FLINK-25920:
-----------------------------------------------
Sure thing, I still have a stacktrace in OpenSearch, but first some more info:
we run Flink 1.18.1 with Kafka connector 3.1.0-1.18, still on Java 11. I don't
see any other logs that could indicate why it happened; it seems random (and,
like I said, it recovered fine after the restart).
An excerpt of the config (using Azure abfss):
{code:java}
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
execution.checkpointing.interval: 1 min
execution.checkpointing.mode: AT_LEAST_ONCE
execution.checkpointing.timeout: 50 s
execution.checkpointing.tolerable-failed-checkpoints: 1
pipeline.max-parallelism: 120
state.backend.incremental: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.type: rocksdb{code}
And the stacktrace:
{code:java}
java.io.IOException: Could not perform checkpoint 30046 for operator Detector -> OutputEventMapper -> EventInternalSink: Writer -> EventInternalSink: Committer (2/2)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1275)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:227)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:214)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:169)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263)
... 17 more
Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:83)
at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
... 28 more
{code}
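For anyone reading the trace: going by the message of the innermost exception, the failure is raised when a second CommittableSummary arrives for a checkpoint from a subtask that has already delivered one. The snippet below is only a rough sketch of that kind of guard, assuming the summaries are tracked in a map keyed by subtask ID. SummaryTracker, upsertSummary(int, int) and announcedCommittables are hypothetical names for illustration, not Flink's actual classes or signatures.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified, hypothetical model of per-checkpoint summary bookkeeping.
 * This is NOT the Flink source; all names here are illustrative only.
 */
final class SummaryTracker {
    // subtask id -> number of committables announced by that subtask's summary
    private final Map<Integer, Integer> summariesBySubtask = new HashMap<>();

    /** Registers a summary for a subtask and rejects a second one for the same subtask. */
    void upsertSummary(int subtaskId, int numberOfCommittables) {
        // putIfAbsent returns the previous value if the key was already present
        Integer existing = summariesBySubtask.putIfAbsent(subtaskId, numberOfCommittables);
        if (existing != null) {
            throw new UnsupportedOperationException(
                    "Summary for subtask " + subtaskId + " already registered (announced "
                            + existing + " committables); updates are not supported");
        }
    }

    /** Returns the announced committable count, or 0 if no summary was registered. */
    int announcedCommittables(int subtaskId) {
        return summariesBySubtask.getOrDefault(subtaskId, 0);
    }
}
```

With this model, calling upsertSummary(1, 0) and then upsertSummary(1, 4) throws on the second call, which is the shape of failure the trace shows: the first (possibly empty) summary is kept and the later one cannot replace it.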
> Allow receiving updates of CommittableSummary
> ---------------------------------------------
>
> Key: FLINK-25920
> URL: https://issues.apache.org/jira/browse/FLINK-25920
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Connectors / Common
> Affects Versions: 1.15.0, 1.16.0
> Reporter: Fabian Paul
> Assignee: Fabian Paul
> Priority: Major
>
> In the case of unaligned checkpoints, it might happen that the checkpoint
> barrier overtakes the records and an empty committable summary is emitted
> that needs to be corrected at a later point when the records arrive.