Repository: flink
Updated Branches:
  refs/heads/master 7a5189525 -> ddba618d9


[FLINK-5037] Fixed instability in AbstractUdfStreamOperatorLifecycleTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddba618d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddba618d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddba618d

Branch: refs/heads/master
Commit: ddba618d9b9be9bab3da2544eabb2a9975bc8d9c
Parents: 7a51895
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed Nov 9 11:19:30 2016 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Nov 9 15:47:18 2016 +0100

----------------------------------------------------------------------
 .../AbstractUdfStreamOperatorLifecycleTest.java | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ddba618d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index cbb833b..965aec6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -231,19 +231,15 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                                testCheckpointer = new Thread() {
                                        @Override
                                        public void run() {
-                                               long id = 0;
-                                               while (true) {
-                                                       try {
-                                                               Thread.sleep(50);
-                                                               if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
-                                                                               new CheckpointMetaData(id++, System.currentTimeMillis()))) {
-                                                                       LifecycleTrackingStreamSource.runFinish.trigger();
-                                                                       break;
-                                                               }
-                                                       } catch (Exception e) {
-                                                               e.printStackTrace();
-                                                               Assert.fail();
+                                               try {
+                                                       runStarted.await();
+                                                       if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
+                                                                       new CheckpointMetaData(0, System.currentTimeMillis()))) {
+                                                               LifecycleTrackingStreamSource.runFinish.trigger();
                                                        }
+                                               } catch (Exception e) {
+                                                       e.printStackTrace();
+                                                       Assert.fail();
                                                }
                                        }
                                };

Reply via email to