Repository: flink Updated Branches: refs/heads/master ae2537e8b -> 62c465c30
[FLINK-1793] [streaming] Fix file source isRunning check for proper cancelling

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62c465c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62c465c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62c465c3

Branch: refs/heads/master
Commit: 62c465c30c75523ecaa57233a0119ddc3d80ed4a
Parents: ae2537e
Author: Gyula Fora <[email protected]>
Authored: Wed Apr 8 23:23:17 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Wed Apr 8 23:23:17 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/function/source/FileSourceFunction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/62c465c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index d7df266..9289355 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -67,7 +67,7 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 			String record = serializer.createInstance();
 			format.open(split);
-			while (!format.reachedEnd()) {
+			while (isRunning && !format.reachedEnd()) {
 				if ((record = format.nextRecord(record)) != null) {
 					collector.collect(record);
 				}
