[FLINK-5021] Guarantee PROCESS_ONCE works correctly after recovering.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a90c6bd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a90c6bd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a90c6bd Branch: refs/heads/master Commit: 5a90c6bdd0a4c279fca7665532c4a34992ffbb24 Parents: 98a6176 Author: kl0u <[email protected]> Authored: Thu Nov 3 11:08:45 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Nov 11 14:05:58 2016 +0100 ---------------------------------------------------------------------- .../source/ContinuousFileMonitoringFunction.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5a90c6bd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 10068a6..54ab0ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -147,8 +147,15 @@ public class ContinuousFileMonitoringFunction<OUT> break; case PROCESS_ONCE: synchronized (checkpointLock) { - monitorDirAndForwardSplits(fileSystem, context); - globalModificationTime = Long.MAX_VALUE; + + // the following check guarantees that if we restart + // after a failure and we managed to have a successful + // checkpoint, we will not reprocess the directory. + + if (globalModificationTime == Long.MIN_VALUE) { + monitorDirAndForwardSplits(fileSystem, context); + globalModificationTime = Long.MAX_VALUE; + } isRunning = false; } break;
