jstorm-runner: Throw AssertionError instead of RuntimeException when the pipeline encounters an exception
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af5221c0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af5221c0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af5221c0 Branch: refs/heads/jstorm-runner Commit: af5221c001678e36de6492fa20b3fc4026f486e8 Parents: dc6f63c Author: basti.lj <[email protected]> Authored: Tue Jul 18 14:50:19 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:58 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/TestJStormRunner.java | 41 ++++++++++---------- 1 file changed, 21 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/af5221c0/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index a117675..0088cf9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -47,6 +47,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { @Override public JStormRunnerResult run(Pipeline pipeline) { + TaskReportErrorAndDie.setExceptionRecord(null); JStormRunnerResult result = stormRunner.run(pipeline); try { @@ -54,30 +55,30 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions); - if (numberOfAssertions == 0) { - // If assert number is zero, wait 5 sec - JStormUtils.sleepMs(5000); + + int maxTimeoutSec = numberOfAssertions > 0 ? 
20 : 5; + for (int waitTime = 0; waitTime <= maxTimeoutSec * 1000; ) { + Optional<Boolean> success = numberOfAssertions > 0 + ? checkForPAssertSuccess(numberOfAssertions) : Optional.<Boolean>absent(); Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); - if (taskExceptionRec != null) { - throw new RuntimeException(taskExceptionRec.getCause()); - } - return result; - } else { - for (int i = 0; i < 40; ++i) { - Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions); - Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); - if (success.isPresent() && success.get()) { - return result; - } else if (success.isPresent() && !success.get()) { - throw new AssertionError("Failed assertion checks."); - } else if (taskExceptionRec != null) { - throw new RuntimeException(taskExceptionRec.getCause()); - } else { - JStormUtils.sleepMs(500); - } + if (success.isPresent() && success.get()) { + return result; + } else if (success.isPresent() && !success.get()) { + throw new AssertionError("Failed assertion checks."); + } else if (taskExceptionRec != null) { + LOG.info("Exception was found.", taskExceptionRec); + throw new AssertionError(taskExceptionRec.getCause()); + } else { + JStormUtils.sleepMs(500); + waitTime += 500; } + } + + if (numberOfAssertions > 0) { LOG.info("Assertion checks timed out."); throw new AssertionError("Assertion checks timed out."); + } else { + return result; } } finally { clearPAssertCount();
