[
https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240864#comment-17240864
]
Yun Gao edited comment on FLINK-20208 at 11/30/20, 4:42 PM:
------------------------------------------------------------
Hi [~trushev], very sorry for the late reply; I have been thinking about this
issue for a while, and many thanks for pushing it forward! If I understand
correctly, you want to distinguish the pending files from the in-progress files
to be deleted based on the union list state maxPartCounter, since every task
should be able to acquire the whole list after recovery. I have two more comments:
1. Since each task may currently have multiple buckets, and the maxPartCounter
snapshotted in each checkpoint is the maximum over all the buckets, it might not
be the boundary between the pending files and the in-progress files for every
bucket. One possible solution is to reset the counter of each bucket to
maxPartCounter after the snapshot. This should also solve the issue of buckets
that get created after the snapshot.
2. Another issue is that the order of elements in the recovered union list is
currently not deterministic, so there should be some way to find the mapping
from each subtask index to its maxPartCounter after recovery.
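To make the two comments above concrete, here is a minimal, self-contained
sketch in plain Java. It is not Flink code: PartCounterSketch, SubtaskCounter,
snapshotAndAlign and indexBySubtask are hypothetical names, and the bucket state
is modelled as a simple map from bucket id to part counter. It only illustrates
(1) aligning every bucket counter to the snapshotted maximum, and (2) tagging
each union list element with its subtask index so that the order of the
restored list no longer matters.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; none of these names exist in Flink.
final class PartCounterSketch {

    /** One union list element: a counter tagged with the subtask that wrote it. */
    static final class SubtaskCounter {
        final int subtaskIndex;
        final long maxPartCounter;

        SubtaskCounter(int subtaskIndex, long maxPartCounter) {
            this.subtaskIndex = subtaskIndex;
            this.maxPartCounter = maxPartCounter;
        }
    }

    /**
     * Comment 1: compute the maximum counter over all buckets, then reset every
     * bucket to that value so it is a common boundary for all buckets.
     * Returns the value that would be snapshotted.
     */
    static long snapshotAndAlign(Map<String, Long> bucketCounters) {
        long max = 0L;
        for (long counter : bucketCounters.values()) {
            max = Math.max(max, counter);
        }
        final long aligned = max;
        bucketCounters.replaceAll((bucketId, counter) -> aligned);
        return max;
    }

    /**
     * Comment 2: rebuild the subtask index -> maxPartCounter mapping from the
     * restored list without depending on the order of its elements.
     */
    static Map<Integer, Long> indexBySubtask(List<SubtaskCounter> restored) {
        Map<Integer, Long> bySubtask = new HashMap<>();
        for (SubtaskCounter element : restored) {
            // Keep the largest counter if a subtask appears more than once.
            bySubtask.merge(element.subtaskIndex, element.maxPartCounter, Long::max);
        }
        return bySubtask;
    }
}
```

In this sketch a bucket created after the snapshot would simply start from the
returned value, which is an assumption about how new buckets get their initial
counter.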
One more piece of background: a new version of the file sink, namely
FileSink, based on the [New Sink
API|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API],
was added in release-1.12. It is a unified sink for both Batch and Streaming
Execution Mode and should replace StreamingFileSink in the long run.
Meanwhile, we are also discussing [deprecating the union list
state|https://lists.apache.org/thread.html/r8b87451cea2d3c9976ada333e38165fa03e220e3dd221bbff0b8d8b3%40%3Cdev.flink.apache.org],
so the new FileSink no longer relies on the union list state.
Therefore, _from my point of view_, it would be better if the cleanup method
did not rely on the union list state and also worked for the new version
of the sinks.
> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
> Key: FLINK-20208
> URL: https://issues.apache.org/jira/browse/FLINK-20208
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.11.2
> Reporter: Alexander Trushev
> Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> In the following case:
> # Acknowledged checkpoint
> # Event is written to new .part-X-Y.inprogress.UUID1
> # Job failure
> # Job recovery from the checkpoint
> # Event is written to new .part-X-Y.inprogress.UUID2
> we are left with the outdated part file .part-X-Y.inprogress.UUID1, where X
> is the subtask index and Y is the part counter.
> *Proposal*
> Add method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
> Add a configurable parameter to OnCheckpointRollingPolicy and to
> DefaultRollingPolicy that will be returned by shouldRemoveOutdatedParts()
> (false by default).
> We can remove such outdated part files using the following algorithm while
> restoring the job from a checkpoint:
> # After the bucket state is initialized, check shouldRemoveOutdatedParts. If
> true, go to (2).
> # For each bucket, scan the bucket directory.
> # Remove a part file if all three conditions hold:
> the part filename contains "inprogress";
> the subtask index in the filename equals the current subtask index;
> the part counter in the filename is greater than or equal to the current max
> part counter.
> I propose to remove outdated files because a similar proposal, to overwrite
> outdated files, has not been implemented:
> [https://issues.apache.org/jira/browse/FLINK-11116]
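The three removal conditions in the proposal quoted above can be sketched as a
plain Java predicate. This is only an illustration under stated assumptions:
OutdatedPartFilter and isOutdated are hypothetical names, and the file name
layout .part-X-Y.inprogress.UUID is taken from the issue text, so a job with a
different part prefix or suffix would need a different pattern.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink.
final class OutdatedPartFilter {

    // Assumed layout from the issue text: .part-<subtask>-<counter>.inprogress.<uuid>
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    /**
     * Returns true only if the file is an in-progress part file written by the
     * given subtask with a part counter at or above the restored max part counter.
     */
    static boolean isOutdated(String fileName, int subtaskIndex, long maxPartCounter) {
        Matcher matcher = IN_PROGRESS.matcher(fileName);
        if (!matcher.matches()) {
            return false; // not an in-progress part file in the assumed layout
        }
        try {
            long fileSubtask = Long.parseLong(matcher.group(1));
            long fileCounter = Long.parseLong(matcher.group(2));
            return fileSubtask == subtaskIndex && fileCounter >= maxPartCounter;
        } catch (NumberFormatException e) {
            return false; // digits too large to parse: leave the file alone
        }
    }
}
```

Files that do not match the assumed layout are never reported as outdated, so
the sketch errs on the side of keeping a file rather than deleting it.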