This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c195e4  [BEAM-6019] Report root cause of failure in Flink portable pipelines
     new aa70fac  Merge pull request #7716: [BEAM-6019] Report root cause of failure in Flink portable pipelines
0c195e4 is described below

commit 0c195e4ceeb78529b341dccd3c8868f79b2c5aa4
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Mon Feb 4 12:49:22 2019 +0100

    [BEAM-6019] Report root cause of failure in Flink portable pipelines
    
    The `JobInvocation` only reported the top-most message in the stack trace.
    This changes the reporting to pass on the root cause instead.
    
    Unblocks several Python PVR tests which rely on exception reporting.
---
 .../org/apache/beam/runners/flink/FlinkJobInvocation.java  |  3 ++-
 .../apache_beam/runners/portability/flink_runner_test.py   | 14 --------------
 2 files changed, 2 insertions(+), 15 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index d3e95dd..bd0b27f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink;
 
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString;
 
 import java.io.IOException;
@@ -213,7 +214,7 @@ public class FlinkJobInvocation implements JobInvocation {
                     .build());
             sendMessage(
                 JobMessage.newBuilder()
-                    .setMessageText(throwable.toString())
+                    .setMessageText(getRootCause(throwable).toString())
                     
.setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
                     .build());
             setState(JobState.Enum.FAILED);
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 2ae23d4..958bb8e 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -35,7 +35,6 @@ from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability import portable_runner_test
 from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
 
 if __name__ == '__main__':
   # Run as
@@ -153,19 +152,6 @@ if __name__ == '__main__':
     def test_no_subtransform_composite(self):
       raise unittest.SkipTest("BEAM-4781")
 
-    def test_assert_that(self):
-      # We still want to make sure asserts fail, even if the message
-      # isn't right (BEAM-6019).
-      with self.assertRaises(Exception):
-        with self.create_pipeline() as p:
-          assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
-
-    def test_error_message_includes_stage(self):
-      raise unittest.SkipTest("BEAM-6019")
-
-    def test_error_traceback_includes_user_code(self):
-      raise unittest.SkipTest("BEAM-6019")
-
     def test_flattened_side_input(self):
       # Blocked on support for transcoding
       # https://jira.apache.org/jira/browse/BEAM-6523

Reply via email to