Repository: beam Updated Branches: refs/heads/jstorm-runner 7a28bf1af -> 80bd7f8be
jstorm-runner: handle UserCodeException in TestJStormRunner, and wraps in PipelineExecutionException if receives a checked Exception. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34bf5af9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34bf5af9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34bf5af9 Branch: refs/heads/jstorm-runner Commit: 34bf5af9b19c2fb90f2301823b698d06f21879a1 Parents: 7a28bf1 Author: Pei He <[email protected]> Authored: Mon Sep 4 15:02:14 2017 +0800 Committer: Pei He <[email protected]> Committed: Thu Sep 7 14:12:51 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/TestJStormRunner.java | 32 +++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/34bf5af9/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index 9d2e2f1..b637b7c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.util.UserCodeException; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,23 +85,23 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { result.getTopologyName(), numberOfAssertions); if (numberOfAssertions == 0) { result.waitUntilFinish(Duration.millis(RESULT_WAITING_TIME_MS)); - Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); + Throwable taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); if (taskExceptionRec != null) { LOG.info("Exception was found.", taskExceptionRec); - throw new RuntimeException(taskExceptionRec.getCause()); + handleTaskException(taskExceptionRec); } return result; } else { for (int waitTime = 0; waitTime <= ASSERTION_WAITING_TIME_MS;) { Optional<Boolean> success = checkForPAssertSuccess(result.metrics(), numberOfAssertions); - Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); + Throwable taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); if (success.isPresent() && success.get()) { return result; } else if (success.isPresent() && !success.get()) { throw new AssertionError("Failed assertion checks."); } else if (taskExceptionRec != null) { LOG.info("Exception was found.", taskExceptionRec); - throw new RuntimeException(taskExceptionRec.getCause()); + handleTaskException(taskExceptionRec); } else { JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS); waitTime += RESULT_CHECK_INTERVAL_MS; @@ -116,6 +117,29 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { } } + private void handleTaskException(Throwable taskExceptionRec) { + Throwable cause; + if (taskExceptionRec.getCause() != null) { + cause = taskExceptionRec.getCause(); + } else { + cause = taskExceptionRec; + } + + UserCodeException innermostUserCodeException = null; + for (Throwable current = cause; current.getCause() != null; current = current.getCause()) { + if (current instanceof UserCodeException) { + innermostUserCodeException = ((UserCodeException) current); + } + } + if (innermostUserCodeException != null) { + cause = innermostUserCodeException.getCause(); + } + if (cause instanceof AssertionError) { + throw (AssertionError) cause; + } + throw new Pipeline.PipelineExecutionException(cause); + } + private Optional<Boolean> checkForPAssertSuccess( MetricResults metricResults, int expectedNumberOfAssertions) {
