[
https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234844#comment-17234844
]
Alexander Trushev commented on FLINK-20208:
-------------------------------------------
Hello [~gaoyunhaii].
Regarding the second issue (rescaling): I think we can use the even-split redistribution
scheme of
{code:java}
OperatorStateStore.getListState(...)
{code}
and each subtask snapshots its own subtask index.
For example, suppose we have 3 subtasks, "1", "2" and "3", and change the parallelism to 2.
Subtask "1" then receives two indices, [1, 2], and subtask "2" receives one index, [3].
Each subtask uses its sublist of old indices to decide which part files to remove.
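To make the example above concrete, here is a simplified, self-contained model of even-split redistribution. It assumes the restored list-state entries (the old subtask indices) are concatenated and cut into contiguous chunks, with the first (size % newParallelism) new subtasks receiving one extra entry. The class and method names are hypothetical, and this is not Flink's repartitioner code; Flink may order the entries differently, so treat it only as an illustration of how the sublists arise.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a simplified model of even-split redistribution of
// operator list state on rescaling (not Flink's actual implementation).
final class EvenSplitSketch {

    private EvenSplitSketch() {}

    /**
     * Splits the concatenated list-state entries into newParallelism contiguous
     * chunks; the first (entries.size() % newParallelism) chunks get one extra entry.
     */
    static <T> List<List<T>> redistribute(List<T> entries, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        int base = entries.size() / newParallelism;
        int remainder = entries.size() % newParallelism;
        List<List<T>> result = new ArrayList<>(newParallelism);
        int start = 0;
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            int size = base + (subtask < remainder ? 1 : 0);
            result.add(new ArrayList<>(entries.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // Each of the 3 old subtasks snapshotted its own index.
        List<Integer> oldIndices = List.of(1, 2, 3);
        // Rescale to parallelism 2.
        System.out.println(redistribute(oldIndices, 2)); // prints [[1, 2], [3]]
    }
}
{code}

When scaling up, some new subtasks receive an empty sublist and therefore have no old part files to clean up.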
As for the first issue, let me think about it. I designed the algorithm and implemented a
prototype based on OnCheckpointRollingPolicy; DefaultRollingPolicy was added to the
description hastily, at the last moment.
> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
> Key: FLINK-20208
> URL: https://issues.apache.org/jira/browse/FLINK-20208
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.11.2
> Reporter: Alexander Trushev
> Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> Consider the following sequence:
> # A checkpoint is acknowledged
> # An event is written to a new file .part-X-Y.UUID1
> # The job fails
> # The job recovers from the checkpoint
> # An event is written to a new file .part-X-Y.UUID2
> We are left with the outdated part file .part-X-Y.UUID1, where X is the subtask index
> and Y is the part counter.
> *Proposal*
> Add the method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
> Add a configurable parameter to OnCheckpointRollingPolicy and
> DefaultRollingPolicy whose value is returned by shouldRemoveOutdatedParts()
> (false by default).
> Such outdated part files can be removed with the following algorithm when
> restoring the job from a checkpoint:
> # After the buckets state is initialized, check shouldRemoveOutdatedParts(). If it
> returns true, go to (2)
> # For each inactive bucket, scan the bucket directory
> # Remove a part file if all three conditions hold:
> the part filename contains "inprogress";
> the subtask index in the filename equals the current subtask index;
> the part counter in the filename is greater than or equal to the current max part counter.
> I propose removing outdated files because a similar proposal to overwrite
> outdated files has not been implemented:
> [https://issues.apache.org/jira/browse/FLINK-11116]
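The proposal above can be sketched as a small filter over the file names found in a bucket directory. This sketch assumes in-progress files are named like ".part-<subtask>-<counter>.inprogress.<uuid>" (default "part" prefix, empty suffix), which the description abbreviates as .part-X-Y.UUID; the class and method names are hypothetical, and the directory scan of step (2) is replaced by a plain list of names.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the proposed cleanup check (not existing Flink API).
// Assumed in-progress naming: ".part-<subtask>-<counter>.inprogress.<uuid>".
final class OutdatedPartFilter {

    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    private OutdatedPartFilter() {}

    /** Step (3): in-progress, same subtask index, counter >= restored max counter. */
    static boolean isOutdated(String fileName, int currentSubtask, long maxPartCounter) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (!m.matches()) {
            return false; // not an in-progress part file, keep it
        }
        int subtask = Integer.parseInt(m.group(1));
        long counter = Long.parseLong(m.group(2));
        return subtask == currentSubtask && counter >= maxPartCounter;
    }

    /** Steps (1) and (3) over the names that step (2) would list from a bucket directory. */
    static List<String> selectForRemoval(List<String> fileNames,
                                         boolean shouldRemoveOutdatedParts,
                                         int currentSubtask,
                                         long maxPartCounter) {
        List<String> toRemove = new ArrayList<>();
        if (!shouldRemoveOutdatedParts) {
            return toRemove; // feature disabled (the proposed default)
        }
        for (String name : fileNames) {
            if (isOutdated(name, currentSubtask, maxPartCounter)) {
                toRemove.add(name);
            }
        }
        return toRemove;
    }

    public static void main(String[] args) {
        List<String> files = List.of(
                "part-0-2",                     // finished part file: kept
                ".part-0-3.inprogress.uuid1",   // outdated in-progress file: removed
                ".part-1-3.inprogress.uuid2");  // another subtask's file: kept
        System.out.println(selectForRemoval(files, true, 0, 3)); // prints [.part-0-3.inprogress.uuid1]
    }
}
{code}

With the rescaling scheme from the comment, the equality check on the subtask index would become a membership check against the restored list of old indices.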
--
This message was sent by Atlassian Jira
(v8.3.4#803005)