Cliff Resnick created FLINK-5083:
------------------------------------
Summary: Race condition in Rolling/Bucketing Sink pending files cleanup
Key: FLINK-5083
URL: https://issues.apache.org/jira/browse/FLINK-5083
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.1.3, 1.2.0
Reporter: Cliff Resnick
In both the open and restore methods, there is code that:
1. gets a recursive listing from baseDir
2. iterates over the listing and checks each filename against the subtaskIndex
and other criteria to find pending or in-progress files, deleting any it finds.
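The name check in step 2 might look roughly like the following. This is a minimal sketch, not the actual sink code: the class and method names are hypothetical, and the `_part-<subtaskIndex>-<counter>` naming with `.pending` / `.in-progress` suffixes is an assumption made for illustration.

```java
/** Hypothetical sketch of the step-2 filename check (not the real sink code). */
final class PendingFileNames {
    // Assumed naming scheme (illustrative only):
    //   pending:     _part-<subtaskIndex>-<counter>.pending
    //   in-progress: _part-<subtaskIndex>-<counter>.in-progress
    static final String PREFIX = "_part-";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    private PendingFileNames() {}

    /** True if the file belongs to this subtask and is pending or in-progress. */
    static boolean needsCleanup(String fileName, int subtaskIndex) {
        // The trailing '-' stops subtask 1 from matching subtask 10's files.
        String ownPrefix = PREFIX + subtaskIndex + "-";
        return fileName.startsWith(ownPrefix)
                && (fileName.endsWith(PENDING_SUFFIX) || fileName.endsWith(IN_PROGRESS_SUFFIX));
    }

    public static void main(String[] args) {
        System.out.println(needsCleanup("_part-1-7.pending", 1));
        System.out.println(needsCleanup("_part-10-7.pending", 1));
        System.out.println(needsCleanup("_part-3-0.in-progress", 3));
    }
}
```

Even with an exact check like this, every subtask still walks past every other subtask's files in the recursive listing just to reject them, which is what exposes it to the race.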
The problem is that the recursive listing returns the files of all subtask
indexes, not just the current one. The race occurs because, when #hasNext is
called during iteration, a hidden existence check is made on the "next" file.
If another subtask deleted that file after the listing was taken but before
iteration reached it, an error is thrown and the job fails.
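The failure mode can be reproduced with a toy, single-threaded model. This is an assumption-laden sketch: an in-memory Set stands in for the file system, the hypothetical CheckingIterator imitates the existence check described above rather than reproducing Hadoop's actual RemoteIterator, and the `_part-<subtaskIndex>-` naming is assumed. The two "concurrent" cleanups are interleaved by hand so the outcome is deterministic.

```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of the race: the listing is a snapshot, but hasNext() re-checks existence. */
class ListingRaceDemo {
    // In-memory stand-in for the file system: the set of paths that currently exist.
    static final Set<String> fs = new HashSet<>();

    /** Iterator over a listing snapshot whose hasNext() verifies the next entry still exists. */
    static final class CheckingIterator {
        private final List<String> snapshot;
        private int pos = 0;

        CheckingIterator(List<String> snapshot) { this.snapshot = snapshot; }

        boolean hasNext() throws FileNotFoundException {
            if (pos >= snapshot.size()) {
                return false;
            }
            String next = snapshot.get(pos);
            if (!fs.contains(next)) {
                // The "hidden existence check": the entry was listed but is gone now.
                throw new FileNotFoundException(next);
            }
            return true;
        }

        String next() { return snapshot.get(pos++); }
    }

    /** Snapshot of every path under baseDir, for all subtasks (the recursive listing). */
    static CheckingIterator listAll() {
        return new CheckingIterator(new ArrayList<>(new TreeSet<>(fs)));
    }

    /** Deletes this subtask's pending files; returns false if the iteration threw. */
    static boolean cleanup(CheckingIterator it, int subtaskIndex) {
        try {
            while (it.hasNext()) {
                String path = it.next();
                if (path.contains("_part-" + subtaskIndex + "-")) {
                    fs.remove(path);
                }
            }
            return true;
        } catch (FileNotFoundException e) {
            System.out.println("subtask " + subtaskIndex + " failed on " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        fs.addAll(List.of("base/_part-0-0.pending", "base/_part-1-0.pending"));
        // Both subtasks take their recursive listing before either one deletes anything.
        CheckingIterator listing0 = listAll();
        CheckingIterator listing1 = listAll();
        boolean ok1 = cleanup(listing1, 1); // subtask 1 deletes base/_part-1-0.pending
        boolean ok0 = cleanup(listing0, 0); // subtask 0 then hits the deleted entry
        System.out.println("ok0=" + ok0 + " ok1=" + ok1);
    }
}
```

Running main prints `subtask 0 failed on base/_part-1-0.pending` followed by `ok0=false ok1=true`: subtask 0 never looks at subtask 1's file for its own purposes, yet it fails because that file is in its listing.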
Depending on the number of pending files, this condition may outlast the
configured number of job retries, with each attempt failing on a different file.
A solution would be to use #listStatus instead. The Hadoop FileSystem supports
a PathFilter in its #listStatus calls, but not in the recursive #listFiles call.
Because the cleanup starts from the baseDir, the recursion would then have to be
implemented in Flink.
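One way to move the recursion into Flink is to list one directory level at a time and apply the name filter at each level. The sketch below uses java.nio.file purely as a local stand-in for Hadoop's FileSystem#listStatus(Path, PathFilter); the class and method names are hypothetical. It also treats a directory that disappears before it is listed as empty rather than fatal, and deletes with Files.deleteIfExists, so an entry removed concurrently is skipped instead of failing the job.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch: recursive listing done by the caller, one filtered level at a time. */
final class FilteredRecursiveLister {
    private FilteredRecursiveLister() {}

    /** Collects regular files under dir whose file name matches nameFilter. */
    static List<Path> listMatching(Path dir, Predicate<String> nameFilter) throws IOException {
        List<Path> out = new ArrayList<>();
        collect(dir, nameFilter, out);
        return out;
    }

    private static void collect(Path dir, Predicate<String> nameFilter, List<Path> out)
            throws IOException {
        List<Path> subdirs = new ArrayList<>();
        // One non-recursive listing per directory, analogous to a single listStatus call.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    subdirs.add(entry);
                } else if (nameFilter.test(entry.getFileName().toString())) {
                    out.add(entry); // only this subtask's candidates are kept
                }
            }
        } catch (NoSuchFileException e) {
            return; // directory removed concurrently: nothing left to clean up here
        }
        for (Path sub : subdirs) {
            collect(sub, nameFilter, out);
        }
    }

    /** Deletes matching files, tolerating ones that vanished; returns how many it deleted. */
    static int deleteMatching(Path baseDir, Predicate<String> nameFilter) throws IOException {
        int deleted = 0;
        for (Path p : listMatching(baseDir, nameFilter)) {
            if (Files.deleteIfExists(p)) {
                deleted++;
            }
        }
        return deleted;
    }
}
```

Because the filter is applied per directory, a subtask never holds other subtasks' files in its listing, so their deletions cannot trip it up; the remaining window (a whole directory vanishing) is handled explicitly.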
This touches on another issue. Over time, the directory listing is bound to get
very large, and re-listing everything from the baseDir may become increasingly
expensive, especially if the file system is S3. Maybe we could have a Bucketer
callback that returns a list of cleanup root directories based on the current
file? I'm guessing most people use time-based bucketing, so cleanup only matters
for a limited recent period. If so, this would also solve the recursive listing
problem described above.
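A minimal sketch of what such a callback could look like for time-based bucketing. Everything here is hypothetical: the CleanupRootsProvider interface is not an existing Flink API, and the implementation assumes hourly bucket directories named with the pattern `yyyy-MM-dd--HH` in UTC and a fixed look-back window.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback: directories that may still hold pending or in-progress files. */
interface CleanupRootsProvider {
    List<String> cleanupRoots(String basePath, Instant now);
}

/** Time-based variant, assuming hourly buckets named "yyyy-MM-dd--HH" in UTC. */
final class HourlyCleanupRoots implements CleanupRootsProvider {
    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    private final int lookbackHours;

    HourlyCleanupRoots(int lookbackHours) {
        this.lookbackHours = lookbackHours;
    }

    @Override
    public List<String> cleanupRoots(String basePath, Instant now) {
        List<String> roots = new ArrayList<>();
        // Current bucket first, then one directory per hour going back lookbackHours.
        for (int h = 0; h <= lookbackHours; h++) {
            roots.add(basePath + "/" + BUCKET_FORMAT.format(now.minus(Duration.ofHours(h))));
        }
        return roots;
    }
}
```

With a look-back of 2 hours at 2016-11-17T10:30Z, the roots would be `/base/2016-11-17--10`, `/base/2016-11-17--09` and `/base/2016-11-17--08`; each root could then be cleaned with a filtered, per-directory listing instead of re-listing the whole baseDir.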
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)