[
https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234754#comment-17234754
]
Yun Gao commented on FLINK-20208:
---------------------------------
Hi [~trushev], thanks a lot for bringing this up. I also think removing
in-progress files is necessary. However, from my perspective there might
still be some issues:
# We cannot distinguish pending files (which have been snapshotted and
need to be re-committed) from in-progress files (which need to be removed) by
their part number (for both, the part number is less than the max part counter),
since we do not rename files when they become pending. We have to refer to the
state to see which files are in-progress. However, since each bucket
serves as one item in bucketStates, after failover the buckets are redistributed
evenly across the subtasks. So if different subtasks held different numbers of
buckets before the failover, a bucket is very likely to be assigned to
another subtask, in which case we cannot easily access its state.
# If rescaling happens between the checkpoint and the failover, for example the
parallelism is decreased from 10 to 5, then the files created by subtasks 6 ~ 10
would not be removed, since no remaining subtask has a matching subtask index.
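
The second point can be illustrated with a minimal Java sketch. It assumes
in-progress files are named .part-<subtask>-<counter>.inprogress.<uuid> and that
subtask indices are zero-based (so "subtask 6 ~ 10" corresponds to indices 5 to 9
here). The class and the helpers shouldRemove and leftovers are hypothetical and
not part of Flink. The sketch applies the three filename conditions from the
proposal for every subtask of the restored job and collects the files that no
subtask would remove:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class OutdatedPartCleanupSketch {

    // Assumed file name layout: .part-<subtask>-<counter>.inprogress.<uuid>
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    /** Applies the three filename conditions from the proposal to one file name. */
    static boolean shouldRemove(String fileName, int subtaskIndex, long maxPartCounter) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (!m.matches()) {
            return false; // not an in-progress part file
        }
        int fileSubtask = Integer.parseInt(m.group(1));
        long fileCounter = Long.parseLong(m.group(2));
        return fileSubtask == subtaskIndex && fileCounter >= maxPartCounter;
    }

    /** Returns the files that none of the restored subtasks would remove. */
    static List<String> leftovers(List<String> files, int parallelism, long maxPartCounter) {
        List<String> kept = new ArrayList<>();
        for (String file : files) {
            boolean removed = false;
            for (int subtask = 0; subtask < parallelism && !removed; subtask++) {
                removed = shouldRemove(file, subtask, maxPartCounter);
            }
            if (!removed) {
                kept.add(file);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // One outdated in-progress file per subtask of the old job (parallelism 10).
        List<String> files = new ArrayList<>();
        for (int subtask = 0; subtask < 10; subtask++) {
            files.add(".part-" + subtask + "-7.inprogress.uuid" + subtask);
        }
        // Restore with parallelism 5 and a max part counter of 7.
        System.out.println(leftovers(files, 5, 7));
    }
}
```

With files from 10 subtasks and a restored parallelism of 5, the files with
subtask indices 5 to 9 remain, because no restored subtask matches their index.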
> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
> Key: FLINK-20208
> URL: https://issues.apache.org/jira/browse/FLINK-20208
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.11.2
> Reporter: Alexander Trushev
> Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> In the following case:
> # Acknowledged checkpoint
> # Event is written to new .part-X-Y.UUID1
> # Job failure
> # Job recovery from the checkpoint
> # Event is written to new .part-X-Y.UUID2
> we are left with the outdated part file .part-X-Y.UUID1, where X is the
> subtask index and Y is the part counter.
> *Proposal*
> Add method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
> Add a configurable parameter to OnCheckpointRollingPolicy and to
> DefaultRollingPolicy that will be returned by shouldRemoveOutdatedParts()
> (false by default).
> We can remove such outdated part files with the following algorithm while
> restoring a job from a checkpoint:
> # After the bucket states are initialized, check shouldRemoveOutdatedParts.
> If true, continue with (2).
> # For each inactive bucket, scan the bucket directory.
> # Remove a part file if all three conditions hold:
> the part filename contains "inprogress";
> the subtask index from the filename equals the current subtask index;
> the part counter from the filename is greater than or equal to the current max part counter.
> I propose removing outdated files because the similar proposal to overwrite
> outdated files has not been implemented:
> [https://issues.apache.org/jira/browse/FLINK-11116]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)