[FLINK-4569][tests] Respect exceptions thrown in thread in JobRetrievalITCase

This closes #5689.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b1b7f38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b1b7f38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b1b7f38

Branch: refs/heads/release-1.5
Commit: 9b1b7f385f11745544eab21f0383ca3f73e63814
Parents: 2c850d1
Author: zentol <[email protected]>
Authored: Tue Mar 13 13:00:47 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 14 20:47:52 2018 +0100

----------------------------------------------------------------------
 .../flink/test/example/client/JobRetrievalITCase.java   | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b1b7f38/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index d34b6c33..57198c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -42,6 +42,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 
 import scala.collection.Seq;
 
@@ -85,6 +86,7 @@ public class JobRetrievalITCase extends TestLogger {
                // has been attached in resumingThread
                lock.acquire();
                client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
+               final AtomicReference<Throwable> error = new AtomicReference<>();
 
                final Thread resumingThread = new Thread(new Runnable() {
                        @Override
@@ -92,10 +94,10 @@ public class JobRetrievalITCase extends TestLogger {
                                try {
                                        assertNotNull(client.retrieveJob(jobID));
                                } catch (Throwable e) {
-                                       fail(e.getMessage());
+                                       error.set(e);
                                }
                        }
-               });
+               }, "Flink-Job-Retriever");
 
                final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
                final ActorSystem actorSystem = actorSystemSeq.last();
@@ -119,6 +121,11 @@ public class JobRetrievalITCase extends TestLogger {
                lock.release();
 
                resumingThread.join();
+
+               Throwable exception = error.get();
+               if (exception != null) {
+                       throw new AssertionError(exception);
+               }
        }
 
        @Test
@@ -148,6 +155,7 @@ public class JobRetrievalITCase extends TestLogger {
                @Override
                public void invoke() throws Exception {
                        lock.acquire();
+                       lock.release();
                }
        }
 

Reply via email to