[FLINK-1081] FileMonitoringFunction fix for proper handling of modification times
Closes #226 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b63f269 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b63f269 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b63f269 Branch: refs/heads/master Commit: 9b63f269e7ced1c9cb58bffe0091eb3d86ac6624 Parents: bf5e39a Author: Gyula Fora <[email protected]> Authored: Sun Jan 25 20:20:40 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Sun Jan 25 21:26:54 2015 +0100 ---------------------------------------------------------------------- .../environment/StreamExecutionEnvironment.java | 9 ++--- .../function/source/FileMonitoringFunction.java | 36 ++++++++++++-------- .../api/scala/StreamExecutionEnvironment.scala | 15 ++++---- 3 files changed, 35 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index b7eeed2..e0cdd02 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -224,8 +224,8 @@ public abstract class StreamExecutionEnvironment { * @param filePath * The path of the file, as a URI (e.g., * "file:///some/local/file" or "hdfs://host:port/file/path/"). 
- * @param interval - * The interval of file watching. + * @param intervalMillis + * The interval of file watching in milliseconds. * @param watchType * The watch type of file stream. When watchType is * {@link WatchType.ONLY_NEW_FILES}, the system processes only @@ -236,9 +236,10 @@ public abstract class StreamExecutionEnvironment { * * @return The DataStream containing the given directory. */ - public DataStream<String> readFileStream(String filePath, long interval, WatchType watchType) { + public DataStream<String> readFileStream(String filePath, long intervalMillis, + WatchType watchType) { DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction( - filePath, interval, watchType)); + filePath, intervalMillis, watchType), null, "File Stream"); return source.flatMap(new FileReadFunction()); } http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java index c223c53..05a2489 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java @@ -48,15 +48,14 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon private WatchType watchType; private FileSystem fileSystem; - private long lastModificationTime; private Map<String, Long> offsetOfFiles; + private Map<String, Long> 
modificationTimes; public FileMonitoringFunction(String path, long interval, WatchType watchType) { this.path = path; this.interval = interval; this.watchType = watchType; - - this.lastModificationTime = System.currentTimeMillis(); + this.modificationTimes = new HashMap<String, Long>(); this.offsetOfFiles = new HashMap<String, Long>(); } @@ -67,7 +66,8 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon while (true) { List<String> files = listNewFiles(); for (String filePath : files) { - if (watchType == WatchType.ONLY_NEW_FILES || watchType == WatchType.REPROCESS_WITH_APPENDED) { + if (watchType == WatchType.ONLY_NEW_FILES + || watchType == WatchType.REPROCESS_WITH_APPENDED) { collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L)); offsetOfFiles.put(filePath, -1L); } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) { @@ -90,28 +90,34 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon private List<String> listNewFiles() throws IOException { List<String> files = new ArrayList<String>(); + FileStatus[] statuses = fileSystem.listStatus(new Path(path)); for (FileStatus status : statuses) { Path filePath = status.getPath(); + String fileName = filePath.getName(); long modificationTime = status.getModificationTime(); - if (!isFiltered(filePath, modificationTime)) { + if (!isFiltered(fileName, modificationTime)) { files.add(filePath.toString()); + modificationTimes.put(fileName, modificationTime); } } - - lastModificationTime = System.currentTimeMillis(); - return files; } - private boolean isFiltered(Path path, long modificationTime) { - String filename = path.getName(); - - return lastModificationTime > modificationTime // not modified file - || (watchType == WatchType.ONLY_NEW_FILES && offsetOfFiles.containsKey(path.toString())) // modified file but already processed - || filename.startsWith(".") // hidden file - || filename.contains("_COPYING_"); // currently copying file + private 
boolean isFiltered(String fileName, long modificationTime) { + + if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName)) + || fileName.startsWith(".") || fileName.contains("_COPYING_")) { + return true; + } else { + Long lastModification = modificationTimes.get(fileName); + if (lastModification == null) { + return false; + } else { + return lastModification >= modificationTime; + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index b4565c7..e0f50f8 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction } import org.apache.flink.util.Collector import org.apache.flink.api.scala.ClosureCleaner +import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType class StreamExecutionEnvironment(javaEnv: JavaEnv) { @@ -81,14 +82,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { javaEnv.readTextFile(filePath) /** - * Creates a DataStream that represents the Strings produced by reading the - * given file line wise multiple times(infinite). 
The file will be read with - * the system's default character set. This functionality can be used for - * testing a topology. + * Creates a DataStream that contains the contents of file created while + * system watches the given path. The file will be read with the system's + * default character set. The user can check the monitoring interval in milliseconds, + * and the way file modifications are handled. By default it checks for only new files + * every 100 milliseconds. * */ - def readTextStream(StreamPath: String): DataStream[String] = - javaEnv.readTextStream(StreamPath) + def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType = + WatchType.ONLY_NEW_FILES): DataStream[String] = + javaEnv.readFileStream(StreamPath, intervalMillis, watchType) /** * Creates a new DataStream that contains the strings received infinitely
