[ 
https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240864#comment-17240864
 ] 

Yun Gao edited comment on FLINK-20208 at 11/30/20, 4:42 PM:
------------------------------------------------------------

Hi [~trushev] Very sorry for the late reply for thinking on this issue for some 
while and very thanks for pushing forward this issue! if I understand right I 
think you want to distinguish the pending files from the in-progress files to 
deleted based on the union list state maxPartCounter, every task should be able 
to acquire the whole list after recovery. I have two more comments:

1. Since currently each task might have multiple buckets and the maxPartCounter 
snapshotted in each checkpoint is the maximum of all the buckets, if might not 
be the boundary to the pending files and in-progress files for all the buckets. 
One possible solution is to reset the counter for each bucket to maxPartCounter 
after the snapshot. It should also be able to solve the issue that buckets get 
created after snapshot. 
 2. Another issue is that currently the order of elements in the recovered 
union list is not deterministic, therefore, there should be some method to find 
the mapping of subtask index to its maxPartCounter after recovery.

One more background is that currently a new version of file sink, namely 
FileSink based on the [New Sink 
API|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API]
 is added in release-1.12, it is a unified sink for both Batch and Streaming 
Execution Mode and should be the replacement of StreamingFileSink in the long 
future. Meanwhile, since we are also discussing [Deprecated the union list 
state|https://lists.apache.org/thread.html/r8b87451cea2d3c9976ada333e38165fa03e220e3dd221bbff0b8d8b3%40%3Cdev.flink.apache.org],
 thus the new FileSink does not rely on the union list state anymore. 
Therefore, _from my point of view_ it would be better if the cleanup method 
does not rely on the union list state and would also work for the new version 
of sinks. 


was (Author: gaoyunhaii):
Hi [~trushev] Very sorry for the late reply for thinking on this issue for some 
while, if I understand right I think you want to distinguish the pending files 
from the in-progress files to deleted based on the union list state 
maxPartCounter, every task should be able to acquire the whole list after 
recovery. I have two more comments:

1. Since currently each task might have multiple buckets and the maxPartCounter 
snapshotted in each checkpoint is the maximum of all the buckets, if might not 
be the boundary to the pending files and in-progress files for all the buckets. 
One possible solution is to reset the counter for each bucket to maxPartCounter 
after the snapshot. It should also be able to solve the issue that buckets get 
created after snapshot. 
 2. Another issue is that currently the order of elements in the recovered 
union list is not deterministic, therefore, there should be some method to find 
the mapping of subtask index to its maxPartCounter after recovery.

One more background is that currently a new version of file sink, namely 
FileSink based on the [New Sink 
API|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API]
 is added in release-1.12, it is a unified sink for both Batch and Streaming 
Execution Mode and should be the replacement of StreamingFileSink in the long 
future. Meanwhile, since we are also discussing [Deprecated the union list 
state|https://lists.apache.org/thread.html/r8b87451cea2d3c9976ada333e38165fa03e220e3dd221bbff0b8d8b3%40%3Cdev.flink.apache.org],
 thus the new FileSink does not rely on the union list state anymore. 
Therefore, _from my point of view_ it would be better if the cleanup method 
does not rely on the union list state and would also work for the new version 
of sinks. 

> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
>                 Key: FLINK-20208
>                 URL: https://issues.apache.org/jira/browse/FLINK-20208
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>            Reporter: Alexander Trushev
>            Priority: Minor
>
> Assume a job has StreamingFileSink with OnCheckpointRollingPolicy
> In the case:
>  # Acknowledged checkpoint
>  # Event is written to new .part-X-Y.inprogress.UUID1
>  # Job failure
>  # Job recovery from the checkpoint
>  # Event is written to new .part-X-Y.inprogress.UUID2
> we have the outdated part file .part-X-Y.inprogress.UUID1. Where X - subtask 
> index, Y - part counter.
> *Proposal*
>  Add method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
>  Add configurable parameter to OnCheckpointRollingPolicy and to 
> DefaultRollingPolicy that will be returned by shouldRemoveOutdatedParts() (by 
> default false)
> We can remove such outdated part files by the next algorithm while restoring 
> job from a checkpoint
>  # After buckets state initializing check shouldRemoveOutdatedParts. If true 
> then (2)
>  # For each bucket scan bucket directory
>  # If three conditions are true then remove part file:
>  part filename contains "inprogress";
>  subtask index from filename equals to current subtask index;
>  part counter from filename more than or equals to current max part counter.
> I propose to remove outdated files, because the similar proposal to overwrite 
> outdated files has not been implemented
> [https://issues.apache.org/jira/browse/FLINK-11116|https://vk.com/away.php?to=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-11116&cc_key=]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to