Repository: flink
Updated Branches:
  refs/heads/master 542419ba0 -> 2558ae511


[hotfix][test] Add timeout for joining with CheckedThread


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/435d9d32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/435d9d32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/435d9d32

Branch: refs/heads/master
Commit: 435d9d320ba320b40eb328b59e32cda6cc2c531b
Parents: c6945c2
Author: Piotr Nowojski <[email protected]>
Authored: Tue Nov 28 08:41:40 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../flink/core/testutils/CheckedThread.java     | 31 +++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/435d9d32/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index 356847e..8de106d 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -34,6 +34,8 @@ public abstract class CheckedThread extends Thread {
        /** The error thrown from the main work method. */
        private volatile Throwable error;
 
+       private volatile boolean finished = false;
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -76,6 +78,9 @@ public abstract class CheckedThread extends Thread {
                catch (Throwable t) {
                        error = t;
                }
+               finally {
+                       finished = true;
+               }
        }
 
        /**
@@ -86,8 +91,23 @@ public abstract class CheckedThread extends Thread {
         * exceptions thrown from the {@link #go()} method.
         */
        public void sync() throws Exception {
-               super.join();
+               sync(0);
+       }
 
+       /**
+        * Waits with timeout until the thread is completed and checks whether 
any error
+        * occurred during the execution. In case of timeout an {@link 
Exception} is thrown.
+        *
+        * <p>This method blocks like {@link #join()}, but performs an 
additional check for
+        * exceptions thrown from the {@link #go()} method.
+        */
+       public void sync(long timeout) throws Exception {
+               join(timeout);
+               checkFinished();
+               checkError();
+       }
+
+       private void checkError() throws Exception {
                // propagate the error
                if (error != null) {
                        if (error instanceof Error) {
@@ -101,4 +121,13 @@ public abstract class CheckedThread extends Thread {
                        }
                }
        }
+
+       private void checkFinished() throws Exception {
+               if (!finished) {
+                       throw new Exception(String.format(
+                               "%s[name = %s] has not finished!",
+                               this.getClass().getSimpleName(),
+                               getName()));
+               }
+       }
 }

Reply via email to