[ 
https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856606#comment-17856606
 ] 

Alexis Sarda-Espinosa commented on FLINK-25920:
-----------------------------------------------

Sure thing, I still have a stacktrace in OpenSearch, but first some more info.

Flink 1.18.1 with Kafka connector 3.1.0-1.18, still using Java 11. I don't see 
any other logs that could indicate why it happened, it seems random (and like I 
said, it recovered fine after the restart).

An excerpt of the config (using Azure abfss):

 
{code:java}
execution.checkpointing.externalized-checkpoint-retention: 
DELETE_ON_CANCELLATION    execution.checkpointing.interval: 1 min
execution.checkpointing.mode: AT_LEAST_ONCE
execution.checkpointing.timeout: 50 s
execution.checkpointing.tolerable-failed-checkpoints: 1
pipeline.max-parallelism: 120
state.backend.incremental: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.type: rocksdb{code}
And the stacktrace:

 

 
{code:java}
java.io.IOException: Could not perform checkpoint 30046 for operator Detector 
-> OutputEventMapper -> EventInternalSink: Writer -> EventInternalSink: 
Committer (2/2)#0.
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1275)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:227)
    at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:214)
    at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:169)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263)
    ... 17 more
Caused by: java.lang.UnsupportedOperationException: Currently it is not 
supported to update the CommittableSummary for a checkpoint coming from the 
same subtask. Please check the status of FLINK-25920
    at 
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:83)
    at 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
    at 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
    at 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    ... 28 more
    
 {code}
 

 

> Allow receiving updates of CommittableSummary
> ---------------------------------------------
>
>                 Key: FLINK-25920
>                 URL: https://issues.apache.org/jira/browse/FLINK-25920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Connectors / Common
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Fabian Paul
>            Assignee: Fabian Paul
>            Priority: Major
>
> In the case of unaligned checkpoints, it might happen that the checkpoint 
> barrier overtakes the records and an empty committable summary is emitted 
> that needs to be correct at a later point when the records arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to