[FLINK-4569][tests] Respect exceptions thrown in thread in JobRetrievalITCase
This closes #5689.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b1b7f38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b1b7f38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b1b7f38

Branch: refs/heads/release-1.5
Commit: 9b1b7f385f11745544eab21f0383ca3f73e63814
Parents: 2c850d1
Author: zentol <[email protected]>
Authored: Tue Mar 13 13:00:47 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 14 20:47:52 2018 +0100

----------------------------------------------------------------------
 .../flink/test/example/client/JobRetrievalITCase.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9b1b7f38/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index d34b6c33..57198c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -42,6 +42,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 
 import scala.collection.Seq;
 
@@ -85,6 +86,7 @@ public class JobRetrievalITCase extends TestLogger {
         // has been attached in resumingThread
         lock.acquire();
         client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
+        final AtomicReference<Throwable> error = new AtomicReference<>();
 
         final Thread resumingThread = new Thread(new Runnable() {
             @Override
@@ -92,10 +94,10 @@ public class JobRetrievalITCase extends TestLogger {
                 try {
                     assertNotNull(client.retrieveJob(jobID));
                 } catch (Throwable e) {
-                    fail(e.getMessage());
+                    error.set(e);
                 }
             }
-        });
+        }, "Flink-Job-Retriever");
 
         final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
         final ActorSystem actorSystem = actorSystemSeq.last();
@@ -119,6 +121,11 @@ public class JobRetrievalITCase extends TestLogger {
         lock.release();
 
         resumingThread.join();
+
+        Throwable exception = error.get();
+        if (exception != null) {
+            throw new AssertionError(exception);
+        }
     }
 
     @Test
@@ -148,6 +155,7 @@ public class JobRetrievalITCase extends TestLogger {
         @Override
         public void invoke() throws Exception {
             lock.acquire();
+            lock.release();
         }
     }
 
