[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899674#comment-17899674 ]
Arvid Heise commented on FLINK-36743:
-------------------------------------
I'm trying to dig a bit deeper because there may still be a general bug hidden.
Some background, since you looked at the state and couldn't find any channel
state for the faulty checkpoint:
* Channel state is split into resultSubpartitionState on the upstream operator
and input channel state in the downstream operator
* You only looked at the downstream operator, but the faulty state is on the
upstream operator, in the resultSubpartitionState.
* Upstream connects to downstream on one physical channel and creates a few
virtual channels, which map to the old physical channels.
* The error indicates that upstream tries to send data over one virtual
channel that is unknown downstream.
* I need to figure out whether this issue is specific to rescale or to the
mapping in general. If it's an issue with rescale, you probably need to disable
the extensions on your job.
* It probably worked most of the time because you don't have much backpressure
to begin with, so most channel state turns out empty (as seen in your
screenshots). Backpressure first builds up on the input side and then
propagates to the result partition side. During checkpoint alignment, it drains
in the opposite order: first the result partitions are cleared, then the input
channels. Unaligned checkpoints first wait a bit to see whether an aligned
checkpoint would succeed. Once the alignment times out, the checkpoint turns
into an unaligned one and persists the in-flight data. In most cases with only
little backpressure, the result partitions are already cleared by then, while
the input channels are still partially filled.
* In your case, you don't rescale the sink side, so the input side remains
stable and just works. The issues arise in the subpartition reassignment.
* For some unknown reason, the faulty checkpoint seems to have no state in the
input channels but does have state in the subpartitions of the upstream
operator. Given the draining order explained above, that is very odd. My only
explanation is a network issue, where data couldn't be sent downstream for a
limited time (limited, because the checkpoint barrier eventually went through).
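The draining order described in the bullets above can be illustrated with a toy model. This is a sketch under loud assumptions, not Flink code: a single upstream/downstream pair, an input channel that starts full (backpressured) and never exceeds that initial fill, and one buffer consumed per tick, whose freed credit the upstream immediately uses to ship its next buffer. The classes `ChannelState` and `AlignmentDrainModel` are hypothetical. Whatever is still queued ahead of the barrier when the alignment timeout fires is what an unaligned checkpoint would persist.

```java
// Toy model, an assumption for illustration only (not Flink code): buffers still
// queued ahead of the checkpoint barrier on one upstream/downstream pair.
final class ChannelState {
    final int subpartitionBuffers; // would be persisted as result subpartition state
    final int inputChannelBuffers; // would be persisted as input channel state

    ChannelState(int subpartitionBuffers, int inputChannelBuffers) {
        this.subpartitionBuffers = subpartitionBuffers;
        this.inputChannelBuffers = inputChannelBuffers;
    }

    // Nothing left ahead of the barrier: alignment finished before the timeout.
    boolean alignedInTime() {
        return subpartitionBuffers == 0 && inputChannelBuffers == 0;
    }
}

final class AlignmentDrainModel {
    /**
     * Starts with a backlog in the upstream subpartition and a full input channel.
     * Per tick, the downstream task consumes one buffer, freeing one credit, which the
     * upstream immediately uses to ship its next buffer. Returns what is still queued
     * ahead of the barrier after timeoutTicks ticks.
     */
    static ChannelState stateAtTimeout(int subpartitionBacklog, int inputChannelFill, int timeoutTicks) {
        int subpartition = subpartitionBacklog;
        int input = inputChannelFill;
        for (int tick = 0; tick < timeoutTicks; tick++) {
            if (input > 0) {
                input--; // downstream processes one buffer and frees a credit
            }
            if (subpartition > 0) {
                subpartition--; // upstream ships one buffer using the freed credit
                input++;
            }
        }
        return new ChannelState(subpartition, input);
    }

    public static void main(String[] args) {
        // Little backpressure: subpartition already empty, input channel partially filled.
        ChannelState light = stateAtTimeout(3, 10, 5);
        // Heavy backpressure: both sides still hold data when the timeout fires.
        ChannelState heavy = stateAtTimeout(20, 10, 5);
        System.out.println("light: " + light.subpartitionBuffers + " / " + light.inputChannelBuffers);
        System.out.println("heavy: " + heavy.subpartitionBuffers + " / " + heavy.inputChannelBuffers);
    }
}
```

In this model the subpartition only drains while the input channel stays full, so a snapshot with subpartition state but empty input channels never appears, which matches why the faulty checkpoint looks odd.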
The changes could work, but I really need to play around in my IDE to fully
understand the implications.
For now, I recommend disabling the changes (by unsetting the options) for that
particular pipeline. At least on the respective RESCALE, they don't seem to do
anything at all (look how empty the state is).
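The failure described above (upstream sending over a virtual channel that the downstream task does not know) can be sketched as a simple keyed lookup. This is a simplified, hypothetical model: one physical input channel is split into virtual channels keyed by an (input subtask, output subtask) pair, and a recovered buffer tagged with an unknown pair is rejected. `ConnectionDescriptor`, `VirtualChannelDemultiplexer`, and `DemultiplexDemo` are illustrative names that only mimic the shape of Flink's `SubtaskConnectionDescriptor` and `DemultiplexingRecordDeserializer.select` from the stack trace; they are not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a (input subtask, output subtask) connection key.
final class ConnectionDescriptor {
    final int inputSubtaskIndex;
    final int outputSubtaskIndex;

    ConnectionDescriptor(int inputSubtaskIndex, int outputSubtaskIndex) {
        this.inputSubtaskIndex = inputSubtaskIndex;
        this.outputSubtaskIndex = outputSubtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConnectionDescriptor)) {
            return false;
        }
        ConnectionDescriptor other = (ConnectionDescriptor) o;
        return inputSubtaskIndex == other.inputSubtaskIndex
                && outputSubtaskIndex == other.outputSubtaskIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(inputSubtaskIndex, outputSubtaskIndex);
    }

    @Override
    public String toString() {
        return "{input=" + inputSubtaskIndex + ", output=" + outputSubtaskIndex + "}";
    }
}

// One physical channel demultiplexed into virtual channels, one per old connection.
final class VirtualChannelDemultiplexer {
    private final Map<ConnectionDescriptor, String> virtualChannels = new LinkedHashMap<>();

    void register(ConnectionDescriptor descriptor, String channelName) {
        virtualChannels.put(descriptor, channelName);
    }

    // Recovered buffers carry a descriptor; the downstream side must already know it.
    String select(ConnectionDescriptor descriptor) {
        String channel = virtualChannels.get(descriptor);
        if (channel == null) {
            throw new IllegalStateException(
                    "Cannot select " + descriptor + "; known channels are " + virtualChannels.keySet());
        }
        return channel;
    }
}

class DemultiplexDemo {
    public static void main(String[] args) {
        VirtualChannelDemultiplexer demux = new VirtualChannelDemultiplexer();
        // Index pairs taken from the "known channels" in the exception.
        demux.register(new ConnectionDescriptor(1357, 0), "vc-0");
        demux.register(new ConnectionDescriptor(1357, 4200), "vc-1");
        try {
            demux.select(new ConnectionDescriptor(0, 4071)); // pair the task was asked to select
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only that the lookup is exact: any pair produced on the upstream side during recovery must have been registered on the downstream side, otherwise the task fails.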
> Rescale from unaligned checkpoint failed
> ----------------------------------------
>
> Key: FLINK-36743
> URL: https://issues.apache.org/jira/browse/FLINK-36743
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Reporter: Feifan Wang
> Priority: Major
> Attachments:
> Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
> image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png,
> image-2024-11-19-17-30-14-816.png
>
>
> We encountered the following exception when scaling down a job from 5600 to
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: xxxxxx (1358/1400) (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) switched from RUNNING to FAILED on container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]
> 	at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
> 	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> {code}
> * Flink version: 1.16.1
> * unaligned checkpoint: enabled
> * log-based checkpoint: enabled
> The exception occurs when restoring from chk-2718336, while restoring from
> chk-2718333 succeeds. I checked the metadata files of chk-2718336 and
> chk-2718333, and both contain in-flight data. It looks like something goes
> wrong when the in-flight data of the unaligned checkpoint is reassigned. Could
> you please take a look? [~arvid], [~pnowojski]
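To see how much the wiring around the failing sink subtask changes with the downscale, here is a small sketch of an even-split formula for POINTWISE edges (such as RESCALE) when the upstream has at least as many subtasks as the downstream. We believe this matches how Flink wires pointwise edges, but treat it as an approximation. It also assumes, which the issue does not state, that the operator feeding the sink ran at the job parallelism (5600 before, 4200 after) while the sink stayed at 1400. `PointwiseRanges` and `upstreamRange` are hypothetical names.

```java
// Hypothetical helper (an approximation, not Flink's API): the half-open range
// [start, end) of upstream subtasks that feed one downstream subtask over a
// POINTWISE edge, assuming upstream parallelism >= downstream parallelism.
final class PointwiseRanges {
    static int[] upstreamRange(int downstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism < downstreamParallelism) {
            throw new IllegalArgumentException("sketch only covers upstream >= downstream parallelism");
        }
        // Even split of the upstream subtasks; long arithmetic avoids int overflow.
        int start = (int) ((long) downstreamIndex * upstreamParallelism / downstreamParallelism);
        int end = (int) ((long) (downstreamIndex + 1) * upstreamParallelism / downstreamParallelism);
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int sinkSubtask = 1357;     // "(1358/1400)" in the log is 1-based
        int sinkParallelism = 1400; // the sink was not rescaled
        // ASSUMPTION: the operator feeding the sink ran at the job parallelism.
        int[] before = upstreamRange(sinkSubtask, 5600, sinkParallelism);
        int[] after = upstreamRange(sinkSubtask, 4200, sinkParallelism);
        System.out.println("before: " + before[0] + ".." + (before[1] - 1));
        System.out.println("after:  " + after[0] + ".." + (after[1] - 1));
    }
}
```

Under these assumptions the sketch prints `before: 5428..5431` and `after:  4071..4073`: the sink subtask is wired to an entirely different set of upstream subtasks after the downscale, so the in-flight data recovered for this connection has to be mapped from old upstream subtasks onto new ones.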
--
This message was sent by Atlassian Jira
(v8.20.10#820010)