Repository: flink Updated Branches: refs/heads/master 542419ba0 -> 2558ae511
[hotfix][test] Add timeout for joining with CheckedThread Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/435d9d32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/435d9d32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/435d9d32 Branch: refs/heads/master Commit: 435d9d320ba320b40eb328b59e32cda6cc2c531b Parents: c6945c2 Author: Piotr Nowojski <[email protected]> Authored: Tue Nov 28 08:41:40 2017 +0100 Committer: Stefan Richter <[email protected]> Committed: Mon Jan 8 11:46:00 2018 +0100 ---------------------------------------------------------------------- .../flink/core/testutils/CheckedThread.java | 31 +++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/435d9d32/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java index 356847e..8de106d 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java @@ -34,6 +34,8 @@ public abstract class CheckedThread extends Thread { /** The error thrown from the main work method. */ private volatile Throwable error; + private volatile boolean finished = false; + // ------------------------------------------------------------------------ /** @@ -76,6 +78,9 @@ public abstract class CheckedThread extends Thread { catch (Throwable t) { error = t; } + finally { + finished = true; + } } /** @@ -86,8 +91,23 @@ public abstract class CheckedThread extends Thread { * exceptions thrown from the {@link #go()} method. */ public void sync() throws Exception { - super.join(); + sync(0); + } + /** + * Waits with timeout until the thread is completed and checks whether any error + * occurred during the execution. In case of timeout an {@link Exception} is thrown. + * + * <p>This method blocks like {@link #join()}, but performs an additional check for + * exceptions thrown from the {@link #go()} method. + */ + public void sync(long timeout) throws Exception { + join(timeout); + checkFinished(); + checkError(); + } + + private void checkError() throws Exception { // propagate the error if (error != null) { if (error instanceof Error) { @@ -101,4 +121,13 @@ public abstract class CheckedThread extends Thread { } } } + + private void checkFinished() throws Exception { + if (!finished) { + throw new Exception(String.format( + "%s[name = %s] has not finished!", + this.getClass().getSimpleName(), + getName())); + } + } }
