[
https://issues.apache.org/jira/browse/FLINK-9940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592532#comment-16592532
]
ASF GitHub Bot commented on FLINK-9940:
---------------------------------------
lvhuyen opened a new pull request #6613: [FLINK-9940] [Streaming][File source]
out-of-order files were missed in continuous monitoring
URL: https://github.com/apache/flink/pull/6613
[FLINK-9940] Fix - File-source continuous monitoring mode - out-of-order
files were missed
## Fix the issue with ContinuousFileMonitoringFunction - out-of-order files
were missed in continuous directory scanning mode.
- _Cause_: The existing directory monitoring mechanism maintains the maximum
last-modified timestamp of all identified files (_globalModificationTime_)
and, in the next scan, ignores every file whose last-modified timestamp is
earlier than that _globalModificationTime_. Files that become visible out of
order, with a timestamp earlier than the current maximum, are therefore never
picked up.
- _Fix_: This fix adds a parameter that is supplied when creating a
ContinuousFileMonitoringFunction: readConsistencyOffset. Every scan now starts
from the maximum last-modified timestamp minus this offset. A new list,
processedFiles, is also maintained; it holds all known files whose
modTimestamp falls within that offset period.
- To test this fix, flink-fs-tests has also been changed: the collection of
seenFiles is now a SortedList instead of a TreeSet. Because a set would hide
duplicates, this change lets the tests verify exactly-once file scanning
rather than at-least-once.
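The interaction between the offset and the processed-files list can be sketched as follows. This is a minimal illustration of the idea described above, not the code from this pull request: the class `OffsetFileScanner`, its `scan` method, and the use of a path-to-timestamp map as the directory listing are all hypothetical, and the choice to treat a file as new when its timestamp differs from the recorded one is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of offset-based scanning; names are illustrative only. */
class OffsetFileScanner {
    private final long readConsistencyOffset;                 // width of the re-scan window, in ms
    private long maxModTime = Long.MIN_VALUE;                 // largest modification time seen so far
    private final Map<String, Long> processedFiles = new HashMap<>(); // files inside the window

    OffsetFileScanner(long readConsistencyOffset) {
        this.readConsistencyOffset = readConsistencyOffset;
    }

    /** Takes a listing (path -> modification time) and returns the paths to forward. */
    List<String> scan(Map<String, Long> listing) {
        // On the very first scan nothing is filtered out.
        long threshold = (maxModTime == Long.MIN_VALUE)
                ? Long.MIN_VALUE
                : maxModTime - readConsistencyOffset;

        List<String> fresh = new ArrayList<>();
        long newMax = maxModTime;
        for (Map.Entry<String, Long> entry : listing.entrySet()) {
            long modTime = entry.getValue();
            if (modTime <= threshold) {
                continue; // older than the window: ignored
            }
            Long seen = processedFiles.get(entry.getKey());
            if (seen != null && seen.longValue() == modTime) {
                continue; // inside the window but already forwarded
            }
            fresh.add(entry.getKey());
            processedFiles.put(entry.getKey(), modTime);
            newMax = Math.max(newMax, modTime);
        }
        maxModTime = newMax;

        // Forget files that have fallen out of the window, so the list stays small.
        if (maxModTime != Long.MIN_VALUE) {
            final long cutoff = maxModTime - readConsistencyOffset;
            processedFiles.values().removeIf(t -> t <= cutoff);
        }
        Collections.sort(fresh); // deterministic order for the example
        return fresh;
    }
}
```

In this sketch, a file that appears late with a timestamp slightly below the current maximum is still forwarded once, while a file older than the window is ignored, so the offset trades a bounded amount of extra state for tolerance of out-of-order arrivals.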
## Verifying this change
This change is already covered by existing tests, with slight updates:
- ContinuousFileProcessingMigrationTest.testMonitoringSourceRestore.
- ContinuousFileProcessingTest.{testFunctionRestore, testProcessContinuously}
This change also adds a test:
- ContinuousFileProcessingTest.testProcessContinuouslyWithNoteTooLateFile
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
(per-file). This is expected to have minimal impact.
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs
----------------------------------------------------------------
> File source continuous monitoring mode: S3 files sometimes missed
> -----------------------------------------------------------------
>
> Key: FLINK-9940
> URL: https://issues.apache.org/jira/browse/FLINK-9940
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.1
> Environment: Flink 1.5, EMRFS
> Reporter: Huyen Levan
> Assignee: Huyen Levan
> Priority: Major
> Labels: EMRFS, Flink, S3, pull-request-available
> Fix For: 1.7.0
>
>
> When using StreamExecutionEnvironment.readFile() with
> FileProcessingMode.PROCESS_CONTINUOUSLY to monitor an S3 prefix, the
> directory monitoring process might miss some files if a large number of
> files are created or modified at the same time. The number of missed files
> depends on the monitoring interval.
> Cause: Flink tracks which files it has read by remembering the modification
> time of the file that was added (or modified) last. So when multiple files
> have the same last-modified timestamp, some of them can be missed.
> Suggested solution (thanks to [Fabian
> Hueske|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=25]):
> a hybrid approach that keeps the names of all files whose modification
> timestamp is later than the maximum modification time minus an offset.
> _org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction_