Kostas Kloudas closed FLINK-5083.
    Resolution: Fixed

> Race condition in Rolling/Bucketing Sink pending files cleanup
> --------------------------------------------------------------
>                 Key: FLINK-5083
>                 URL: https://issues.apache.org/jira/browse/FLINK-5083
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Cliff Resnick
> In both the open() and restore() methods there is code that:
> 1. gets a recursive listing from baseDir
> 2. iterates over the listing, name-checking each filename against the 
> subtaskIndex and other criteria to find pending or in-progress files, and 
> deletes any matches.
> The problem is that the recursive listing returns the files of all 
> subtaskIndexes. The race occurs when #hasNext is called as part of the 
> iteration: it performs a hidden existence check on the "next" file, and if 
> that file was deleted by another subtask after the listing was taken but 
> before the iteration reached it, an exception is thrown and the job fails.
> Depending on the number of pending files, this condition may outlast the 
> number of job retries, each failing on a different file.
> A solution would be to use #listStatus instead. The Hadoop FileSystem 
> supports a PathFilter in its #listStatus calls, but not in the recursive 
> #listFiles call. Since the cleanup is performed from the baseDir, the 
> recursive listing would have to be done in Flink.
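A sketch of that approach, again with java.nio.file standing in for the Hadoop API (the local PathFilter interface mirrors Hadoop's org.apache.hadoop.fs.PathFilter; the recursion is the part Flink would have to supply, since listStatus with a filter only covers one directory level):

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class FilteredCleanup {
    // Stand-in for Hadoop's PathFilter interface (boolean accept(Path)).
    interface PathFilter { boolean accept(Path p); }

    // Recursive listing that applies the filter level by level, the way
    // FileSystem#listStatus(Path, PathFilter) does for a single directory.
    static List<Path> listRecursively(Path dir, PathFilter filter) throws IOException {
        List<Path> out = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir)) {
            for (Path p : ds) {
                if (Files.isDirectory(p)) {
                    out.addAll(listRecursively(p, filter));
                } else if (filter.accept(p)) {
                    out.add(p);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("buckets");
        Files.createFile(base.resolve("part-0-0.pending"));
        Files.createFile(base.resolve("part-1-0.pending"));

        int subtaskIndex = 0;
        PathFilter mine =
                p -> p.getFileName().toString().startsWith("part-" + subtaskIndex + "-");

        // Only this subtask's files ever enter the listing, so entries
        // deleted concurrently by other subtasks can no longer fail the
        // iteration.
        List<Path> toClean = listRecursively(base, mine);
        System.out.println(toClean.size());
    }
}
```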
> This touches on another issue. Over time, the directory listing is bound to 
> get very large, and re-listing everything from the baseDir may become 
> increasingly expensive, especially if the FileSystem is S3. Maybe we could 
> have a Bucketer callback that returns a list of cleanup root directories 
> based on the current file? I'm guessing most people use time-based 
> bucketing, so there is only a limited period during which cleanup matters. 
> If so, this would also solve the recursive-listing problem above.
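The proposed callback could look something like the following (a hypothetical interface, not part of Flink's actual Bucketer API; the yyyy-MM-dd bucket layout and the two-day retention window are assumptions for illustration):

```java
import java.nio.file.*;
import java.time.*;
import java.util.*;

public class CleanupRoots {
    // Hypothetical extension point: given the base path, return the only
    // directories worth re-scanning during pending-file cleanup.
    interface CleanupScope { List<Path> cleanupRoots(Path base, LocalDate now); }

    public static void main(String[] args) {
        // For a time-based Bucketer writing to yyyy-MM-dd directories, only
        // the most recent buckets can still hold pending files, so cleanup
        // need not re-list the whole (ever-growing) baseDir.
        CleanupScope lastTwoDays = (base, now) -> Arrays.asList(
                base.resolve(now.toString()),
                base.resolve(now.minusDays(1).toString()));

        List<Path> roots = lastTwoDays.cleanupRoots(
                Paths.get("/data/out"), LocalDate.of(2016, 11, 16));
        System.out.println(roots);
    }
}
```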

This message was sent by Atlassian JIRA