Repository: flink
Updated Branches:
  refs/heads/master ae2537e8b -> 62c465c30


[FLINK-1793] [streaming] Fix file source isRunning check for proper cancelling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62c465c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62c465c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62c465c3

Branch: refs/heads/master
Commit: 62c465c30c75523ecaa57233a0119ddc3d80ed4a
Parents: ae2537e
Author: Gyula Fora <[email protected]>
Authored: Wed Apr 8 23:23:17 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Wed Apr 8 23:23:17 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/function/source/FileSourceFunction.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62c465c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index d7df266..9289355 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -67,7 +67,7 @@ public class FileSourceFunction extends 
RichParallelSourceFunction<String> {
                                String record = serializer.createInstance();
 
                                format.open(split);
-                               while (!format.reachedEnd()) {
+                               while (isRunning && !format.reachedEnd()) {
                                        if ((record = 
format.nextRecord(record)) != null) {
                                                collector.collect(record);
                                        }

Reply via email to