[FLINK-5021] Guarantee PROCESS_ONCE works correctly after recovering.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a90c6bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a90c6bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a90c6bd

Branch: refs/heads/master
Commit: 5a90c6bdd0a4c279fca7665532c4a34992ffbb24
Parents: 98a6176
Author: kl0u <[email protected]>
Authored: Thu Nov 3 11:08:45 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Nov 11 14:05:58 2016 +0100

----------------------------------------------------------------------
 .../source/ContinuousFileMonitoringFunction.java         | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a90c6bd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 10068a6..54ab0ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -147,8 +147,15 @@ public class ContinuousFileMonitoringFunction<OUT>
                                break;
                        case PROCESS_ONCE:
                                synchronized (checkpointLock) {
-                                       monitorDirAndForwardSplits(fileSystem, context);
-                                       globalModificationTime = Long.MAX_VALUE;
+
+                                       // the following check guarantees that if we restart
+                                       // after a failure and we managed to have a successful
+                                       // checkpoint, we will not reprocess the directory.
+
+                                       if (globalModificationTime == Long.MIN_VALUE) {
+                                               monitorDirAndForwardSplits(fileSystem, context);
+                                               globalModificationTime = Long.MAX_VALUE;
+                                       }
                                        isRunning = false;
                                }
                                break;

Reply via email to