This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 7ac0449 Clean up Python Portability local_job_service. new 0842f4c Merge pull request #7433 from [BEAM-6280] Refactors Python runner 7ac0449 is described below commit 7ac0449804e72379f75f4bd363a0fbdd80935311 Author: Sam Rohde <rohde.sam...@gmail.com> AuthorDate: Mon Jan 7 14:47:40 2019 -0800 Clean up Python Portability local_job_service. pr changes convert to api response in service change back to queue implementation preparation_id != job_id address robert's comments fail fast in get streams remove previous logic for stream and added TODO --- .../runners/portability/local_job_service.py | 126 +++++++++++---------- .../runners/portability/portable_runner.py | 47 +++++--- 2 files changed, 95 insertions(+), 78 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py index 73c017d..060908d 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service.py @@ -45,12 +45,10 @@ TERMINAL_STATES = [ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer): - """ + """Manages one or more pipelines, possibly concurrently. Experimental: No backward compatibility guaranteed. Servicer for the Beam Job API. - Manages one or more pipelines, possibly concurrently. - This JobService uses a basic local implementation of runner to run the job. This JobService is not capable of managing job on remote clusters. @@ -101,43 +99,29 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer): state=self._jobs[request.job_id].state) def GetStateStream(self, request, context=None): + """Yields state transitions since the stream started. + """ + if request.job_id not in self._jobs: + raise LookupError("Job {} does not exist".format(request.job_id)) + job = self._jobs[request.job_id] - state_queue = queue.Queue() - job.add_state_change_callback(state_queue.put) - try: - current_state = state_queue.get() - except queue.Empty: - current_state = job.state - yield beam_job_api_pb2.GetJobStateResponse(state=current_state) - while current_state not in TERMINAL_STATES: - current_state = state_queue.get(block=True) - yield beam_job_api_pb2.GetJobStateResponse(state=current_state) + for state in job.get_state_stream(): + yield beam_job_api_pb2.GetJobStateResponse(state=state) def GetMessageStream(self, request, context=None): + """Yields messages since the stream started. + """ + if request.job_id not in self._jobs: + raise LookupError("Job {} does not exist".format(request.job_id)) + job = self._jobs[request.job_id] - log_queue = queue.Queue() - if job._last_log_message: - # This is likely to contain important information, like errors for - # an already failed job. - # TODO: Decide on proper semantics for the message stream of a - # long-running or completed job. - yield job._last_log_message - job.add_log_callback(log_queue.put) - job.add_state_change_callback(lambda state: log_queue.put( - beam_job_api_pb2.JobMessagesResponse( - state_response=beam_job_api_pb2.GetJobStateResponse( - state=state)))) - current_state = job.state - while current_state not in TERMINAL_STATES: - msg = log_queue.get(block=True) - yield msg - if msg.HasField('state_response'): - current_state = msg.state_response.state - try: - while True: - yield log_queue.get(block=False) - except queue.Empty: - pass + for msg in job.get_message_stream(): + if isinstance(msg, int): + resp = beam_job_api_pb2.JobMessagesResponse( + state_response=beam_job_api_pb2.GetJobStateResponse(state=msg)) + else: + resp = beam_job_api_pb2.JobMessagesResponse(message_response=msg) + yield resp class SubprocessSdkWorker(object): @@ -194,38 +178,31 @@ class BeamJob(threading.Thread): self._pipeline_options = pipeline_options self._pipeline_proto = pipeline_proto self._state = None - self._state_change_callbacks = [] - self._last_log_message = None - self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)] + self._state_queues = [] + self._log_queues = [] self.state = beam_job_api_pb2.JobState.STARTING self.daemon = True - def add_state_change_callback(self, f): - self._state_change_callbacks.append(f) - f(self.state) - - def add_log_callback(self, f): - self._log_callbacks.append(f) - @property def state(self): return self._state @state.setter def state(self, new_state): - for state_change_callback in self._state_change_callbacks: - state_change_callback(new_state) + # Inform consumers of the new state. + for queue in self._state_queues: + queue.put(new_state) self._state = new_state def run(self): - with JobLogHandler(self._log_callbacks): + with JobLogHandler(self._log_queues): try: fn_api_runner.FnApiRunner().run_via_runner_api(self._pipeline_proto) logging.info('Successfully completed job.') self.state = beam_job_api_pb2.JobState.DONE except: # pylint: disable=bare-except logging.exception('Error running pipeline.') - traceback.print_exc() + logging.exception(traceback) self.state = beam_job_api_pb2.JobState.FAILED raise @@ -235,6 +212,32 @@ class BeamJob(threading.Thread): # TODO(robertwb): Actually cancel... self.state = beam_job_api_pb2.JobState.CANCELLED + def get_state_stream(self): + # Register for any new state changes. + state_queue = queue.Queue() + self._state_queues.append(state_queue) + + yield self.state + while True: + current_state = state_queue.get(block=True) + yield current_state + if current_state in TERMINAL_STATES: + break + + def get_message_stream(self): + # Register for any new messages. + log_queue = queue.Queue() + self._log_queues.append(log_queue) + self._state_queues.append(log_queue) + + current_state = self.state + yield current_state + while current_state not in TERMINAL_STATES: + msg = log_queue.get(block=True) + yield msg + if isinstance(msg, int): + current_state = msg + class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer): @@ -260,11 +263,11 @@ class JobLogHandler(logging.Handler): logging.DEBUG: beam_job_api_pb2.JobMessage.JOB_MESSAGE_DEBUG, } - def __init__(self, log_callbacks): + def __init__(self, log_queues): super(JobLogHandler, self).__init__() self._last_id = 0 self._logged_thread = None - self._log_callbacks = log_callbacks + self._log_queues = log_queues def __enter__(self): # Remember the current thread to demultiplex the logs of concurrently @@ -282,12 +285,13 @@ class JobLogHandler(logging.Handler): def emit(self, record): if self._logged_thread is threading.current_thread(): - msg = beam_job_api_pb2.JobMessagesResponse( - message_response=beam_job_api_pb2.JobMessage( - message_id=self._next_id(), - time=time.strftime('%Y-%m-%d %H:%M:%S.', - time.localtime(record.created)), - importance=self.LOG_LEVEL_MAP[record.levelno], - message_text=self.format(record))) - for callback in self._log_callbacks: - callback(msg) + msg = beam_job_api_pb2.JobMessage( + message_id=self._next_id(), + time=time.strftime('%Y-%m-%d %H:%M:%S.', + time.localtime(record.created)), + importance=self.LOG_LEVEL_MAP[record.levelno], + message_text=self.format(record)) + + # Inform all message consumers. + for queue in self._log_queues: + queue.put(msg) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index d2bf31b..4683ed2 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -44,7 +44,6 @@ from apache_beam.runners.portability import portable_stager from apache_beam.runners.portability.job_server import DockerizedJobServer from apache_beam.runners.worker import sdk_worker from apache_beam.runners.worker import sdk_worker_main -from apache_beam.runners.worker.channel_factory import GRPCChannelFactory __all__ = ['PortableRunner'] @@ -189,7 +188,7 @@ class PortableRunner(runner.PipelineRunner): for k, v in options.get_all_options().items() if v is not None} - channel = GRPCChannelFactory.insecure_channel(job_endpoint) + channel = grpc.insecure_channel(job_endpoint) grpc.channel_ready_future(channel).result() job_service = beam_job_api_pb2_grpc.JobServiceStub(channel) @@ -213,19 +212,31 @@ class PortableRunner(runner.PipelineRunner): prepare_response = send_prepare_request() if prepare_response.artifact_staging_endpoint.url: stager = portable_stager.PortableStager( - GRPCChannelFactory.insecure_channel( - prepare_response.artifact_staging_endpoint.url), + grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url), prepare_response.staging_session_token) retrieval_token, _ = stager.stage_job_resources( options, staging_location='') else: retrieval_token = None + + # Run the job and wait for a result. run_response = job_service.Run( beam_job_api_pb2.RunJobRequest( preparation_id=prepare_response.preparation_id, retrieval_token=retrieval_token)) - return PipelineResult(job_service, run_response.job_id, cleanup_callbacks) + + # TODO(BEAM-6442): remove preparation_id and move getting streams to before + # starting the job. + state_stream = job_service.GetStateStream( + beam_job_api_pb2.GetJobStateRequest( + job_id=run_response.job_id)) + message_stream = job_service.GetMessageStream( + beam_job_api_pb2.JobMessagesRequest( + job_id=run_response.job_id)) + + return PipelineResult(job_service, run_response.job_id, message_stream, + state_stream, cleanup_callbacks) class PortableMetrics(metrics.metric.MetricResults): @@ -240,11 +251,14 @@ class PortableMetrics(metrics.metric.MetricResults): class PipelineResult(runner.PipelineResult): - def __init__(self, job_service, job_id, cleanup_callbacks=()): + def __init__(self, job_service, job_id, message_stream, state_stream, + cleanup_callbacks=()): super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNSPECIFIED) self._job_service = job_service self._job_id = job_id self._messages = [] + self._message_stream = message_stream + self._state_stream = state_stream self._cleanup_callbacks = cleanup_callbacks def cancel(self): @@ -274,21 +288,21 @@ class PipelineResult(runner.PipelineResult): return PortableMetrics() def _last_error_message(self): - # Python sort is stable. - ordered_messages = sorted( - [m.message_response for m in self._messages - if m.HasField('message_response')], - key=lambda m: m.importance) - if ordered_messages: - return ordered_messages[-1].message_text + # Filter only messages with the "message_response" and error messages. + messages = [m.message_response for m in self._messages + if m.HasField('message_response')] + error_messages = [m for m in messages + if m.importance == + beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR] + if error_messages: + return error_messages[-1].message_text else: return 'unknown error' def wait_until_finish(self): def read_messages(): - for message in self._job_service.GetMessageStream( - beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)): + for message in self._message_stream: if message.HasField('message_response'): logging.log( MESSAGE_LOG_LEVELS[message.message_response.importance], @@ -306,8 +320,7 @@ class PipelineResult(runner.PipelineResult): t.start() try: - for state_response in self._job_service.GetStateStream( - beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)): + for state_response in self._state_stream: self._state = self._runner_api_state_to_pipeline_state( state_response.state) if state_response.state in TERMINAL_STATES: