Special-case job execution AssertionError in TestFlinkPipelineRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af8e9887 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af8e9887 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af8e9887 Branch: refs/heads/master Commit: af8e98878bbc8678e33a4c00548ccabf6cf55a17 Parents: 2d71af7 Author: Kenneth Knowles <[email protected]> Authored: Fri May 6 12:49:55 2016 -0700 Committer: Aljoscha Krettek <[email protected]> Committed: Fri May 20 08:08:24 2016 +0200 ---------------------------------------------------------------------- .../beam/runners/flink/TestFlinkPipelineRunner.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af8e9887/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java index 24883c8..139aebf 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.flink.runtime.client.JobExecutionException; + public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { private FlinkPipelineRunner delegate; @@ -55,7 +57,19 @@ public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { @Override public FlinkRunnerResult run(Pipeline pipeline) { - return 
delegate.run(pipeline); + try { + return delegate.run(pipeline); + } catch (RuntimeException e) { + // Special case hack to pull out assertion errors from PAssert; instead there should + // probably be a better story along the lines of UserCodeException. + if (e.getCause() != null + && e.getCause() instanceof JobExecutionException + && e.getCause().getCause() instanceof AssertionError) { + throw (AssertionError) e.getCause().getCause(); + } else { + throw e; + } + } } public PipelineOptions getPipelineOptions() {
