This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0c195e4 [BEAM-6019] Report root cause of failure in Flink portable pipelines new aa70fac Merge pull request #7716: [BEAM-6019] Report root cause of failure in Flink portable pipelines 0c195e4 is described below commit 0c195e4ceeb78529b341dccd3c8868f79b2c5aa4 Author: Maximilian Michels <m...@apache.org> AuthorDate: Mon Feb 4 12:49:22 2019 +0100 [BEAM-6019] Report root cause of failure in Flink portable pipelines The `JobInvocation` only reported the top-most message in the stack trace. This changes the reporting to pass on the root cause instead. Unblocks several Python PVR tests which rely on exception reporting. --- .../org/apache/beam/runners/flink/FlinkJobInvocation.java | 3 ++- .../apache_beam/runners/portability/flink_runner_test.py | 14 -------------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java index d3e95dd..bd0b27f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString; import java.io.IOException; @@ -213,7 +214,7 @@ public class FlinkJobInvocation implements JobInvocation { .build()); sendMessage( JobMessage.newBuilder() - .setMessageText(throwable.toString()) + .setMessageText(getRootCause(throwable).toString()) .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR) .build()); setState(JobState.Enum.FAILED); diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 2ae23d4..958bb8e 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -35,7 +35,6 @@ from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.portability import portable_runner from apache_beam.runners.portability import portable_runner_test from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to if __name__ == '__main__': # Run as @@ -153,19 +152,6 @@ if __name__ == '__main__': def test_no_subtransform_composite(self): raise unittest.SkipTest("BEAM-4781") - def test_assert_that(self): - # We still want to make sure asserts fail, even if the message - # isn't right (BEAM-6019). - with self.assertRaises(Exception): - with self.create_pipeline() as p: - assert_that(p | beam.Create(['a', 'b']), equal_to(['a'])) - - def test_error_message_includes_stage(self): - raise unittest.SkipTest("BEAM-6019") - - def test_error_traceback_includes_user_code(self): - raise unittest.SkipTest("BEAM-6019") - def test_flattened_side_input(self): # Blocked on support for transcoding # https://jira.apache.org/jira/browse/BEAM-6523