Special casing job exec AssertionError in TestFlinkPipelineRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af8e9887
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af8e9887
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af8e9887

Branch: refs/heads/master
Commit: af8e98878bbc8678e33a4c00548ccabf6cf55a17
Parents: 2d71af7
Author: Kenneth Knowles <[email protected]>
Authored: Fri May 6 12:49:55 2016 -0700
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri May 20 08:08:24 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/TestFlinkPipelineRunner.java | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af8e9887/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
index 24883c8..139aebf 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
+import org.apache.flink.runtime.client.JobExecutionException;
+
 public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> 
{
 
   private FlinkPipelineRunner delegate;
@@ -55,7 +57,19 @@ public class TestFlinkPipelineRunner extends 
PipelineRunner<FlinkRunnerResult> {
 
   @Override
   public FlinkRunnerResult run(Pipeline pipeline) {
-    return delegate.run(pipeline);
+    try {
+      return delegate.run(pipeline);
+    } catch (RuntimeException e) {
+      // Special case hack to pull out assertion errors from PAssert; instead 
there should
+      // probably be a better story along the lines of UserCodeException.
+      if (e.getCause() != null
+          && e.getCause() instanceof JobExecutionException
+          && e.getCause().getCause() instanceof AssertionError) {
+          throw (AssertionError) e.getCause().getCause();
+      } else {
+        throw e;
+      }
+    }
   }
 
   public PipelineOptions getPipelineOptions() {

Reply via email to