[FLINK-3745] [runtime] Fix early stopping of stream sources
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc Branch: refs/heads/master Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992 Parents: 2728f92 Author: Stephan Ewen <[email protected]> Authored: Wed Apr 13 12:26:42 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 13 20:50:49 2016 +0200 ---------------------------------------------------------------------- .../tasks/StoppableSourceStreamTask.java | 15 +++- .../tasks/SourceStreamTaskStoppingTest.java | 94 ++++++++++++++++++++ .../runtime/tasks/SourceStreamTaskTest.java | 40 +-------- .../streaming/timestamp/TimestampITCase.java | 18 ++-- .../test/classloading/jar/UserCodeType.java | 1 + 5 files changed, 123 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java index 5173796..7ff39b7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java @@ -31,9 +31,20 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource; public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction> extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask { + private volatile boolean stopped; + @Override - public void stop() { - this.headOperator.stop(); + protected void run() throws Exception { + if (!stopped) { + super.run(); + } } + @Override + public void stop() { + stopped = true; + if (this.headOperator != null) { + this.headOperator.stop(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java new file mode 100644 index 0000000..ab9e59b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.operators.StoppableStreamSource; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * These tests verify that the RichFunction methods are called (in correct order). And that + * checkpointing/element emission don't occur concurrently. + */ +public class SourceStreamTaskStoppingTest { + + + // test flag for testStop() + static boolean stopped = false; + + @Test + public void testStop() { + final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>(); + sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource()); + + sourceTask.stop(); + + assertTrue(stopped); + } + + @Test + public void testStopBeforeInitialization() throws Exception { + + final StoppableSourceStreamTask<Object, StoppableFailingSource> sourceTask = new StoppableSourceStreamTask<>(); + sourceTask.stop(); + + sourceTask.headOperator = new StoppableStreamSource<>(new StoppableFailingSource()); + sourceTask.run(); + } + + // ------------------------------------------------------------------------ + + private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction { + private static final long serialVersionUID = 728864804042338806L; + + @Override + public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx) + throws Exception { + } + + @Override + public void cancel() {} + + @Override + public void stop() { + stopped = true; + } + } + + private static class StoppableFailingSource extends RichSourceFunction<Object> implements StoppableFunction { + private static final long serialVersionUID = 728864804042338806L; + + @Override + public void run(SourceContext<Object> ctx) throws Exception { + fail("should not be called"); + } + + @Override + public void cancel() {} + + @Override + public void stop() {} + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index bfb2d34..cb779b0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -28,12 +27,13 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.util.TestHarnessUtil; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; + import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -77,19 +77,6 @@ public class SourceStreamTaskTest { Assert.assertEquals(10, resultElements.size()); } - // test flag for testStop() - static boolean stopped = false; - - @Test - public void testStop() { - final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>(); - sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource()); - - sourceTask.stop(); - - Assert.assertTrue(stopped); - } - /** * This test ensures that the SourceStreamTask properly serializes checkpointing * and element emission. This also verifies that there are no concurrent invocations @@ -155,24 +142,7 @@ public class SourceStreamTaskTest { } } - private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction { - private static final long serialVersionUID = 728864804042338806L; - - @Override - public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx) - throws Exception { - } - - @Override - public void cancel() {} - - @Override - public void stop() { - stopped = true; - } - } - - private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed { + private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> { private static final long serialVersionUID = 1; private int maxElements; @@ -240,9 +210,7 @@ public class SourceStreamTaskTest { } @Override - public void restoreState(Serializable state) { - - } + public void restoreState(Serializable state) {} } /** http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index 1a59ab3..d857672 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -203,7 +203,7 @@ public class TimestampITCase { // try until we get the running jobs List<JobID> running; while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) { - Thread.sleep(100); + Thread.sleep(50); } JobID id = running.get(0); @@ -223,22 +223,26 @@ public class TimestampITCase { env.execute(); // verify that all the watermarks arrived at the final custom operator - for (int i = 0; i < PARALLELISM; i++) { + for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) { + // we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the // other source stops emitting after that - for (int j = 0; j < NUM_WATERMARKS / 2; j++) { - if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) { + for (int j = 0; j < subtaskWatermarks.size(); j++) { + if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) { System.err.println("All Watermarks: "); for (int k = 0; k <= NUM_WATERMARKS / 2; k++) { - System.err.println(CustomOperator.finalWatermarks[i].get(k)); + System.err.println(subtaskWatermarks.get(k)); } fail("Wrong watermark."); } } - assertNotEquals(Watermark.MAX_WATERMARK, - CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1)); + // if there are watermarks, the final one must not be the MAX watermark + if (subtaskWatermarks.size() > 0) { + assertNotEquals(Watermark.MAX_WATERMARK, + subtaskWatermarks.get(subtaskWatermarks.size()-1)); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java index 333c01a..a073cba 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java @@ -48,6 +48,7 @@ public class UserCodeType { int port = Integer.parseInt(args[2]); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile); + env.getConfig().disableSysoutLogging(); DataSet<Integer> input = env.fromElements(1,2,3,4,5);
