Repository: beam
Updated Branches:
  refs/heads/master ef6a5008a -> b672cde11


Simplify and improve exception unwrapping in TestFlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76b7991f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76b7991f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76b7991f

Branch: refs/heads/master
Commit: 76b7991f102e8e724686042be8266778d4eef44f
Parents: ef6a500
Author: Eugene Kirpichov <[email protected]>
Authored: Tue Mar 14 18:23:19 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Fri Mar 17 09:54:11 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/TestFlinkRunner.java     | 39 +++++++-------------
 1 file changed, 13 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/76b7991f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index ef56b55..8f50105 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -56,36 +56,23 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
   public PipelineResult run(Pipeline pipeline) {
     try {
       return delegate.run(pipeline);
-    } catch (Throwable e) {
+    } catch (Throwable t) {
       // Special case hack to pull out assertion errors from PAssert; instead there should
       // probably be a better story along the lines of UserCodeException.
-      Throwable cause = e;
-      Throwable oldCause = e;
-      PipelineExecutionException executionException = null;
-      do {
-
-        // find UserCodeException and throw PipelineExecutionException
-        if (cause instanceof UserCodeException) {
-          executionException = new PipelineExecutionException(cause.getCause());
-        }
-
-        if (cause.getCause() == null) {
-          break;
-        }
-
-        oldCause = cause;
-        cause = cause.getCause();
-
-      } while (!oldCause.equals(cause));
-      if (cause instanceof AssertionError) {
-        throw (AssertionError) cause;
-      } else {
-        if (executionException != null) {
-          throw executionException;
-        } else {
-          throw e;
+      UserCodeException innermostUserCodeException = null;
+      Throwable current = t;
+      for (; current.getCause() != null; current = current.getCause()) {
+        if (current instanceof UserCodeException) {
+          innermostUserCodeException = ((UserCodeException) current);
         }
       }
+      if (innermostUserCodeException != null) {
+        current = innermostUserCodeException.getCause();
+      }
+      if (current instanceof AssertionError) {
+        throw (AssertionError) current;
+      }
+      throw new PipelineExecutionException(current);
     }
   }
 

Reply via email to