[ 
https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237296#comment-17237296
 ] 

Alexander Trushev commented on FLINK-20208:
-------------------------------------------

Hi [~gaoyunhaii]. I have checked your case. The in-progress files will be 
removed because their part counters are greater than the max part counter from 
the checkpoint.
I found a scenario where outdated part files won't be removed: if, in step 6, 
characters start being written to a new bucket, that bucket will not be cleaned 
up because it is not in chk-1.
I see only two options:
 * leave them;
 * at runtime, on every attempt to create a new part file, check whether a part 
file with the same name but a different UUID exists. Such a file is outdated 
and should be removed.
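
A minimal sketch of the second option, assuming the default .part-X-Y.inprogress.UUID naming from the issue description. OutdatedPartCheck and its methods are hypothetical helpers, not Flink API, and a real implementation would likely go through Flink's FileSystem abstraction rather than java.nio. It lists the whole bucket directory each time a part file is created, which is where the runtime cost comes from.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not Flink API) for the runtime check described in the second option. */
final class OutdatedPartCheck {

    private OutdatedPartCheck() {}

    /** Prefix shared by every attempt at part Y of subtask X, e.g. ".part-0-3.inprogress.". */
    static String inProgressPrefix(int subtaskIndex, long partCounter) {
        return ".part-" + subtaskIndex + "-" + partCounter + ".inprogress.";
    }

    /** True if fileName is the same in-progress part but was written with a different UUID. */
    static boolean isOutdatedSibling(String fileName, int subtaskIndex, long partCounter,
                                     String currentUuid) {
        String prefix = inProgressPrefix(subtaskIndex, partCounter);
        return fileName.startsWith(prefix)
                && !fileName.substring(prefix.length()).equals(currentUuid);
    }

    /**
     * Called before creating ".part-X-Y.inprogress.<currentUuid>": lists the bucket
     * directory and deletes leftovers of the same part from earlier attempts.
     * Returns the files selected for deletion.
     */
    static List<Path> removeOutdatedSiblings(Path bucketDir, int subtaskIndex, long partCounter,
                                             String currentUuid) throws IOException {
        List<Path> outdated = new ArrayList<>();
        // Full directory listing on every new part file: this is the runtime cost.
        try (DirectoryStream<Path> files = Files.newDirectoryStream(bucketDir)) {
            for (Path file : files) {
                String name = file.getFileName().toString();
                if (isOutdatedSibling(name, subtaskIndex, partCounter, currentUuid)) {
                    outdated.add(file);
                }
            }
        }
        // Delete only after the listing is closed, so the directory is not modified mid-iteration.
        for (Path file : outdated) {
            Files.deleteIfExists(file);
        }
        return outdated;
    }
}
```

Matching on the full prefix, including the trailing ".inprogress.", rather than on a substring keeps .part-0-30 from being mistaken for a sibling of .part-0-3.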

The second option is very similar to [https://github.com/apache/flink/pull/7313]. 
However, it may cause performance issues at runtime, and it has a problem with job 
downscaling ("If there are rescaling between checkpoint and failover, for example, the 
parallelism is decreased from 10 to 5, then the files created by subtask 6 ~ 10 
would not be removed."). So I lean towards the first option.

> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
>                 Key: FLINK-20208
>                 URL: https://issues.apache.org/jira/browse/FLINK-20208
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>            Reporter: Alexander Trushev
>            Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy. Consider 
> the following sequence:
>  # A checkpoint is acknowledged.
>  # An event is written to a new file .part-X-Y.inprogress.UUID1.
>  # The job fails.
>  # The job recovers from the checkpoint.
>  # An event is written to a new file .part-X-Y.inprogress.UUID2.
> We then have the outdated part file .part-X-Y.inprogress.UUID1, where X is the 
> subtask index and Y is the part counter.
> *Proposal*
>  Add the method
> {code:java}
> boolean shouldRemoveOutdatedParts();
> {code}
> to RollingPolicy.
>  Add a configurable parameter to OnCheckpointRollingPolicy and 
> DefaultRollingPolicy whose value (false by default) is returned by 
> shouldRemoveOutdatedParts().
> Such outdated part files can be removed with the following algorithm when 
> restoring the job from a checkpoint:
>  # After the bucket state is initialized, check shouldRemoveOutdatedParts(). If 
> it returns true, continue with (2).
>  # For each bucket, scan the bucket directory.
>  # Remove a part file if all three conditions hold:
>  #* the part file name contains "inprogress";
>  #* the subtask index in the file name equals the current subtask index;
>  #* the part counter in the file name is greater than or equal to the current 
> max part counter.
> I propose removing outdated files because a similar proposal to overwrite 
> outdated files has not been implemented: 
> [https://issues.apache.org/jira/browse/FLINK-11116]
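
The restore-time algorithm from the proposal can be sketched as follows, assuming the same default naming and passing the proposed shouldRemoveOutdatedParts() result in as a plain boolean. OutdatedPartCleaner and its methods are hypothetical, not Flink API. Because only the restored bucket directories are scanned, a bucket first written after recovery (the chk-1 scenario from the comment) is never visited.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of the restore-time cleanup proposed in FLINK-20208 (not Flink API). */
final class OutdatedPartCleaner {

    // Assumed in-progress naming: ".part-<subtaskIndex>-<partCounter>.inprogress.<uuid>"
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    private OutdatedPartCleaner() {}

    /** Step 3: true only if all three conditions from the proposal hold. */
    static boolean isOutdated(String fileName, int subtaskIndex, long maxPartCounter) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (!m.matches()) {
            return false; // not an in-progress part file
        }
        try {
            int fileSubtask = Integer.parseInt(m.group(1));
            long filePartCounter = Long.parseLong(m.group(2));
            return fileSubtask == subtaskIndex && filePartCounter >= maxPartCounter;
        } catch (NumberFormatException e) {
            return false; // numbers too large to have been produced by this sink
        }
    }

    /** Steps 1-3, run after bucket state initialization; returns the files selected for deletion. */
    static List<Path> cleanUp(boolean shouldRemoveOutdatedParts, List<Path> restoredBucketDirs,
                              int subtaskIndex, long maxPartCounter) throws IOException {
        List<Path> outdated = new ArrayList<>();
        if (!shouldRemoveOutdatedParts) {
            return outdated; // step 1: cleanup disabled (the proposed default)
        }
        // Step 2: only buckets restored from the checkpoint are scanned.
        for (Path bucketDir : restoredBucketDirs) {
            try (DirectoryStream<Path> files = Files.newDirectoryStream(bucketDir)) {
                for (Path file : files) {
                    if (isOutdated(file.getFileName().toString(), subtaskIndex, maxPartCounter)) {
                        outdated.add(file);
                    }
                }
            }
        }
        // Delete after listing so no directory is modified while it is being iterated.
        for (Path file : outdated) {
            Files.deleteIfExists(file);
        }
        return outdated;
    }
}
```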



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
