This is an automated email from the ASF dual-hosted git repository.
ccy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bb34672 [BEAM-4648] Remove experimental Python RPC DirectRunner (#5777)
bb34672 is described below
commit bb3467273956d67ac2058adf4c4560e6ed184b69
Author: Charles Chen <[email protected]>
AuthorDate: Wed Jul 11 22:07:51 2018 -0700
[BEAM-4648] Remove experimental Python RPC DirectRunner (#5777)
---
.../apache_beam/runners/experimental/__init__.py | 17 ----
.../experimental/python_rpc_direct/__init__.py | 24 -----
.../python_rpc_direct/python_rpc_direct_runner.py | 113 ---------------------
.../experimental/python_rpc_direct/server.py | 113 ---------------------
4 files changed, 267 deletions(-)
diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py b/sdks/python/apache_beam/runners/experimental/__init__.py
deleted file mode 100644
index f4f43cb..0000000
--- a/sdks/python/apache_beam/runners/experimental/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
deleted file mode 100644
index 0b416aa..0000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""This is the experimental direct runner for testing the job api that
-sends a runner API proto over the API and then runs it on the other side.
-"""
-
-from __future__ import absolute_import
-
-from apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner import PythonRPCDirectRunner
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
deleted file mode 100644
index 57056ab..0000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A runner implementation that submits a job for remote execution.
-"""
-
-from __future__ import absolute_import
-
-import logging
-import random
-import string
-from builtins import range
-
-import grpc
-
-from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.runners.job import utils as job_utils
-from apache_beam.runners.job.manager import DockerRPCManager
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-
-__all__ = ['PythonRPCDirectRunner']
-
-
-class PythonRPCDirectRunner(PipelineRunner):
- """Executes a single pipeline on the local machine inside a container."""
-
- # A list of PTransformOverride objects to be applied before running a pipeline
- # using DirectRunner.
- # Currently this only works for overrides where the input and output types do
- # not change.
- # For internal SDK use only. This should not be updated by Beam pipeline
- # authors.
- _PTRANSFORM_OVERRIDES = []
-
- def __init__(self):
- self._cache = None
-
- def run(self, pipeline):
- """Remotely executes entire pipeline or parts reachable from node."""
-
- # Performing configured PTransform overrides.
- pipeline.replace_all(PythonRPCDirectRunner._PTRANSFORM_OVERRIDES)
-
- # Start the RPC co-process
- manager = DockerRPCManager()
-
- # Submit the job to the RPC co-process
- jobName = ('Job-' +
- ''.join(random.choice(string.ascii_uppercase) for _ in range(6)))
- options = {k: v for k, v in pipeline._options.get_all_options().items()
- if v is not None}
-
- try:
- response = manager.service.run(beam_job_api_pb2.SubmitJobRequest(
- pipeline=pipeline.to_runner_api(),
- pipelineOptions=job_utils.dict_to_struct(options),
- jobName=jobName))
-
- logging.info('Submitted a job with id: %s', response.jobId)
-
- # Return the result object that references the manager instance
- result = PythonRPCDirectPipelineResult(response.jobId, manager)
- return result
- except grpc.RpcError:
- logging.error('Failed to run the job with name: %s', jobName)
- raise
-
-
-class PythonRPCDirectPipelineResult(PipelineResult):
- """Represents the state of a pipeline run on the Dataflow service."""
-
- def __init__(self, job_id, job_manager):
- self.job_id = job_id
- self.manager = job_manager
-
- @property
- def state(self):
- return self.manager.service.getState(
- beam_job_api_pb2.GetJobStateRequest(jobId=self.job_id))
-
- def wait_until_finish(self):
- messages_request = beam_job_api_pb2.JobMessagesRequest(jobId=self.job_id)
- for message in self.manager.service.getMessageStream(messages_request):
- if message.HasField('stateResponse'):
- logging.info(
- 'Current state of job: %s',
- beam_job_api_pb2.JobState.Enum.Name(
- message.stateResponse.state))
- else:
- logging.info('Message %s', message.messageResponse)
- logging.info('Job with id: %s in terminal state now.', self.job_id)
-
- def cancel(self):
- return self.manager.service.cancel(
- beam_job_api_pb2.CancelJobRequest(jobId=self.job_id))
-
- def metrics(self):
- raise NotImplementedError
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
deleted file mode 100644
index a0397c6..0000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A runner implementation that submits a job for remote execution.
-"""
-from __future__ import absolute_import
-
-import time
-import uuid
-from concurrent import futures
-
-import grpc
-
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.pipeline import Pipeline
-from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.portability.api import beam_job_api_pb2_grpc
-from apache_beam.runners.runner import PipelineState
-
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
-
-class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
-
- def __init__(self):
- self.jobs = {}
-
- def run(self, request, context):
- job_id = uuid.uuid4().get_hex()
- pipeline_result = Pipeline.from_runner_api(
- request.pipeline,
- 'DirectRunner',
- PipelineOptions()).run()
- self.jobs[job_id] = pipeline_result
- return beam_job_api_pb2.SubmitJobResponse(jobId=job_id)
-
- def getState(self, request, context):
- pipeline_result = self.jobs[request.jobId]
- return beam_job_api_pb2.GetJobStateResponse(
- state=self._map_state_to_jobState(pipeline_result.state))
-
- def cancel(self, request, context):
- pipeline_result = self.jobs[request.jobId]
- pipeline_result.cancel()
- return beam_job_api_pb2.CancelJobResponse(
- state=self._map_state_to_jobState(pipeline_result.state))
-
- def getMessageStream(self, request, context):
- pipeline_result = self.jobs[request.jobId]
- pipeline_result.wait_until_finish()
- yield beam_job_api_pb2.JobMessagesResponse(
- stateResponse=beam_job_api_pb2.GetJobStateResponse(
- state=self._map_state_to_jobState(pipeline_result.state)))
-
- def getStateStream(self, request, context):
- context.set_details('Not Implemented for direct runner!')
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- return
-
- @staticmethod
- def _map_state_to_jobState(state):
- if state == PipelineState.UNKNOWN:
- return beam_job_api_pb2.JobState.UNSPECIFIED
- elif state == PipelineState.STOPPED:
- return beam_job_api_pb2.JobState.STOPPED
- elif state == PipelineState.RUNNING:
- return beam_job_api_pb2.JobState.RUNNING
- elif state == PipelineState.DONE:
- return beam_job_api_pb2.JobState.DONE
- elif state == PipelineState.FAILED:
- return beam_job_api_pb2.JobState.FAILED
- elif state == PipelineState.CANCELLED:
- return beam_job_api_pb2.JobState.CANCELLED
- elif state == PipelineState.UPDATED:
- return beam_job_api_pb2.JobState.UPDATED
- elif state == PipelineState.DRAINING:
- return beam_job_api_pb2.JobState.DRAINING
- elif state == PipelineState.DRAINED:
- return beam_job_api_pb2.JobState.DRAINED
- else:
- raise ValueError('Unknown pipeline state')
-
-
-def serve():
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(JobService(), server)
-
- server.add_insecure_port('[::]:50051')
- server.start()
-
- try:
- while True:
- time.sleep(_ONE_DAY_IN_SECONDS)
- except KeyboardInterrupt:
- server.stop(0)
-
-
-if __name__ == '__main__':
- serve()