Repository: flink Updated Branches: refs/heads/master 7a5189525 -> ddba618d9
[FLINK-5037] Fixed instability in AbstractUdfStreamOperatorLifecycleTest Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddba618d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddba618d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddba618d Branch: refs/heads/master Commit: ddba618d9b9be9bab3da2544eabb2a9975bc8d9c Parents: 7a51895 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Wed Nov 9 11:19:30 2016 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Nov 9 15:47:18 2016 +0100 ---------------------------------------------------------------------- .../AbstractUdfStreamOperatorLifecycleTest.java | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ddba618d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index cbb833b..965aec6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -231,19 +231,15 @@ public class AbstractUdfStreamOperatorLifecycleTest { testCheckpointer = new Thread() { @Override public void run() { - long id = 0; - while (true) { - try { - Thread.sleep(50); - if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint( - new CheckpointMetaData(id++, System.currentTimeMillis()))) { - LifecycleTrackingStreamSource.runFinish.trigger(); - break; - } - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); + try { + runStarted.await(); + if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint( + new CheckpointMetaData(0, System.currentTimeMillis()))) { + LifecycleTrackingStreamSource.runFinish.trigger(); } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); } } };