[
https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542830#comment-16542830
]
Andrey Zagrebin commented on FLINK-9587:
----------------------------------------
Hi [~shumanski]
*ContinuousFileMonitoringFunction* is not currently designed for this kind of
source directory behaviour. Its contract assumes that the content of the
directory is immutable, at least while the incoming files are being processed,
and that files are only ever added to it. If a file is deleted before it has
been processed, the place in your stack trace is not the only one with a race
condition: the job can fail at any point between the file's first discovery and
its actual processing.
I would suggest setting up *rsync* so that it uses a temporary directory on the
same disk partition for in-progress transfers and atomically renames finished
files into the source directory of Flink, e.g. with
[--temp-dir|https://linux.die.net/man/1/rsync].
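If the producing side is a custom process rather than rsync, the same "stage elsewhere, then rename" pattern might look roughly like the sketch below. This is only an illustration under assumptions: the class name {{AtomicPublish}}, the method {{publish}} and the directory names are hypothetical and not part of Flink or rsync, and the staging directory is assumed to live on the same file system as the watched directory so that the rename can be atomic.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: files become visible in the watched directory only once complete. */
public final class AtomicPublish {

    /**
     * Writes the content into stagingDir first and then moves the finished file
     * into watchedDir with an atomic rename. Assumes both directories are on the
     * same file system; otherwise ATOMIC_MOVE fails with an exception.
     */
    public static Path publish(Path stagingDir, Path watchedDir, String fileName, byte[] content)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(watchedDir);

        // The partially written file lives only in the staging directory,
        // so a monitor listing watchedDir never sees it.
        Path staged = Files.createTempFile(stagingDir, fileName + ".", ".part");
        Files.write(staged, content);

        // A single rename makes the complete file appear under its final name.
        Path target = watchedDir.resolve(fileName);
        return Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("flink-9587-demo");
        Path published = publish(
                base.resolve("staging"),
                base.resolve("landing"),
                "file.txt",
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("Published " + published);
    }
}
```

With this layout Flink would monitor only the {{landing}} directory, and temporary names never show up in its listing. Whether a rename is truly atomic on a MapR Fuse mount is something to verify for that file system.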
> ContinuousFileMonitoringFunction crashes on short living files
> --------------------------------------------------------------
>
> Key: FLINK-9587
> URL: https://issues.apache.org/jira/browse/FLINK-9587
> Project: Flink
> Issue Type: Bug
> Components: FileSystem, Streaming, Streaming Connectors
> Affects Versions: 1.5.0
> Environment: Flink 1.5 running as a standalone cluster.
> Reporter: Andrei Shumanski
> Priority: Critical
> Fix For: 1.6.0
>
>
> Hi,
>
> We use Flink to monitor a directory for new files. The filesystem is a MapR
> Fuse mount that looks like a local FS.
> The files are copied to the directory by another process that uses the rsync
> command. While a file is not yet completely written, rsync keeps it in a
> temporary file with a name like ".file.txt.uM6MfZ", where the last extension
> is a random string.
> When the copying is done, the file is renamed to its final name "file.txt".
>
> The bug is that Flink does not correctly handle this behavior and does not
> take into account that files in the directory might be deleted.
>
> We are getting error traces:
> {code:java}
> java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
> at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
> at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> In LocalFileSystem.listStatus(final Path f) we read the list of files in a
> directory and then create a LocalFileStatus object for each of the files. But
> a file might be removed in the interval between these two operations.
> I do not see any way to handle this exception in our code.
>