[FLINK-9940] Fix - File-source continuous monitoring mode - out-of-order files were missed

## Fix ContinuousFileMonitoringFunction missing out-of-order files in continuous directory-scanning mode

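- _Cause_: In the existing directory-monitoring mechanism, Flink maintains the maximum last-modified timestamp of all files identified so far (_globalModificationTime_). In each subsequent scan, it ignores all files whose last-modified timestamp is earlier than _globalModificationTime_. As a result, a file that becomes visible out of order, i.e., only after a file with a later modification time has already been processed, is never picked up.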


- _Fix_: This fix adds a parameter, readConsistencyOffset, used when creating a ContinuousFileMonitoringFunction. Each scan now starts from the maximum last-modified timestamp minus this offset, so a file that appears late but whose timestamp falls within the offset window is still picked up. To avoid reading a file twice, the function also maintains a new list, processedFiles, containing all already-known files whose modification timestamps fall within that offset window.
- To test this fix, flink-fs-tests has also been changed: the seenFiles collection is now a SortedList instead of a TreeSet. A set silently collapses duplicates, so the old collection could only verify at-least-once file scanning. A sorted list keeps duplicates, so the tests now verify exactly-once scanning.
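
The Java sketch below contrasts the old and new scanning rules on a simplified model in which a directory listing is a map from file path to last-modified timestamp. It is not Flink's implementation. The class names MonitoringSketch, GlobalMaxScanner, and OffsetScanner, and the pruning of processedFiles, are assumptions made for illustration; only globalModificationTime, readConsistencyOffset, and processedFiles come from the description above. In the scenario, b.txt (modified at 250) becomes visible only after c.txt (modified at 300) has already been processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of directory scanning (NOT Flink's actual
// ContinuousFileMonitoringFunction). A listing maps file path -> modification time.
public class MonitoringSketch {

    // Old rule: accept only files strictly newer than the largest
    // modification time seen so far (globalModificationTime).
    static final class GlobalMaxScanner {
        long globalModificationTime = Long.MIN_VALUE;

        List<String> scan(Map<String, Long> listing) {
            List<String> accepted = new ArrayList<>();
            long newMax = globalModificationTime;
            // TreeMap gives a deterministic (path-sorted) iteration order.
            for (Map.Entry<String, Long> e : new TreeMap<>(listing).entrySet()) {
                if (e.getValue() > globalModificationTime) {
                    accepted.add(e.getKey());
                    newMax = Math.max(newMax, e.getValue());
                }
            }
            globalModificationTime = newMax;
            return accepted;
        }
    }

    // New rule: look back by readConsistencyOffset and remember which files
    // inside that look-back window were already processed.
    static final class OffsetScanner {
        final long readConsistencyOffset;
        long globalModificationTime = Long.MIN_VALUE;
        final Map<String, Long> processedFiles = new HashMap<>();

        OffsetScanner(long readConsistencyOffset) {
            this.readConsistencyOffset = readConsistencyOffset;
        }

        // Files at or below this bound are skipped; the guard avoids
        // underflow before any file has been seen.
        long lowerBound() {
            return globalModificationTime == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : globalModificationTime - readConsistencyOffset;
        }

        List<String> scan(Map<String, Long> listing) {
            long bound = lowerBound();
            List<String> accepted = new ArrayList<>();
            for (Map.Entry<String, Long> e : new TreeMap<>(listing).entrySet()) {
                if (e.getValue() > bound && !processedFiles.containsKey(e.getKey())) {
                    accepted.add(e.getKey());
                    processedFiles.put(e.getKey(), e.getValue());
                    globalModificationTime = Math.max(globalModificationTime, e.getValue());
                }
            }
            // Entries at or below the new bound can never pass the check again,
            // so they are dropped to keep processedFiles small (an assumption).
            long newBound = lowerBound();
            processedFiles.values().removeIf(t -> t <= newBound);
            return accepted;
        }
    }

    public static void main(String[] args) {
        // First scan: b.txt is not visible yet.
        Map<String, Long> first = Map.of("a.txt", 100L, "c.txt", 300L);
        // Second scan: b.txt appears late, after c.txt was processed.
        Map<String, Long> second = Map.of("a.txt", 100L, "b.txt", 250L, "c.txt", 300L);

        GlobalMaxScanner oldScanner = new GlobalMaxScanner();
        // prints "old: [a.txt, c.txt] []"
        System.out.println("old: " + oldScanner.scan(first) + " " + oldScanner.scan(second));

        OffsetScanner newScanner = new OffsetScanner(100L);
        // prints "new: [a.txt, c.txt] [b.txt]"
        System.out.println("new: " + newScanner.scan(first) + " " + newScanner.scan(second));
    }
}
```

With an offset of 100, the look-back window after the first scan covers timestamps above 200, so the new rule accepts b.txt exactly once while the old rule drops it. A file whose timestamp is at or below globalModificationTime minus readConsistencyOffset is still skipped in this sketch, so the offset bounds how late a file may appear. Choosing it trades tolerance for late files against the size of processedFiles.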

## Verifying this change
This change is covered by existing tests, with slight updates:
- ContinuousFileProcessingMigrationTest.testMonitoringSourceRestore
- ContinuousFileProcessingTest.{testFunctionRestore, testProcessContinuously}

This change also adds a new test:

- ContinuousFileProcessingTest.testProcessContinuouslyWithNoteTooLateFile
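
The switch of seenFiles from a TreeSet to a sorted list matters for these tests because a set cannot detect a file that was emitted twice. The hypothetical Java sketch below illustrates this; the class and method names are illustrative rather than taken from the Flink test code, and a sorted ArrayList stands in for the SortedList mentioned in the fix notes. A duplicate emission passes the set-based check but fails the sorted-list check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration: a TreeSet can only verify at-least-once
// delivery, while a sorted list can verify exactly-once delivery.
public class SeenFilesSketch {

    // Passes when every expected file was seen at least once (duplicates collapse).
    static boolean atLeastOnce(List<String> emitted, List<String> expected) {
        Set<String> seen = new TreeSet<>(emitted);
        return seen.equals(new TreeSet<>(expected));
    }

    // Passes only when every expected file was seen exactly once.
    static boolean exactlyOnce(List<String> emitted, List<String> expected) {
        List<String> seen = new ArrayList<>(emitted);
        Collections.sort(seen);
        List<String> want = new ArrayList<>(expected);
        Collections.sort(want);
        return seen.equals(want);
    }

    public static void main(String[] args) {
        List<String> expected = List.of("a.txt", "b.txt", "c.txt");
        // b.txt emitted twice, e.g. by a rescan that did not remember it.
        List<String> emitted = List.of("c.txt", "b.txt", "a.txt", "b.txt");
        System.out.println(atLeastOnce(emitted, expected)); // prints "true"
        System.out.println(exactlyOnce(emitted, expected)); // prints "false"
    }
}
```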

## Does this pull request potentially affect one of the following parts:
  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes (per-file, not per-record); the impact is expected to be minimal.
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation
  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


[ Full content available at: https://github.com/apache/flink/pull/6613 ]