[
https://issues.apache.org/jira/browse/FLINK-9940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166143#comment-17166143
]
Huyen Levan commented on FLINK-9940:
------------------------------------
[~maguowei], thanks for taking the time to look at the PR.
When I worked on this PR, my goal was to address two issues:
# Multiple files having the same last-modified timestamp
# AWS S3's eventually consistent behaviour, where a file created at t1 only
becomes visible to the reader at some later time t2 > t1.
For this purpose, READ_CONSISTENCY_OFFSET_INTERVAL should be on the order of
a few hundred ms, or a few seconds at most (in our pipeline on AWS S3, setting
that threshold to 1 second eliminated the problem completely). The number of
file names we need to store would then be no more than a few thousand, which
translates to a few MB of operator memory.
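The offset-based bookkeeping described above (the hybrid approach from the issue description) can be sketched roughly as follows. This is a minimal illustration, not the PR's code or Flink's ContinuousFileMonitoringFunction: it assumes each directory listing arrives as a map from path to modification time in milliseconds, and `HybridFileTracker`, `selectNewFiles` and `offsetMs` are hypothetical names, with `offsetMs` standing in for READ_CONSISTENCY_OFFSET_INTERVAL.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, NOT Flink's ContinuousFileMonitoringFunction: it only
// illustrates the "max mod time minus an offset" bookkeeping.
class HybridFileTracker {
    private final long offsetMs;      // stands in for READ_CONSISTENCY_OFFSET_INTERVAL
    private long maxModTime;          // largest mod time processed so far
    private boolean hasSeenAny = false;
    // path -> mod time, kept only for files with modTime >= maxModTime - offsetMs
    private final Map<String, Long> recentFiles = new HashMap<>();

    HybridFileTracker(long offsetMs) {
        this.offsetMs = offsetMs;
    }

    /** Given one directory listing (path -> mod time), returns the files to read, oldest first. */
    List<String> selectNewFiles(Map<String, Long> listing) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Long> e : listing.entrySet()) {
            long modTime = e.getValue();
            // Older than the window: assumed to have been processed already.
            boolean tooOld = hasSeenAny && modTime < maxModTime - offsetMs;
            // Inside the window: skip only if this exact version was already read.
            Long seen = recentFiles.get(e.getKey());
            boolean alreadyRead = seen != null && seen == modTime;
            if (!tooOld && !alreadyRead) {
                selected.add(e.getKey());
            }
        }
        // Update state only after the whole listing was checked, so the
        // threshold used above is the one left over from the previous listing.
        for (String path : selected) {
            long modTime = listing.get(path);
            recentFiles.put(path, modTime);
            if (!hasSeenAny || modTime > maxModTime) {
                maxModTime = modTime;
            }
            hasSeenAny = true;
        }
        if (hasSeenAny) {
            long threshold = maxModTime - offsetMs;
            // Forget names that fell out of the window; this bounds memory.
            recentFiles.values().removeIf(t -> t < threshold);
        }
        // Oldest first, ties broken by path for a deterministic order.
        selected.sort((a, b) -> {
            int byTime = Long.compare(listing.get(a), listing.get(b));
            return byTime != 0 ? byTime : a.compareTo(b);
        });
        return selected;
    }

    public static void main(String[] args) {
        HybridFileTracker tracker = new HybridFileTracker(1000L); // 1 s offset
        Map<String, Long> first = new HashMap<>();
        first.put("a.csv", 5000L);
        System.out.println(tracker.selectNewFiles(first)); // [a.csv]

        Map<String, Long> second = new HashMap<>(first);
        second.put("b.csv", 5000L); // same timestamp as a.csv, visible only now
        second.put("c.csv", 3000L); // older than maxModTime - offset: ignored
        System.out.println(tracker.selectNewFiles(second)); // [b.csv]
    }
}
```

In the example, a check based only on `modTime > maxModTime` would drop `b.csv`, which is issue 1 above. Because only names inside the offset window are retained, the stored set stays small as long as the offset is small.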
The PR is not meant to handle files that arrive minutes or hours late.
I am not sure whether those should be considered valid scenarios (e.g., the
user deliberately changing the system time, or copying files from other
locations with Linux _cp -p_, which preserves the original modification
times). If they are, then I agree that we'll need a different approach.
When the overall number of files is huge, listing them becomes a problem of
its own. In my case, on S3 with somewhere north of 200K files, it took more
than 10 minutes just to scan them. Possible remedies are either removing old
files or partitioning them hierarchically.
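One way to picture the hierarchical-partitioning option: if files were written under time-based prefixes, the monitor would only need to list the few most recent prefixes instead of the whole bucket. The sketch below assumes a hypothetical `<base>/yyyy/MM/dd/HH/` layout in UTC; `HourlyPrefixes`, `prefixesToList`, the lookback window and the `s3://bucket/events` base path are illustrative only and not part of Flink.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical layout: <base>/yyyy/MM/dd/HH/ (UTC). Not a Flink API.
class HourlyPrefixes {
    private static final DateTimeFormatter HOUR_PATH =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/").withZone(ZoneOffset.UTC);

    /** Returns the hourly prefixes covering [now - lookback, now], oldest first. */
    static List<String> prefixesToList(String base, Instant now, Duration lookback) {
        List<String> prefixes = new ArrayList<>();
        Instant hour = now.minus(lookback).truncatedTo(ChronoUnit.HOURS);
        while (!hour.isAfter(now)) {
            prefixes.add(base + "/" + HOUR_PATH.format(hour));
            hour = hour.plus(1, ChronoUnit.HOURS);
        }
        return prefixes;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-07-28T10:30:00Z");
        // Only three prefixes (hours 08, 09 and 10) need listing, not every file.
        prefixesToList("s3://bucket/events", now, Duration.ofHours(2))
                .forEach(System.out::println);
    }
}
```

Presumably the lookback would have to cover at least the monitoring interval plus the consistency offset; otherwise a file that becomes visible late in an older hour would never be listed.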
So, should we have different file-source implementations for different
behaviours of the external system?
> File source continuous monitoring mode: S3 files sometimes missed
> -----------------------------------------------------------------
>
> Key: FLINK-9940
> URL: https://issues.apache.org/jira/browse/FLINK-9940
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.5.1
> Environment: Flink 1.5, EMRFS
> Reporter: Huyen Levan
> Assignee: Huyen Levan
> Priority: Major
> Labels: EMRFS, Flink, S3, pull-request-available
>
> When using StreamExecutionEnvironment.readFile() with
> FileProcessingMode.PROCESS_CONTINUOUSLY mode to monitor an S3 prefix, if
> a large number of files are created or modified at the same time, the
> directory monitoring process might miss some of them. The number of missed
> files depends on the monitoring interval.
> Cause: Flink tracks which files it has read by remembering the modification
> time of the file that was added (or modified) last. So when multiple files
> have the same last-modified timestamp, some of them can be missed.
> Suggested solution (thanks to [Fabian Hueske|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=25]):
> a hybrid approach that keeps the names of all files whose modification
> timestamp is larger than the maximum modification time minus an offset.
> _org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction_