[FLINK-1081] FileMonitoringFunction fix for proper handling of modification 
times

Closes #226


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b63f269
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b63f269
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b63f269

Branch: refs/heads/master
Commit: 9b63f269e7ced1c9cb58bffe0091eb3d86ac6624
Parents: bf5e39a
Author: Gyula Fora <[email protected]>
Authored: Sun Jan 25 20:20:40 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Sun Jan 25 21:26:54 2015 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  9 ++---
 .../function/source/FileMonitoringFunction.java | 36 ++++++++++++--------
 .../api/scala/StreamExecutionEnvironment.scala  | 15 ++++----
 3 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7eeed2..e0cdd02 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -224,8 +224,8 @@ public abstract class StreamExecutionEnvironment {
         * @param filePath
         *            The path of the file, as a URI (e.g.,
         *            "file:///some/local/file" or 
"hdfs://host:port/file/path/").
-        * @param interval
-        *            The interval of file watching.
+        * @param intervalMillis
+        *            The interval of file watching in milliseconds.
         * @param watchType
         *            The watch type of file stream. When watchType is
         *            {@link WatchType.ONLY_NEW_FILES}, the system processes 
only
@@ -236,9 +236,10 @@ public abstract class StreamExecutionEnvironment {
         * 
         * @return The DataStream containing the given directory.
         */
-       public DataStream<String> readFileStream(String filePath, long 
interval, WatchType watchType) {
+       public DataStream<String> readFileStream(String filePath, long 
intervalMillis,
+                       WatchType watchType) {
                DataStream<Tuple3<String, Long, Long>> source = addSource(new 
FileMonitoringFunction(
-                               filePath, interval, watchType));
+                               filePath, intervalMillis, watchType), null, 
"File Stream");
 
                return source.flatMap(new FileReadFunction());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
index c223c53..05a2489 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
@@ -48,15 +48,14 @@ public class FileMonitoringFunction implements 
SourceFunction<Tuple3<String, Lon
        private WatchType watchType;
 
        private FileSystem fileSystem;
-       private long lastModificationTime;
        private Map<String, Long> offsetOfFiles;
+       private Map<String, Long> modificationTimes;
 
        public FileMonitoringFunction(String path, long interval, WatchType 
watchType) {
                this.path = path;
                this.interval = interval;
                this.watchType = watchType;
-
-               this.lastModificationTime = System.currentTimeMillis();
+               this.modificationTimes = new HashMap<String, Long>();
                this.offsetOfFiles = new HashMap<String, Long>();
        }
 
@@ -67,7 +66,8 @@ public class FileMonitoringFunction implements 
SourceFunction<Tuple3<String, Lon
                while (true) {
                        List<String> files = listNewFiles();
                        for (String filePath : files) {
-                               if (watchType == WatchType.ONLY_NEW_FILES || 
watchType == WatchType.REPROCESS_WITH_APPENDED) {
+                               if (watchType == WatchType.ONLY_NEW_FILES
+                                               || watchType == 
WatchType.REPROCESS_WITH_APPENDED) {
                                        collector.collect(new Tuple3<String, 
Long, Long>(filePath, 0L, -1L));
                                        offsetOfFiles.put(filePath, -1L);
                                } else if (watchType == 
WatchType.PROCESS_ONLY_APPENDED) {
@@ -90,28 +90,34 @@ public class FileMonitoringFunction implements 
SourceFunction<Tuple3<String, Lon
 
        private List<String> listNewFiles() throws IOException {
                List<String> files = new ArrayList<String>();
+
                FileStatus[] statuses = fileSystem.listStatus(new Path(path));
 
                for (FileStatus status : statuses) {
                        Path filePath = status.getPath();
+                       String fileName = filePath.getName();
                        long modificationTime = status.getModificationTime();
 
-                       if (!isFiltered(filePath, modificationTime)) {
+                       if (!isFiltered(fileName, modificationTime)) {
                                files.add(filePath.toString());
+                               modificationTimes.put(fileName, 
modificationTime);
                        }
                }
-
-               lastModificationTime = System.currentTimeMillis();
-
                return files;
        }
 
-       private boolean isFiltered(Path path, long modificationTime) {
-               String filename = path.getName();
-
-               return lastModificationTime > modificationTime // not modified 
file
-                               || (watchType == WatchType.ONLY_NEW_FILES && 
offsetOfFiles.containsKey(path.toString())) // modified file but already 
processed
-                               || filename.startsWith(".") // hidden file
-                               || filename.contains("_COPYING_"); // currently 
copying file
+       private boolean isFiltered(String fileName, long modificationTime) {
+
+               if ((watchType == WatchType.ONLY_NEW_FILES && 
modificationTimes.containsKey(fileName))
+                               || fileName.startsWith(".") || 
fileName.contains("_COPYING_")) {
+                       return true;
+               } else {
+                       Long lastModification = modificationTimes.get(fileName);
+                       if (lastModification == null) {
+                               return false;
+                       } else {
+                               return lastModification >= modificationTime;
+                       }
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index b4565c7..e0f50f8 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -25,6 +25,7 @@ import 
org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment =>
 import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, 
SourceFunction }
 import org.apache.flink.util.Collector
 import org.apache.flink.api.scala.ClosureCleaner
+import 
org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -81,14 +82,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.readTextFile(filePath)
 
   /**
-   * Creates a DataStream that represents the Strings produced by reading the
-   * given file line wise multiple times(infinite). The file will be read with
-   * the system's default character set. This functionality can be used for
-   * testing a topology.
+   * Creates a DataStream that contains the contents of files created while
+   * the system watches the given path. The files will be read with the system's
+   * default character set. The user can specify the monitoring interval in milliseconds,
+   * and the way file modifications are handled. By default it checks for only new files
+   * every 100 milliseconds.
    *
    */
-  def readTextStream(StreamPath: String): DataStream[String] = 
-    javaEnv.readTextStream(StreamPath)
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
watchType: WatchType = 
+    WatchType.ONLY_NEW_FILES): DataStream[String] =
+    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
 
   /**
    * Creates a new DataStream that contains the strings received infinitely

Reply via email to