This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bb34672  [BEAM-4648] Remove experimental Python RPC DirectRunner 
(#5777)
bb34672 is described below

commit bb3467273956d67ac2058adf4c4560e6ed184b69
Author: Charles Chen <[email protected]>
AuthorDate: Wed Jul 11 22:07:51 2018 -0700

    [BEAM-4648] Remove experimental Python RPC DirectRunner (#5777)
---
 .../apache_beam/runners/experimental/__init__.py   |  17 ----
 .../experimental/python_rpc_direct/__init__.py     |  24 -----
 .../python_rpc_direct/python_rpc_direct_runner.py  | 113 ---------------------
 .../experimental/python_rpc_direct/server.py       | 113 ---------------------
 4 files changed, 267 deletions(-)

diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py 
b/sdks/python/apache_beam/runners/experimental/__init__.py
deleted file mode 100644
index f4f43cb..0000000
--- a/sdks/python/apache_beam/runners/experimental/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
diff --git 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py 
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
deleted file mode 100644
index 0b416aa..0000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""This is the experimental direct runner for testing the job api that
-sends a runner API proto over the API and then runs it on the other side.
-"""
-
-from __future__ import absolute_import
-
-from 
apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner 
import PythonRPCDirectRunner
diff --git 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
 
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
deleted file mode 100644
index 57056ab..0000000
--- 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A runner implementation that submits a job for remote execution.
-"""
-
-from __future__ import absolute_import
-
-import logging
-import random
-import string
-from builtins import range
-
-import grpc
-
-from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.runners.job import utils as job_utils
-from apache_beam.runners.job.manager import DockerRPCManager
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-
-__all__ = ['PythonRPCDirectRunner']
-
-
-class PythonRPCDirectRunner(PipelineRunner):
-  """Executes a single pipeline on the local machine inside a container."""
-
-  # A list of PTransformOverride objects to be applied before running a 
pipeline
-  # using DirectRunner.
-  # Currently this only works for overrides where the input and output types do
-  # not change.
-  # For internal SDK use only. This should not be updated by Beam pipeline
-  # authors.
-  _PTRANSFORM_OVERRIDES = []
-
-  def __init__(self):
-    self._cache = None
-
-  def run(self, pipeline):
-    """Remotely executes entire pipeline or parts reachable from node."""
-
-    # Performing configured PTransform overrides.
-    pipeline.replace_all(PythonRPCDirectRunner._PTRANSFORM_OVERRIDES)
-
-    # Start the RPC co-process
-    manager = DockerRPCManager()
-
-    # Submit the job to the RPC co-process
-    jobName = ('Job-' +
-               ''.join(random.choice(string.ascii_uppercase) for _ in 
range(6)))
-    options = {k: v for k, v in pipeline._options.get_all_options().items()
-               if v is not None}
-
-    try:
-      response = manager.service.run(beam_job_api_pb2.SubmitJobRequest(
-          pipeline=pipeline.to_runner_api(),
-          pipelineOptions=job_utils.dict_to_struct(options),
-          jobName=jobName))
-
-      logging.info('Submitted a job with id: %s', response.jobId)
-
-      # Return the result object that references the manager instance
-      result = PythonRPCDirectPipelineResult(response.jobId, manager)
-      return result
-    except grpc.RpcError:
-      logging.error('Failed to run the job with name: %s', jobName)
-      raise
-
-
-class PythonRPCDirectPipelineResult(PipelineResult):
-  """Represents the state of a pipeline run on the Dataflow service."""
-
-  def __init__(self, job_id, job_manager):
-    self.job_id = job_id
-    self.manager = job_manager
-
-  @property
-  def state(self):
-    return self.manager.service.getState(
-        beam_job_api_pb2.GetJobStateRequest(jobId=self.job_id))
-
-  def wait_until_finish(self):
-    messages_request = beam_job_api_pb2.JobMessagesRequest(jobId=self.job_id)
-    for message in self.manager.service.getMessageStream(messages_request):
-      if message.HasField('stateResponse'):
-        logging.info(
-            'Current state of job: %s',
-            beam_job_api_pb2.JobState.Enum.Name(
-                message.stateResponse.state))
-      else:
-        logging.info('Message %s', message.messageResponse)
-    logging.info('Job with id: %s in terminal state now.', self.job_id)
-
-  def cancel(self):
-    return self.manager.service.cancel(
-        beam_job_api_pb2.CancelJobRequest(jobId=self.job_id))
-
-  def metrics(self):
-    raise NotImplementedError
diff --git 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py 
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
deleted file mode 100644
index a0397c6..0000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A runner implementation that submits a job for remote execution.
-"""
-from __future__ import absolute_import
-
-import time
-import uuid
-from concurrent import futures
-
-import grpc
-
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.pipeline import Pipeline
-from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.portability.api import beam_job_api_pb2_grpc
-from apache_beam.runners.runner import PipelineState
-
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
-
-class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
-
-  def __init__(self):
-    self.jobs = {}
-
-  def run(self, request, context):
-    job_id = uuid.uuid4().get_hex()
-    pipeline_result = Pipeline.from_runner_api(
-        request.pipeline,
-        'DirectRunner',
-        PipelineOptions()).run()
-    self.jobs[job_id] = pipeline_result
-    return beam_job_api_pb2.SubmitJobResponse(jobId=job_id)
-
-  def getState(self, request, context):
-    pipeline_result = self.jobs[request.jobId]
-    return beam_job_api_pb2.GetJobStateResponse(
-        state=self._map_state_to_jobState(pipeline_result.state))
-
-  def cancel(self, request, context):
-    pipeline_result = self.jobs[request.jobId]
-    pipeline_result.cancel()
-    return beam_job_api_pb2.CancelJobResponse(
-        state=self._map_state_to_jobState(pipeline_result.state))
-
-  def getMessageStream(self, request, context):
-    pipeline_result = self.jobs[request.jobId]
-    pipeline_result.wait_until_finish()
-    yield beam_job_api_pb2.JobMessagesResponse(
-        stateResponse=beam_job_api_pb2.GetJobStateResponse(
-            state=self._map_state_to_jobState(pipeline_result.state)))
-
-  def getStateStream(self, request, context):
-    context.set_details('Not Implemented for direct runner!')
-    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-    return
-
-  @staticmethod
-  def _map_state_to_jobState(state):
-    if state == PipelineState.UNKNOWN:
-      return beam_job_api_pb2.JobState.UNSPECIFIED
-    elif state == PipelineState.STOPPED:
-      return beam_job_api_pb2.JobState.STOPPED
-    elif state == PipelineState.RUNNING:
-      return beam_job_api_pb2.JobState.RUNNING
-    elif state == PipelineState.DONE:
-      return beam_job_api_pb2.JobState.DONE
-    elif state == PipelineState.FAILED:
-      return beam_job_api_pb2.JobState.FAILED
-    elif state == PipelineState.CANCELLED:
-      return beam_job_api_pb2.JobState.CANCELLED
-    elif state == PipelineState.UPDATED:
-      return beam_job_api_pb2.JobState.UPDATED
-    elif state == PipelineState.DRAINING:
-      return beam_job_api_pb2.JobState.DRAINING
-    elif state == PipelineState.DRAINED:
-      return beam_job_api_pb2.JobState.DRAINED
-    else:
-      raise ValueError('Unknown pipeline state')
-
-
-def serve():
-  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
-  beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(JobService(), server)
-
-  server.add_insecure_port('[::]:50051')
-  server.start()
-
-  try:
-    while True:
-      time.sleep(_ONE_DAY_IN_SECONDS)
-  except KeyboardInterrupt:
-    server.stop(0)
-
-
-if __name__ == '__main__':
-  serve()

Reply via email to