[ 
https://issues.apache.org/jira/browse/BEAM-6008?focusedWorklogId=165533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165533
 ]

ASF GitHub Bot logged work on BEAM-6008:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Nov/18 18:00
            Start Date: 13/Nov/18 18:00
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6973: [BEAM-6008] 
Propagate errors through portable runner.
URL: https://github.com/apache/beam/pull/6973
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 44c52f1500c..4a7216be463 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -19,6 +19,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Throwables.getStackTraceAsString;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -67,6 +68,7 @@ public static FlinkJobInvocation create(
   private final List<String> filesToStage;
   private JobState.Enum jobState;
   private List<Consumer<JobState.Enum>> stateObservers;
+  private List<Consumer<JobMessage>> messageObservers;
 
   @Nullable private ListenableFuture<PipelineResult> invocationFuture;
 
@@ -86,6 +88,7 @@ private FlinkJobInvocation(
     this.invocationFuture = null;
     this.jobState = JobState.Enum.STOPPED;
     this.stateObservers = new ArrayList<>();
+    this.messageObservers = new ArrayList<>();
   }
 
   private PipelineResult runPipeline() throws Exception {
@@ -155,6 +158,16 @@ public void onSuccess(@Nullable PipelineResult 
pipelineResult) {
           public void onFailure(Throwable throwable) {
             String message = String.format("Error during job invocation %s.", 
getId());
             LOG.error(message, throwable);
+            sendMessage(
+                JobMessage.newBuilder()
+                    .setMessageText(getStackTraceAsString(throwable))
+                    
.setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
+                    .build());
+            sendMessage(
+                JobMessage.newBuilder()
+                    .setMessageText(throwable.toString())
+                    
.setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
+                    .build());
             setState(JobState.Enum.FAILED);
           }
         },
@@ -205,7 +218,7 @@ public synchronized void 
addStateListener(Consumer<JobState.Enum> stateStreamObs
 
   @Override
   public synchronized void addMessageListener(Consumer<JobMessage> 
messageStreamObserver) {
-    LOG.warn("addMessageObserver() not yet implemented.");
+    messageObservers.add(messageStreamObserver);
   }
 
   private synchronized void setState(JobState.Enum state) {
@@ -215,6 +228,12 @@ private synchronized void setState(JobState.Enum state) {
     }
   }
 
+  private synchronized void sendMessage(JobMessage message) {
+    for (Consumer<JobMessage> observer : messageObservers) {
+      observer.accept(message);
+    }
+  }
+
   /** Indicates whether the given pipeline has any unbounded PCollections. */
   private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) 
{
     checkNotNull(pipeline);
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index 56799b41cd4..f02218c8af9 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -258,7 +258,12 @@ public void getStateStream(
       Function<JobState.Enum, GetJobStateResponse> responseFunction =
           state -> GetJobStateResponse.newBuilder().setState(state).build();
       Consumer<JobState.Enum> stateListener =
-          state -> responseObserver.onNext(responseFunction.apply(state));
+          state -> {
+            responseObserver.onNext(responseFunction.apply(state));
+            if (JobInvocation.isTerminated(state)) {
+              responseObserver.onCompleted();
+            }
+          };
       invocation.addStateListener(stateListener);
     } catch (Exception e) {
       String errMessage =
@@ -279,11 +284,15 @@ public void getMessageStream(
       StreamObserver<JobMessagesResponse> syncResponseObserver =
           SynchronizedStreamObserver.wrapping(responseObserver);
       Consumer<JobState.Enum> stateListener =
-          state ->
-              syncResponseObserver.onNext(
-                  JobMessagesResponse.newBuilder()
-                      
.setStateResponse(GetJobStateResponse.newBuilder().setState(state).build())
-                      .build());
+          state -> {
+            syncResponseObserver.onNext(
+                JobMessagesResponse.newBuilder()
+                    
.setStateResponse(GetJobStateResponse.newBuilder().setState(state).build())
+                    .build());
+            if (JobInvocation.isTerminated(state)) {
+              responseObserver.onCompleted();
+            }
+          };
       Consumer<JobMessage> messageListener =
           message ->
               syncResponseObserver.onNext(
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py 
b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 28f6aa2a287..c44a2bf0093 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -32,6 +32,7 @@
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability import portable_runner_test
 from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 if __name__ == '__main__':
   # Run as
@@ -109,6 +110,19 @@ def test_no_subtransform_composite(self):
     def test_pardo_timers(self):
       raise unittest.SkipTest("BEAM-4681 - User timers not yet supported.")
 
+    def test_assert_that(self):
+      # We still want to make sure asserts fail, even if the message
+      # isn't right (BEAM-6019).
+      with self.assertRaises(Exception):
+        with self.create_pipeline() as p:
+          assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+
+    def test_error_message_includes_stage(self):
+      raise unittest.SkipTest("BEAM-6019")
+
+    def test_error_traceback_includes_user_code(self):
+      raise unittest.SkipTest("BEAM-6019")
+
     # Inherits all other tests.
 
   # Run the tests.
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py 
b/sdks/python/apache_beam/runners/portability/local_job_service.py
index df4496a2c8a..5819832aa9a 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -208,6 +208,7 @@ def __init__(self,
 
   def add_state_change_callback(self, f):
     self._state_change_callbacks.append(f)
+    f(self.state)
 
   @property
   def log_queue(self):
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py 
b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 6344f4610c7..0b539af4535 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -39,6 +39,15 @@
 
 __all__ = ['PortableRunner']
 
+MESSAGE_LOG_LEVELS = {
+    beam_job_api_pb2.JobMessage.MESSAGE_IMPORTANCE_UNSPECIFIED: logging.INFO,
+    beam_job_api_pb2.JobMessage.JOB_MESSAGE_DEBUG: logging.DEBUG,
+    beam_job_api_pb2.JobMessage.JOB_MESSAGE_DETAILED: logging.DEBUG,
+    beam_job_api_pb2.JobMessage.JOB_MESSAGE_BASIC: logging.INFO,
+    beam_job_api_pb2.JobMessage.JOB_MESSAGE_WARNING: logging.WARNING,
+    beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR: logging.ERROR,
+}
+
 TERMINAL_STATES = [
     beam_job_api_pb2.JobState.DONE,
     beam_job_api_pb2.JobState.STOPPED,
@@ -208,11 +217,32 @@ def _pipeline_state_to_runner_api_state(pipeline_state):
   def metrics(self):
     return PortableMetrics()
 
+  def _last_error_message(self):
+    # Python sort is stable.
+    ordered_messages = sorted(
+        [m.message_response for m in self._messages
+         if m.HasField('message_response')],
+        key=lambda m: m.importance)
+    if ordered_messages:
+      return ordered_messages[-1].message_text
+    else:
+      return 'unknown error'
+
   def wait_until_finish(self):
 
     def read_messages():
       for message in self._job_service.GetMessageStream(
           beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
+        if message.HasField('message_response'):
+          logging.log(
+              MESSAGE_LOG_LEVELS[message.message_response.importance],
+              "%s",
+              message.message_response.message_text)
+        else:
+          logging.info(
+              "Job state changed to %s",
+              self._runner_api_state_to_pipeline_state(
+                  message.state_response.state))
         self._messages.append(message)
 
     t = threading.Thread(target=read_messages, name='wait_until_finish_read')
@@ -224,8 +254,11 @@ def read_messages():
       self._state = self._runner_api_state_to_pipeline_state(
           state_response.state)
       if state_response.state in TERMINAL_STATES:
+        # Wait for any last messages.
+        t.join(10)
         break
     if self._state != runner.PipelineState.DONE:
       raise RuntimeError(
-          'Pipeline %s failed in state %s.' % (self._job_id, self._state))
+          'Pipeline %s failed in state %s: %s' % (
+              self._job_id, self._state, self._last_error_message()))
     return self._state
diff --git 
a/sdks/python/apache_beam/runners/portability/portable_runner_test.py 
b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 68ada95ac5c..5f178799fed 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -19,7 +19,6 @@
 
 import inspect
 import logging
-import os
 import platform
 import signal
 import socket
@@ -43,8 +42,6 @@
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability.local_job_service import LocalJobServicer
 from apache_beam.runners.portability.portable_runner import PortableRunner
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
 
 
 class PortableRunnerTest(fn_api_runner_test.FnApiRunnerTest):
@@ -170,37 +167,6 @@ def get_pipeline_name():
   def create_pipeline(self):
     return beam.Pipeline(self.get_runner(), self.create_options())
 
-  def test_assert_that(self):
-    # TODO: figure out a way for runner to parse and raise the
-    # underlying exception.
-    with self.assertRaises(Exception):
-      with self.create_pipeline() as p:
-        assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
-
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test is flaky on on Python 3. '
-                   'TODO: BEAM-5692')
-  def test_error_message_includes_stage(self):
-    # TODO: figure out a way for runner to parse and raise the
-    # underlying exception.
-    with self.assertRaises(Exception):
-      with self.create_pipeline() as p:
-        def raise_error(x):
-          raise RuntimeError('x')
-        # pylint: disable=expression-not-assigned
-        (p
-         | beam.Create(['a', 'b'])
-         | 'StageA' >> beam.Map(lambda x: x)
-         | 'StageB' >> beam.Map(lambda x: x)
-         | 'StageC' >> beam.Map(raise_error)
-         | 'StageD' >> beam.Map(lambda x: x))
-
-  def test_error_traceback_includes_user_code(self):
-    # TODO: figure out a way for runner to parse and raise the
-    # underlying exception.
-    raise unittest.SkipTest('TODO')
-
   # Inherits all tests from fn_api_runner_test.FnApiRunnerTest
 
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 165533)
    Time Spent: 2.5h  (was: 2h 20m)

> Propagate errors through portable runner
> ----------------------------------------
>
>                 Key: BEAM-6008
>                 URL: https://issues.apache.org/jira/browse/BEAM-6008
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Ahmet Altay
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to