[ 
https://issues.apache.org/jira/browse/FLINK-21080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384819#comment-17384819
 ] 

Piotr Nowojski commented on FLINK-21080:
----------------------------------------

As I understand this is a problem with {{UnionListState}}, right? And as such 
it's not only a problem with legacy sources, but with any operator using 
{{UnionListState}}, including for example {{FlinkKafkaProducer}} or any 
broadcast state pattern.

I think there are two other solutions to this problem that you haven't 
mentioned [~gaoyunhaii]:

# (intermediate/temporary solution) Just ignore this problem and we can say we 
do not support checkpoints with finished subtasks that have defined a 
{{UnionListState}} (or have a non empty {{UnionListState}}?). 
# (long term solution) If subtask has finished with non empty 
{{UnionListState}}, we just need to make sure that this "finished" part of the 
{{UnionListState}} is included in all subsequent checkpoints. During recovery, 
when we are merging {{UnionListState}} from all of the operators, we would 
merge/union all parts, both from the alive and already dead subtasks, and 
redistribute it just as we are doing it right now.

For 2., I'm not sure how this could be implemented. Keeping zombie 
{{StreamTask}} alive to make sure that such zombie {{StreamTask}} with non 
empty {{UnionListState}} has an opportunity to checkpoint {{UnionListState}}? 
Move the finished/zombie {{UnionListState}} to the JobManager (or another 
TaskManager?), and mark JobManager responsible for persisting it?

> Identify JobVertex containing legacy source operators and abort checkpoint 
> with legacy source operators partially finished
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21080
>                 URL: https://issues.apache.org/jira/browse/FLINK-21080
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Runtime / Checkpointing
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: auto-unassigned
>
> Most legacy source operators would record the offset for each partitions, and 
> after recovery it would read from the recorded offset. If before a checkpoint 
> some subtasks are finished, the corresponding partition offsets would be 
> deserted in the checkpoint. Then if the job recover with this checkpoint, the 
> legacy source would re-discovery all the partitions and for those finished 
> tasks, the legacy source would re-read them since their offsets are not 
> recorded. 
> Therefore, we would like to fail the checkpoint if some legacy source 
> operators have part of subtasks finished. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to