[FLINK-3745] [runtime] Fix early stopping of stream sources

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc

Branch: refs/heads/master
Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992
Parents: 2728f92
Author: Stephan Ewen <[email protected]>
Authored: Wed Apr 13 12:26:42 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../tasks/StoppableSourceStreamTask.java        | 15 +++-
 .../tasks/SourceStreamTaskStoppingTest.java     | 94 ++++++++++++++++++++
 .../runtime/tasks/SourceStreamTaskTest.java     | 40 +--------
 .../streaming/timestamp/TimestampITCase.java    | 18 ++--
 .../test/classloading/jar/UserCodeType.java     |  1 +
 5 files changed, 123 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
index 5173796..7ff39b7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -31,9 +31,20 @@ import 
org.apache.flink.streaming.api.operators.StoppableStreamSource;
 public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & 
StoppableFunction>
        extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> 
implements StoppableTask {
 
+       private volatile boolean stopped;
+
        @Override
-       public void stop() {
-               this.headOperator.stop();
+       protected void run() throws Exception {
+               if (!stopped) {
+                       super.run();
+               }
        }
 
+       @Override
+       public void stop() {
+               stopped = true;
+               if (this.headOperator != null) {
+                       this.headOperator.stop();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
new file mode 100644
index 0000000..ab9e59b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * These tests verify that the RichFunction methods are called (in correct 
order). And that
+ * checkpointing/element emission don't occur concurrently.
+ */
+public class SourceStreamTaskStoppingTest {
+
+
+       // test flag for testStop()
+       static boolean stopped = false;
+
+       @Test
+       public void testStop() {
+               final StoppableSourceStreamTask<Object, StoppableSource> 
sourceTask = new StoppableSourceStreamTask<>();
+               sourceTask.headOperator = new StoppableStreamSource<>(new 
StoppableSource());
+
+               sourceTask.stop();
+
+               assertTrue(stopped);
+       }
+
+       @Test
+       public void testStopBeforeInitialization() throws Exception {
+
+               final StoppableSourceStreamTask<Object, StoppableFailingSource> 
sourceTask = new StoppableSourceStreamTask<>();
+               sourceTask.stop();
+               
+               sourceTask.headOperator = new StoppableStreamSource<>(new 
StoppableFailingSource());
+               sourceTask.run();
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       private static class StoppableSource extends RichSourceFunction<Object> 
implements StoppableFunction {
+               private static final long serialVersionUID = 
728864804042338806L;
+
+               @Override
+               public void 
run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object>
 ctx)
+                               throws Exception {
+               }
+
+               @Override
+               public void cancel() {}
+
+               @Override
+               public void stop() {
+                       stopped = true;
+               }
+       }
+
+       private static class StoppableFailingSource extends 
RichSourceFunction<Object> implements StoppableFunction {
+               private static final long serialVersionUID = 
728864804042338806L;
+
+               @Override
+               public void run(SourceContext<Object> ctx) throws Exception {
+                       fail("should not be called");
+               }
+
+               @Override
+               public void cancel() {}
+
+               @Override
+               public void stop() {}
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index bfb2d34..cb779b0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -28,12 +27,13 @@ import 
org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -77,19 +77,6 @@ public class SourceStreamTaskTest {
                Assert.assertEquals(10, resultElements.size());
        }
 
-       // test flag for testStop()
-       static boolean stopped = false;
-
-       @Test
-       public void testStop() {
-               final StoppableSourceStreamTask<Object, StoppableSource> 
sourceTask = new StoppableSourceStreamTask<>();
-               sourceTask.headOperator = new StoppableStreamSource<>(new 
StoppableSource());
-
-               sourceTask.stop();
-
-               Assert.assertTrue(stopped);
-       }
-
        /**
         * This test ensures that the SourceStreamTask properly serializes 
checkpointing
         * and element emission. This also verifies that there are no 
concurrent invocations
@@ -155,24 +142,7 @@ public class SourceStreamTaskTest {
                }
        }
 
-       private static class StoppableSource extends RichSourceFunction<Object> 
implements StoppableFunction {
-               private static final long serialVersionUID = 
728864804042338806L;
-
-               @Override
-               public void 
run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object>
 ctx)
-                               throws Exception {
-               }
-
-               @Override
-               public void cancel() {}
-
-               @Override
-               public void stop() {
-                       stopped = true;
-               }
-       }
-
-       private static class MockSource implements SourceFunction<Tuple2<Long, 
Integer>>, Checkpointed {
+       private static class MockSource implements SourceFunction<Tuple2<Long, 
Integer>>, Checkpointed<Serializable> {
                private static final long serialVersionUID = 1;
 
                private int maxElements;
@@ -240,9 +210,7 @@ public class SourceStreamTaskTest {
                }
 
                @Override
-               public void restoreState(Serializable state) {
-
-               }
+               public void restoreState(Serializable state) {}
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 1a59ab3..d857672 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -203,7 +203,7 @@ public class TimestampITCase {
                                        // try until we get the running jobs
                                        List<JobID> running;
                                        while ((running = 
cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
-                                               Thread.sleep(100);
+                                               Thread.sleep(50);
                                        }
 
                                        JobID id = running.get(0);
@@ -223,22 +223,26 @@ public class TimestampITCase {
                env.execute();
 
                // verify that all the watermarks arrived at the final custom 
operator
-               for (int i = 0; i < PARALLELISM; i++) {
+               for (List<Watermark> subtaskWatermarks : 
CustomOperator.finalWatermarks) {
+                       
                        // we are only guaranteed to see NUM_WATERMARKS / 2 
watermarks because the
                        // other source stops emitting after that
-                       for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
-                               if 
(!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + 
j))) {
+                       for (int j = 0; j < subtaskWatermarks.size(); j++) {
+                               if (subtaskWatermarks.get(j).getTimestamp() != 
initialTime + j) {
                                        System.err.println("All Watermarks: ");
                                        for (int k = 0; k <= NUM_WATERMARKS / 
2; k++) {
-                                               
System.err.println(CustomOperator.finalWatermarks[i].get(k));
+                                               
System.err.println(subtaskWatermarks.get(k));
                                        }
 
                                        fail("Wrong watermark.");
                                }
                        }
                        
-                       assertNotEquals(Watermark.MAX_WATERMARK,
-                                       
CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+                       // if there are watermarks, the final one must not be 
the MAX watermark
+                       if (subtaskWatermarks.size() > 0) {
+                               assertNotEquals(Watermark.MAX_WATERMARK,
+                                               
subtaskWatermarks.get(subtaskWatermarks.size()-1));
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index 333c01a..a073cba 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -48,6 +48,7 @@ public class UserCodeType {
                int port = Integer.parseInt(args[2]);
 
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+               env.getConfig().disableSysoutLogging();
 
                DataSet<Integer> input = env.fromElements(1,2,3,4,5);
 

Reply via email to