[
https://issues.apache.org/jira/browse/FLINK-21080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387985#comment-17387985
]
Piotr Nowojski edited comment on FLINK-21080 at 7/28/21, 6:21 AM:
------------------------------------------------------------------
Another potential long-term solution is to simply not support {{UnionListState}}.
Even now the impact of that should be limited, and it will become less of an issue
as we migrate more and more sources/sinks to the new interfaces. I think the
idea on the DataStream API side is to deprecate/get rid of at least
{{UnionListState}}, or even all operator state, in the public API, as
it's causing a lot of trouble. {{UnionListState}} use cases in particular should
be replaced with {{OperatorCoordinator}}.
Currently I think this issue affects the following built-in connectors:
* {{FlinkKafkaConsumer}} - we already have a FLIP-27 replacement. The existing
{{FlinkKafkaConsumer}} might be dropped soon.
* {{FlinkKinesisConsumer}} - AFAIK it cannot produce bounded streams. It should
also be replaced with a FLIP-27 source at some point.
* {{StreamingFileSink}}/{{FlinkKafkaProducer}} - they should also be replaced
with new sinks, using the new Sink API and {{OperatorCoordinator}} to avoid using
{{UnionListState}}. Besides, the impact of not supporting checkpointing with
partially finished sinks is quite limited. Sinks in most cases are preceded
by a keyed exchange. Keyed exchanges make it impossible to have partially
finished subtasks for any extended period of time. Hence in most cases all
sinks should finish at the same time.
That's why I would vote for implementing the simplest stop-gap solution (decline
checkpoints with partially finished tasks that use {{UnionListState}}) and
focusing on getting rid of {{UnionListState}} rather than implementing a
non-trivial long-term solution. What do you think [~gaoyunhaii]?
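For illustration only, the stop-gap check could be sketched roughly as below. This is a plain-Java model, not Flink code: {{UnionStateCheckpointGuard}}, {{OperatorStatus}} and {{shouldDecline}} are hypothetical names, and the sketch assumes that "partially finished" means at least one, but not all, of an operator's subtasks have finished.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the stop-gap rule; none of these types exist in Flink. */
final class UnionStateCheckpointGuard {

    /** Minimal stand-in for what is known about one operator at checkpoint time. */
    static final class OperatorStatus {
        final String name;
        final boolean usesUnionListState;
        final boolean[] subtaskFinished;

        OperatorStatus(String name, boolean usesUnionListState, boolean... subtaskFinished) {
            this.name = name;
            this.usesUnionListState = usesUnionListState;
            this.subtaskFinished = subtaskFinished;
        }

        /** True if some, but not all, subtasks have finished (assumed definition). */
        boolean isPartiallyFinished() {
            int finished = 0;
            for (boolean f : subtaskFinished) {
                if (f) {
                    finished++;
                }
            }
            return finished > 0 && finished < subtaskFinished.length;
        }
    }

    /** Decline the checkpoint if any union-state operator is partially finished. */
    static boolean shouldDecline(List<OperatorStatus> operators) {
        for (OperatorStatus op : operators) {
            if (op.usesUnionListState && op.isPartiallyFinished()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // A legacy source with one of two subtasks finished: checkpoint is declined.
        OperatorStatus legacySource = new OperatorStatus("legacy-source", true, true, false);
        // An operator without union state may be partially finished without harm.
        OperatorStatus map = new OperatorStatus("map", false, true, false);

        System.out.println(shouldDecline(Arrays.asList(legacySource, map)));
        System.out.println(shouldDecline(Arrays.asList(map)));
    }
}
```

The check is deliberately coarse: it does not try to recover or redistribute the finished subtasks' state, it only refuses the checkpoint, which matches the "simplest stop-gap" intent above.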
> Handle UnionListState with finished operators
> ---------------------------------------------
>
> Key: FLINK-21080
> URL: https://issues.apache.org/jira/browse/FLINK-21080
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Runtime / Checkpointing
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: auto-unassigned
>
> Most legacy source operators record the offset for each partition, and
> after recovery they read from the recorded offsets. If some subtasks finish
> before a checkpoint, the corresponding partition offsets are
> discarded from the checkpoint. If the job then recovers from this checkpoint, the
> legacy source re-discovers all the partitions and, for those finished
> tasks, re-reads their partitions since their offsets were not
> recorded.
> Therefore, we would like to fail the checkpoint if some legacy source
> operators have only part of their subtasks finished.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)