Repository: beam Updated Branches: refs/heads/master ef6a5008a -> b672cde11
Simplify and improve exception unwrapping in TestFlinkRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76b7991f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76b7991f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76b7991f Branch: refs/heads/master Commit: 76b7991f102e8e724686042be8266778d4eef44f Parents: ef6a500 Author: Eugene Kirpichov <[email protected]> Authored: Tue Mar 14 18:23:19 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Mar 17 09:54:11 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/flink/TestFlinkRunner.java | 39 +++++++------------- 1 file changed, 13 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/76b7991f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java index ef56b55..8f50105 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java @@ -56,36 +56,23 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> { public PipelineResult run(Pipeline pipeline) { try { return delegate.run(pipeline); - } catch (Throwable e) { + } catch (Throwable t) { // Special case hack to pull out assertion errors from PAssert; instead there should // probably be a better story along the lines of UserCodeException. - Throwable cause = e; - Throwable oldCause = e; - PipelineExecutionException executionException = null; - do { - - // find UserCodeException and throw PipelineExecutionException - if (cause instanceof UserCodeException) { - executionException = new PipelineExecutionException(cause.getCause()); - } - - if (cause.getCause() == null) { - break; - } - - oldCause = cause; - cause = cause.getCause(); - - } while (!oldCause.equals(cause)); - if (cause instanceof AssertionError) { - throw (AssertionError) cause; - } else { - if (executionException != null) { - throw executionException; - } else { - throw e; + UserCodeException innermostUserCodeException = null; + Throwable current = t; + for (; current.getCause() != null; current = current.getCause()) { + if (current instanceof UserCodeException) { + innermostUserCodeException = ((UserCodeException) current); } } + if (innermostUserCodeException != null) { + current = innermostUserCodeException.getCause(); + } + if (current instanceof AssertionError) { + throw (AssertionError) current; + } + throw new PipelineExecutionException(current); } }
