Repository: beam Updated Branches: refs/heads/master 9ed2cf41f -> fb85d84dc
[BEAM-2431] Add experimental python rpc direct runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76db0aa3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76db0aa3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76db0aa3 Branch: refs/heads/master Commit: 76db0aa30c632296a6a882c012f9da2d21f775b5 Parents: 9ed2cf4 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Wed Aug 2 10:49:48 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Tue Aug 8 23:12:30 2017 -0700 ---------------------------------------------------------------------- .../runners/experimental/__init__.py | 16 +++ .../experimental/python_rpc_direct/__init__.py | 22 ++++ .../python_rpc_direct_runner.py | 111 +++++++++++++++++++ .../experimental/python_rpc_direct/server.py | 111 +++++++++++++++++++ sdks/python/apache_beam/runners/job/__init__.py | 16 +++ sdks/python/apache_beam/runners/job/manager.py | 52 +++++++++ sdks/python/apache_beam/runners/job/utils.py | 32 ++++++ sdks/python/apache_beam/runners/runner.py | 6 + 8 files changed, 366 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py b/sdks/python/apache_beam/runners/experimental/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/runners/experimental/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py new file mode 100644 index 0000000..5d14030 --- /dev/null +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""This is the experimental direct runner for testing the job api that +sends a runner API proto over the API and then runs it on the other side. +""" + +from apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner import PythonRPCDirectRunner http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py new file mode 100644 index 0000000..247ce1f --- /dev/null +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A runner implementation that submits a job for remote execution. 
+""" +import logging +import random +import string + +import grpc + +from apache_beam.portability.api import beam_job_api_pb2 +from apache_beam.runners.job import utils as job_utils +from apache_beam.runners.job.manager import DockerRPCManager +from apache_beam.runners.runner import PipelineResult +from apache_beam.runners.runner import PipelineRunner + + +__all__ = ['PythonRPCDirectRunner'] + + +class PythonRPCDirectRunner(PipelineRunner): + """Runs a pipeline locally by submitting it to a Job API co-process.""" + + # A list of PTransformOverride objects to be applied before running a pipeline + # using PythonRPCDirectRunner. + # Currently this only works for overrides where the input and output types do + # not change. + # For internal SDK use only. This should not be updated by Beam pipeline + # authors. + _PTRANSFORM_OVERRIDES = [] + + def __init__(self): + self._cache = None + + def run(self, pipeline): + """Submits the pipeline to the job service co-process for execution.""" + + # Performing configured PTransform overrides.
+ pipeline.replace_all(PythonRPCDirectRunner._PTRANSFORM_OVERRIDES) + + # Start the RPC co-process + manager = DockerRPCManager() + + # Submit the job to the RPC co-process + jobName = ('Job-' + + ''.join(random.choice(string.ascii_uppercase) for _ in range(6))) + options = {k: v for k, v in pipeline._options.get_all_options().iteritems() + if v is not None} + + try: + response = manager.service.run(beam_job_api_pb2.SubmitJobRequest( + pipeline=pipeline.to_runner_api(), + pipelineOptions=job_utils.dict_to_struct(options), + jobName=jobName)) + + logging.info('Submitted a job with id: %s', response.jobId) + + # Return the result object that references the manager instance + result = PythonRPCDirectPipelineResult(response.jobId, manager) + return result + except grpc.RpcError: + logging.error('Failed to run the job with name: %s', jobName) + raise + + +class PythonRPCDirectPipelineResult(PipelineResult): + """Represents the state of a pipeline run on the RPC job service.""" + + def __init__(self, job_id, job_manager): + self.job_id = job_id + self.manager = job_manager + + @property + def state(self): + return self.manager.service.getState( + beam_job_api_pb2.GetJobStateRequest(jobId=self.job_id)) + + def wait_until_finish(self): + messages_request = beam_job_api_pb2.JobMessagesRequest(jobId=self.job_id) + for message in self.manager.service.getMessageStream(messages_request): + if message.HasField('stateResponse'): + logging.info( + 'Current state of job: %s', + beam_job_api_pb2.JobState.JobStateType.Name( + message.stateResponse.state)) + else: + logging.info('Message %s', message.messageResponse) + logging.info('Job with id: %s in terminal state now.', self.job_id) + + def cancel(self): + return self.manager.service.cancel( + beam_job_api_pb2.CancelJobRequest(jobId=self.job_id)) + + def metrics(self): + raise NotImplementedError http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py new file mode 100644 index 0000000..3addf92 --- /dev/null +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A gRPC Job API server that runs submitted pipelines on the DirectRunner.
+""" +from concurrent import futures +import time +import uuid + +import grpc + +from apache_beam.portability.api import beam_job_api_pb2 +from apache_beam.portability.api import beam_job_api_pb2_grpc +from apache_beam.pipeline import Pipeline +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.runners.runner import PipelineState + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + + +class JobService(beam_job_api_pb2_grpc.JobServiceServicer): + + def __init__(self): + self.jobs = {} + + def run(self, request, context): + job_id = uuid.uuid4().get_hex() + pipeline_result = Pipeline.from_runner_api( + request.pipeline, + 'DirectRunner', + PipelineOptions()).run() + self.jobs[job_id] = pipeline_result + return beam_job_api_pb2.SubmitJobResponse(jobId=job_id) + + def getState(self, request, context): + pipeline_result = self.jobs[request.jobId] + return beam_job_api_pb2.GetJobStateResponse( + state=self._map_state_to_jobState(pipeline_result.state)) + + def cancel(self, request, context): + pipeline_result = self.jobs[request.jobId] + pipeline_result.cancel() + return beam_job_api_pb2.CancelJobResponse( + state=self._map_state_to_jobState(pipeline_result.state)) + + def getMessageStream(self, request, context): + pipeline_result = self.jobs[request.jobId] + pipeline_result.wait_until_finish() + yield beam_job_api_pb2.JobMessagesResponse( + stateResponse=beam_job_api_pb2.GetJobStateResponse( + state=self._map_state_to_jobState(pipeline_result.state))) + + def getStateStream(self, request, context): + context.set_details('Not Implemented for direct runner!') + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + return + + @staticmethod + def _map_state_to_jobState(state): + if state == PipelineState.UNKNOWN: + return beam_job_api_pb2.JobState.UNKNOWN + elif state == PipelineState.STOPPED: + return beam_job_api_pb2.JobState.STOPPED + elif state == PipelineState.RUNNING: + return beam_job_api_pb2.JobState.RUNNING + elif state == PipelineState.DONE: + 
return beam_job_api_pb2.JobState.DONE + elif state == PipelineState.FAILED: + return beam_job_api_pb2.JobState.FAILED + elif state == PipelineState.CANCELLED: + return beam_job_api_pb2.JobState.CANCELLED + elif state == PipelineState.UPDATED: + return beam_job_api_pb2.JobState.UPDATED + elif state == PipelineState.DRAINING: + return beam_job_api_pb2.JobState.DRAINING + elif state == PipelineState.DRAINED: + return beam_job_api_pb2.JobState.DRAINED + else: + raise ValueError('Unknown pipeline state') + + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(JobService(), server) + + server.add_insecure_port('[::]:50051') + server.start() + + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + + +if __name__ == '__main__': + serve() http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/job/__init__.py b/sdks/python/apache_beam/runners/job/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/runners/job/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/manager.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/job/manager.py b/sdks/python/apache_beam/runners/job/manager.py new file mode 100644 index 0000000..4d88a11 --- /dev/null +++ b/sdks/python/apache_beam/runners/job/manager.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An object to control the Job API co-process. +""" + +import logging +import subprocess +import time + +import grpc + +from apache_beam.portability.api import beam_job_api_pb2_grpc + + +class DockerRPCManager(object): + """Manages a local co-process that serves the Job API over gRPC. + """ + def __init__(self, run_command=None): + # TODO(BEAM-2431): Change this to a docker container from a command.
+ self.process = subprocess.Popen( + ['python', + '-m', + 'apache_beam.runners.experimental.python_rpc_direct.server']) + + self.channel = grpc.insecure_channel('localhost:50051') + self.service = beam_job_api_pb2_grpc.JobServiceStub(self.channel) + + # Sleep for 2 seconds for process to start completely + # This is just for the co-process and would be removed + # once we migrate to docker. + time.sleep(2) + + def __del__(self): + """Terminate the co-process when the manager is GC'ed + """ + logging.info('Shutting the co-process') + self.process.terminate() http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py new file mode 100644 index 0000000..84c727f --- /dev/null +++ b/sdks/python/apache_beam/runners/job/utils.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Utility functions for efficiently processing with the job API +""" + +import json + +from google.protobuf import json_format +from google.protobuf import struct_pb2 + + +def dict_to_struct(dict_obj): + return json_format.Parse(json.dumps(dict_obj), struct_pb2.Struct()) + + +def struct_to_dict(struct_obj): + return json.loads(json_format.MessageToJson(struct_obj)) http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index af00d8f..7ce9a03 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -41,7 +41,11 @@ _DIRECT_RUNNER_PATH = 'apache_beam.runners.direct.direct_runner.' _DATAFLOW_RUNNER_PATH = ( 'apache_beam.runners.dataflow.dataflow_runner.') _TEST_RUNNER_PATH = 'apache_beam.runners.test.' +_PYTHON_RPC_DIRECT_RUNNER = ( + 'apache_beam.runners.experimental.python_rpc_direct.' + 'python_rpc_direct_runner.') +_KNOWN_PYTHON_RPC_DIRECT_RUNNER = ('PythonRPCDirectRunner',) _KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner') _KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',) _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',) @@ -51,6 +55,8 @@ _RUNNER_MAP.update(_get_runner_map(_KNOWN_DIRECT_RUNNERS, _DIRECT_RUNNER_PATH)) _RUNNER_MAP.update(_get_runner_map(_KNOWN_DATAFLOW_RUNNERS, _DATAFLOW_RUNNER_PATH)) +_RUNNER_MAP.update(_get_runner_map(_KNOWN_PYTHON_RPC_DIRECT_RUNNER, + _PYTHON_RPC_DIRECT_RUNNER)) _RUNNER_MAP.update(_get_runner_map(_KNOWN_TEST_RUNNERS, _TEST_RUNNER_PATH))