[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899156#comment-17899156 ]
Arvid Heise edited comment on FLINK-36743 at 11/18/24 2:12 PM:
---------------------------------------------------------------
Hey Feifan, it's a bit hard to debug remotely and it's also been a while since
I worked on that code, but I'll try my best to help.
You mentioned that you were able to find the in-flight data, which is great.
That means you understand the structure deeply. Could you share some
background on the partitioning used?
It's also great that you have one checkpoint that works and one that doesn't.
Can you please make sure that you persist the data elsewhere until the
investigation has concluded? We may never have such a good setup again to find
the root cause.
If possible, can you check whether rescaling from the good checkpoint uses the
same subtask mappings? Is this issue reproducible on every rescaling attempt?
{noformat}
Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]{noformat}
One possibility is that there are rounding errors in the remapping that only
become visible at a higher DOP (degree of parallelism). However, the expected
index of 4071 is pretty far off from the available indexes 0 and 4200, and the
input index is also completely off.
Another possibility is that we end up with shifted byte streams for some
reason, but then the subtask descriptor looks too normal.
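To make the rounding hypothesis concrete, here is a minimal sketch of range-based redistribution with integer division, assuming keyed (hash) partitioning. The formulas mirror, to my understanding, the key-group range arithmetic in Flink's KeyGroupRangeAssignment; whether the in-flight-data remapping uses exactly the same arithmetic is an assumption. The class and method names and the maxParallelism of 16384 are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: range-based redistribution with integer division.
// Assumption: keyed partitioning; the arithmetic mirrors (to my understanding)
// Flink's KeyGroupRangeAssignment, not necessarily the in-flight-data remapping.
final class KeyGroupRemapSketch {

    // Operator index that owns a key group (integer division truncates).
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // First key group (inclusive) owned by operatorIndex; rounds up.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by operatorIndex; rounds down.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Old subtasks whose key-group range overlaps the range of newIndex.
    // Requires parallelism <= maxParallelism so that no range is empty.
    static List<Integer> overlappingOldSubtasks(
            int maxParallelism, int oldParallelism, int newParallelism, int newIndex) {
        int start = rangeStart(maxParallelism, newParallelism, newIndex);
        int end = rangeEnd(maxParallelism, newParallelism, newIndex);
        List<Integer> result = new ArrayList<>();
        for (int old = 0; old < oldParallelism; old++) {
            int oldStart = rangeStart(maxParallelism, oldParallelism, old);
            int oldEnd = rangeEnd(maxParallelism, oldParallelism, old);
            if (oldStart <= end && start <= oldEnd) {
                result.add(old);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 16384 is a hypothetical maxParallelism; the job's real value is unknown.
        System.out.println(overlappingOldSubtasks(16384, 5600, 4200, 1357));
    }
}
```

Running this for the affected subtask with the job's real maxParallelism and comparing the result with the indexes in the exception is one way to check whether a rounding difference could plausibly explain them.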
So before going deeper, I'd like to understand which partitioner is used
between upstream and downstream and the parallelism of the respective tasks.
Then I can manually compute whether the event or the downstream state is
incorrect. If you have a way to verify that the event indeed comes from
upstream subtask 0, that would be neat.
From there we can see whether the checkpoint is corrupted or the recovery code
is buggy.
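Once the partitioner and parallelisms are known, the manual check could look roughly like the sketch below: list which old subtasks a new subtask would be expected to receive in-flight data from, then compare that with the indexes in the exception. Both strategies are simplified assumptions (round-robin as "old index modulo new parallelism", and an all-to-all mapping), not Flink's exact remapping code, and all names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper for manually checking a rescaling mapping.
// The strategies are simplified assumptions, not Flink's exact implementation.
final class ExpectedChannelsSketch {

    enum Strategy {
        // Assumption: old subtask i goes to new subtask i % newParallelism.
        ROUND_ROBIN,
        // Assumption: every new subtask may receive data from every old subtask.
        FULL
    }

    // Old subtask indices that new subtask newIndex is expected to read from.
    static List<Integer> expectedOldSubtasks(
            Strategy strategy, int oldParallelism, int newParallelism, int newIndex) {
        if (newIndex < 0 || newIndex >= newParallelism) {
            throw new IllegalArgumentException("newIndex out of range: " + newIndex);
        }
        IntStream all = IntStream.range(0, oldParallelism);
        switch (strategy) {
            case ROUND_ROBIN:
                return all.filter(i -> i % newParallelism == newIndex)
                        .boxed()
                        .collect(Collectors.toList());
            case FULL:
                return all.boxed().collect(Collectors.toList());
            default:
                throw new IllegalStateException("unknown strategy " + strategy);
        }
    }

    public static void main(String[] args) {
        // Placeholder parallelisms from the issue (5600 -> 4200); which task was
        // rescaled and which partitioner connects the tasks is still to be confirmed.
        System.out.println(expectedOldSubtasks(Strategy.ROUND_ROBIN, 5600, 4200, 1357));
    }
}
```

Under the round-robin assumption this prints [1357, 5557]; a descriptor index outside the expected list would point at either the event or the restored state.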
> Rescale from unaligned checkpoint failed
> ----------------------------------------
>
> Key: FLINK-36743
> URL: https://issues.apache.org/jira/browse/FLINK-36743
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Reporter: Feifan Wang
> Priority: Major
>
> We encountered the following exception when scaling down a job from 5600 to
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: xxxxxx (1358/1400) (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) switched from RUNNING to FAILED on container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]
>     at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
> * Flink version: 1.16.1
> * unaligned checkpoint: enabled
> * log-based checkpoint: enabled
> The exception occurred when restoring from chk-2718336, while the job restores
> successfully from chk-2718333. I checked the metadata files of chk-2718336 and
> chk-2718333, and both of them contain in-flight data. It looks like something
> goes wrong in the unaligned checkpoint when reassigning in-flight data. Could
> you please take a look? [~arvid], [~pnowojski]