[FLINK-9940] Fix - File-source continuous monitoring mode - out-of-order files were missed
## Fix ContinuousFileMonitoringFunction missing out-of-order files in continuous directory scanning mode
- _Cause_: The existing directory monitoring mechanism maintains the maximum
last-modified timestamp of all identified files (_globalModificationTime_), and
each subsequent scan ignores every file whose last-modified timestamp is earlier
than that _globalModificationTime_. A file that becomes visible only after a
file with a later timestamp has already been processed (an out-of-order file)
is therefore never read.
- _Fix_: This fix adds a parameter, _readConsistencyOffset_, for creating a
ContinuousFileMonitoringFunction. Each scan now starts from
_globalModificationTime_ minus this offset instead of from
_globalModificationTime_ itself. A new list, _processedFiles_, is also
maintained; it holds all known files whose modification timestamp falls within
that offset window, so files inside the window are not read a second time.
- To test this fix, flink-fs-tests was also changed: the _seenFiles_ collection
is now a SortedList instead of a TreeSet. A set silently absorbs duplicates and
can therefore only verify at-least-once file scanning; a list keeps them, so the
tests now verify exactly-once scanning.
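To make the mechanism concrete, here is a minimal, hypothetical model of the offset-based scanning described above. It is not the code in this pull request: the class `OffsetScanSketch`, the `Map<String, Long>` directory listing (path to modification time), and the way _processedFiles_ is pruned are all assumptions made for illustration. In this sketch an offset of 0 behaves like the old mechanism (the late file `b`, whose timestamp is older than `c`, is skipped), while a positive _readConsistencyOffset_ picks it up exactly once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of offset-based directory scanning.
 * It is NOT Flink's ContinuousFileMonitoringFunction; names and data structures are assumptions.
 */
public class OffsetScanSketch {

    private final long readConsistencyOffset;
    // Maximum modification time of any file accepted so far (Long.MIN_VALUE = nothing seen yet).
    private long globalModificationTime = Long.MIN_VALUE;
    // Accepted files (path -> modification time) whose timestamp is still inside the offset window.
    private final Map<String, Long> processedFiles = new HashMap<>();

    public OffsetScanSketch(long readConsistencyOffset) {
        this.readConsistencyOffset = readConsistencyOffset;
    }

    /** Scans one directory listing (path -> modification time) and returns the newly accepted paths. */
    public List<String> scan(Map<String, Long> listing) {
        // Files at or below this bound are skipped outright; on the first scan nothing is skipped.
        long lowerBound = (globalModificationTime == Long.MIN_VALUE)
                ? Long.MIN_VALUE
                : globalModificationTime - readConsistencyOffset;
        List<String> accepted = new ArrayList<>();
        // TreeMap iterates in path order so the result is deterministic.
        for (Map.Entry<String, Long> entry : new TreeMap<>(listing).entrySet()) {
            String path = entry.getKey();
            long modTime = entry.getValue();
            if (modTime <= lowerBound || processedFiles.containsKey(path)) {
                continue; // too old, or already read within the offset window
            }
            accepted.add(path);
            processedFiles.put(path, modTime);
            globalModificationTime = Math.max(globalModificationTime, modTime);
        }
        if (globalModificationTime != Long.MIN_VALUE) {
            // Files that fell out of the window are now rejected by the timestamp check alone.
            long newLowerBound = globalModificationTime - readConsistencyOffset;
            processedFiles.values().removeIf(t -> t <= newLowerBound);
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, Long> firstListing = Map.of("a", 100L, "c", 300L);
        // "b" becomes visible late but carries an older timestamp than "c".
        Map<String, Long> laterListing = Map.of("a", 100L, "b", 250L, "c", 300L);
        for (long offset : new long[] {0L, 100L}) {
            OffsetScanSketch monitor = new OffsetScanSketch(offset);
            // A List (not a Set) keeps any duplicate emission visible.
            List<String> emitted = new ArrayList<>(monitor.scan(firstListing));
            emitted.addAll(monitor.scan(laterListing));
            emitted.addAll(monitor.scan(laterListing));
            System.out.println("offset=" + offset + " emitted=" + emitted);
        }
    }
}
```

Running `main` prints `offset=0 emitted=[a, c]` and then `offset=100 emitted=[a, c, b]`. Collecting the results in a `List` is what lets a duplicate show up, which mirrors the reason given for moving _seenFiles_ away from a TreeSet. The trade-off the sketch makes visible is that a larger offset tolerates later-arriving files at the cost of a larger _processedFiles_ collection to keep and check.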
## Verifying this change
This change is covered by existing tests, with slight updates:
- ContinuousFileProcessingMigrationTest.testMonitoringSourceRestore
- ContinuousFileProcessingTest.{testFunctionRestore, testProcessContinuously}
This change also adds a new test:
- ContinuousFileProcessingTest.testProcessContinuouslyWithNoteTooLateFile
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes, per file
rather than per record; the impact is expected to be minimal.
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs
[ Full content available at: https://github.com/apache/flink/pull/6613 ]