[ 
https://issues.apache.org/jira/browse/BEAM-4204?focusedWorklogId=101890&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101890
 ]

ASF GitHub Bot logged work on BEAM-4204:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/May/18 21:21
            Start Date: 14/May/18 21:21
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5301: [BEAM-4204] Splitting 
ULR in portable runner stub and job service
URL: https://github.com/apache/beam/pull/5301
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py 
b/sdks/python/apache_beam/runners/portability/local_job_service.py
new file mode 100644
index 00000000000..7e46deabee8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import functools
+import logging
+import os
+import Queue as queue
+import subprocess
+import threading
+import time
+import traceback
+import uuid
+from concurrent import futures
+
+import grpc
+from google.protobuf import text_format
+
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import fn_api_runner
+
+TERMINAL_STATES = [
+    beam_job_api_pb2.JobState.DONE,
+    beam_job_api_pb2.JobState.STOPPED,
+    beam_job_api_pb2.JobState.FAILED,
+    beam_job_api_pb2.JobState.CANCELLED,
+]
+
+
+class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
+  """
+    Experimental: No backward compatibility guaranteed.
+    Servicer for the Beam Job API.
+
+    Manages one or more pipelines, possibly concurrently.
+
+    This JobService uses a basic local implementation of runner to run the job.
+    This JobService is not capable of managing job on remote clusters.
+
+    By default, this JobService executes the job in process but still uses GRPC
+    to communicate pipeline and worker state.  It can also be configured to use
+    inline calls rather than GRPC (for speed) or launch completely separate
+    subprocesses for the runner and worker(s).
+    """
+
+  def __init__(self, worker_command_line=None, use_grpc=True):
+    self._worker_command_line = worker_command_line
+    self._use_grpc = use_grpc or bool(worker_command_line)
+    self._jobs = {}
+
+  def start_grpc_server(self, port=0):
+    self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=3))
+    port = self._server.add_insecure_port('localhost:%d' % port)
+    beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
+    self._server.start()
+    logging.info('Grpc server started on port %s', port)
+    return port
+
+  def Prepare(self, request, context=None):
+    # For now, just use the job name as the job id.
+    logging.debug('Got Prepare request.')
+    preparation_id = '%s-%s' % (request.job_name, uuid.uuid4())
+    if self._worker_command_line:
+      sdk_harness_factory = functools.partial(SubprocessSdkWorker,
+                                              self._worker_command_line)
+    else:
+      sdk_harness_factory = None
+    self._jobs[preparation_id] = BeamJob(
+        preparation_id,
+        request.pipeline_options,
+        request.pipeline,
+        use_grpc=self._use_grpc,
+        sdk_harness_factory=sdk_harness_factory)
+    logging.debug("Prepared job '%s' as '%s'", request.job_name, 
preparation_id)
+    return beam_job_api_pb2.PrepareJobResponse(preparation_id=preparation_id)
+
+  def Run(self, request, context=None):
+    job_id = request.preparation_id
+    logging.debug("Runing job '%s'", job_id)
+    self._jobs[job_id].start()
+    return beam_job_api_pb2.RunJobResponse(job_id=job_id)
+
+  def GetState(self, request, context=None):
+    return beam_job_api_pb2.GetJobStateResponse(
+        state=self._jobs[request.job_id].state)
+
+  def Cancel(self, request, context=None):
+    self._jobs[request.job_id].cancel()
+    return beam_job_api_pb2.CancelJobRequest(
+        state=self._jobs[request.job_id].state)
+
+  def GetStateStream(self, request, context=None):
+    job = self._jobs[request.job_id]
+    state_queue = queue.Queue()
+    job.add_state_change_callback(lambda state: state_queue.put(state))
+    try:
+      current_state = state_queue.get()
+    except queue.Empty:
+      current_state = job.state
+    yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
+    while current_state not in TERMINAL_STATES:
+      current_state = state_queue.get(block=True)
+      yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
+
+  def GetMessageStream(self, request, context=None):
+    job = self._jobs[request.job_id]
+    current_state = job.state
+    while current_state not in TERMINAL_STATES:
+      msg = job.log_queue.get(block=True)
+      yield msg
+      if msg.HasField('state_response'):
+        current_state = msg.state_response.state
+    try:
+      while True:
+        yield job.log_queue.get(block=False)
+    except queue.Empty:
+      pass
+
+
+class SubprocessSdkWorker(object):
+  """Manages a SDK worker implemented as a subprocess communicating over grpc.
+    """
+
+  def __init__(self, worker_command_line, control_address):
+    self._worker_command_line = worker_command_line
+    self._control_address = control_address
+
+  def run(self):
+    logging_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    logging_port = logging_server.add_insecure_port('[::]:0')
+    logging_server.start()
+    logging_servicer = BeamFnLoggingServicer()
+    beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
+        logging_servicer, logging_server)
+    logging_descriptor = text_format.MessageToString(
+        endpoints_pb2.ApiServiceDescriptor(url='localhost:%s' % logging_port))
+
+    control_descriptor = text_format.MessageToString(
+        endpoints_pb2.ApiServiceDescriptor(url=self._control_address))
+
+    p = subprocess.Popen(
+        self._worker_command_line,
+        shell=True,
+        env=dict(
+            os.environ,
+            CONTROL_API_SERVICE_DESCRIPTOR=control_descriptor,
+            LOGGING_API_SERVICE_DESCRIPTOR=logging_descriptor))
+    try:
+      p.wait()
+      if p.returncode:
+        raise RuntimeError(
+            'Worker subprocess exited with return code %s' % p.returncode)
+    finally:
+      if p.poll() is None:
+        p.kill()
+      logging_server.stop(0)
+
+
+class BeamJob(threading.Thread):
+  """This class handles running and managing a single pipeline.
+
+    The current state of the pipeline is available as self.state.
+    """
+
+  def __init__(self,
+               job_id,
+               pipeline_options,
+               pipeline_proto,
+               use_grpc=True,
+               sdk_harness_factory=None):
+    super(BeamJob, self).__init__()
+    self._job_id = job_id
+    self._pipeline_options = pipeline_options
+    self._pipeline_proto = pipeline_proto
+    self._use_grpc = use_grpc
+    self._sdk_harness_factory = sdk_harness_factory
+    self._log_queue = queue.Queue()
+    self._state_change_callbacks = [
+        lambda new_state: self._log_queue.put(
+            beam_job_api_pb2.JobMessagesResponse(
+                state_response=
+                beam_job_api_pb2.GetJobStateResponse(state=new_state)))
+    ]
+    self._state = None
+    self.state = beam_job_api_pb2.JobState.STARTING
+    self.daemon = True
+
+  def add_state_change_callback(self, f):
+    self._state_change_callbacks.append(f)
+
+  @property
+  def log_queue(self):
+    return self._log_queue
+
+  @property
+  def state(self):
+    return self._state
+
+  @state.setter
+  def state(self, new_state):
+    for state_change_callback in self._state_change_callbacks:
+      state_change_callback(new_state)
+    self._state = new_state
+
+  def run(self):
+    with JobLogHandler(self._log_queue):
+      try:
+        fn_api_runner.FnApiRunner(
+            use_grpc=self._use_grpc,
+            sdk_harness_factory=self._sdk_harness_factory).run_via_runner_api(
+                self._pipeline_proto)
+        logging.info('Successfully completed job.')
+        self.state = beam_job_api_pb2.JobState.DONE
+      except:  # pylint: disable=bare-except
+        logging.exception('Error running pipeline.')
+        traceback.print_exc()
+        self.state = beam_job_api_pb2.JobState.FAILED
+        raise
+
+  def cancel(self):
+    if self.state not in TERMINAL_STATES:
+      self.state = beam_job_api_pb2.JobState.CANCELLING
+      # TODO(robertwb): Actually cancel...
+      self.state = beam_job_api_pb2.JobState.CANCELLED
+
+
+class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
+
+  def Logging(self, log_bundles, context=None):
+    for log_bundle in log_bundles:
+      for log_entry in log_bundle.log_entries:
+        logging.info('Worker: %s', str(log_entry).replace('\n', ' '))
+    return iter([])
+
+
+class JobLogHandler(logging.Handler):
+  """Captures logs to be returned via the Beam Job API.
+
+    Enabled via the with statement."""
+
+  # Mapping from logging levels to LogEntry levels.
+  LOG_LEVEL_MAP = {
+      logging.FATAL: beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR,
+      logging.ERROR: beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR,
+      logging.WARNING: beam_job_api_pb2.JobMessage.JOB_MESSAGE_WARNING,
+      logging.INFO: beam_job_api_pb2.JobMessage.JOB_MESSAGE_BASIC,
+      logging.DEBUG: beam_job_api_pb2.JobMessage.JOB_MESSAGE_DEBUG,
+  }
+
+  def __init__(self, message_queue):
+    super(JobLogHandler, self).__init__()
+    self._message_queue = message_queue
+    self._last_id = 0
+    self._logged_thread = None
+
+  def __enter__(self):
+    # Remember the current thread to demultiplex the logs of concurrently
+    # running pipelines (as Python log handlers are global).
+    self._logged_thread = threading.current_thread()
+    logging.getLogger().addHandler(self)
+
+  def __exit__(self, *args):
+    self._logged_thread = None
+    self.close()
+
+  def _next_id(self):
+    self._last_id += 1
+    return str(self._last_id)
+
+  def emit(self, record):
+    if self._logged_thread is threading.current_thread():
+      self._message_queue.put(
+          beam_job_api_pb2.JobMessagesResponse(
+              message_response=beam_job_api_pb2.JobMessage(
+                  message_id=self._next_id(),
+                  time=time.strftime('%Y-%m-%d %H:%M:%S.',
+                                     time.localtime(record.created)),
+                  importance=self.LOG_LEVEL_MAP[record.levelno],
+                  message_text=self.format(record))))
diff --git 
a/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py 
b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
similarity index 88%
rename from 
sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
rename to sdks/python/apache_beam/runners/portability/local_job_service_main.py
index 93781e1c6f5..b6f2ef9710c 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
@@ -20,7 +20,7 @@
 import sys
 import time
 
-from apache_beam.runners.portability import universal_local_runner
+from apache_beam.runners.portability import local_job_service
 
 
 def run(argv):
@@ -33,8 +33,8 @@ def run(argv):
   parser.add_argument('--worker_command_line',
                       help='command line for starting up a worker process')
   options = parser.parse_args(argv)
-  job_servicer = 
universal_local_runner.JobServicer(options.worker_command_line)
-  port = job_servicer.start_grpc(options.port)
+  job_servicer = 
local_job_service.LocalJobServicer(options.worker_command_line)
+  port = job_servicer.start_grpc_server(options.port)
   while True:
     logging.info("Listening for jobs at %d", port)
     time.sleep(300)
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py 
b/sdks/python/apache_beam/runners/portability/portable_runner.py
new file mode 100644
index 00000000000..190355d7989
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os
+import threading
+
+import grpc
+
+from apache_beam import coders
+from apache_beam.internal import pickler
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.runners import pipeline_context
+from apache_beam.runners import runner
+
+TERMINAL_STATES = [
+    beam_job_api_pb2.JobState.DONE,
+    beam_job_api_pb2.JobState.STOPPED,
+    beam_job_api_pb2.JobState.FAILED,
+    beam_job_api_pb2.JobState.CANCELLED,
+]
+
+
+class PortableRunner(runner.PipelineRunner):
+  """
+    Experimental: No backward compatibility guaranteed.
+    A BeamRunner that executes Python pipelines via the Beam Job API.
+
+    This runner is a stub and does not run the actual job.
+    This runner schedules the job on a job service. The responsibility of
+    running and managing the job lies with the job service used.
+  """
+
+  # TODO(angoenka): Read all init parameters from pipeline_options.
+  def __init__(self,
+               runner_api_address=None,
+               job_service_address=None,
+               docker_image=None):
+    super(PortableRunner, self).__init__()
+
+    self._subprocess = None
+    self._runner_api_address = runner_api_address
+    if not job_service_address:
+      raise ValueError(
+          'job_service_address should be provided while creating runner.')
+    self._job_service_address = job_service_address
+    self._docker_image = docker_image or self.default_docker_image()
+
+  @staticmethod
+  def default_docker_image():
+    if 'USER' in os.environ:
+      # Perhaps also test if this was built?
+      logging.info('Using latest locally built Python SDK docker image.')
+      return os.environ['USER'] + 
'-docker.apache.bintray.io/beam/python:latest'
+    else:
+      logging.warning('Could not find a Python SDK docker image.')
+      return 'unknown'
+
+  def _create_job_service(self):
+    return beam_job_api_pb2_grpc.JobServiceStub(
+        grpc.insecure_channel(self._job_service_address))
+
+  def run_pipeline(self, pipeline):
+    # Java has different expectations about coders
+    # (windowed in Fn API, but *un*windowed in runner API), whereas the
+    # FnApiRunner treats them consistently, so we must guard this.
+    # See also BEAM-2717.
+    proto_context = pipeline_context.PipelineContext(
+        default_environment_url=self._docker_image)
+    proto_pipeline = pipeline.to_runner_api(context=proto_context)
+    if self._runner_api_address:
+      for pcoll in proto_pipeline.components.pcollections.values():
+        if pcoll.coder_id not in proto_context.coders:
+          coder = coders.registry.get_coder(pickler.loads(pcoll.coder_id))
+          pcoll.coder_id = proto_context.coders.get_id(coder)
+      proto_context.coders.populate_map(proto_pipeline.components.coders)
+
+    job_service = self._create_job_service()
+    prepare_response = job_service.Prepare(
+        beam_job_api_pb2.PrepareJobRequest(
+            job_name='job', pipeline=proto_pipeline))
+    run_response = job_service.Run(
+        beam_job_api_pb2.RunJobRequest(
+            preparation_id=prepare_response.preparation_id))
+    return PipelineResult(job_service, run_response.job_id)
+
+
+class PipelineResult(runner.PipelineResult):
+
+  def __init__(self, job_service, job_id):
+    super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNSPECIFIED)
+    self._job_service = job_service
+    self._job_id = job_id
+    self._messages = []
+
+  def cancel(self):
+    self._job_service.Cancel()
+
+  @property
+  def state(self):
+    runner_api_state = self._job_service.GetState(
+        beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)).state
+    self._state = self._runner_api_state_to_pipeline_state(runner_api_state)
+    return self._state
+
+  @staticmethod
+  def _runner_api_state_to_pipeline_state(runner_api_state):
+    return getattr(runner.PipelineState,
+                   beam_job_api_pb2.JobState.Enum.Name(runner_api_state))
+
+  @staticmethod
+  def _pipeline_state_to_runner_api_state(pipeline_state):
+    return beam_job_api_pb2.JobState.Enum.Value(pipeline_state)
+
+  def wait_until_finish(self):
+
+    def read_messages():
+      for message in self._job_service.GetMessageStream(
+          beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
+        self._messages.append(message)
+
+    t = threading.Thread(target=read_messages, name='wait_until_finish_read')
+    t.daemon = True
+    t.start()
+
+    for state_response in self._job_service.GetStateStream(
+        beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)):
+      self._state = self._runner_api_state_to_pipeline_state(
+          state_response.state)
+      if state_response.state in TERMINAL_STATES:
+        break
+    if self._state != runner.PipelineState.DONE:
+      raise RuntimeError(
+          'Pipeline %s failed in state %s.' % (self._job_id, self._state))
+    return self._state
diff --git 
a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py 
b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
similarity index 52%
rename from 
sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
rename to sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 56dc514b416..3e680a35b01 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -19,19 +19,27 @@
 import logging
 import platform
 import signal
+import socket
+import subprocess
 import sys
 import threading
+import time
 import traceback
 import unittest
 
+import grpc
+
 import apache_beam as beam
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
 from apache_beam.runners.portability import fn_api_runner_test
-from apache_beam.runners.portability import universal_local_runner
+from apache_beam.runners.portability import portable_runner
+from apache_beam.runners.portability.local_job_service import LocalJobServicer
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
 
-class UniversalLocalRunnerTest(fn_api_runner_test.FnApiRunnerTest):
+class PortableRunnerTest(fn_api_runner_test.FnApiRunnerTest):
 
   TIMEOUT_SECS = 30
 
@@ -58,19 +66,85 @@ def tearDown(self):
     if platform.system() != 'Windows':
       signal.alarm(0)
 
+  @staticmethod
+  def _pick_unused_port():
+    """Not perfect, but we have to provide a port to the subprocess."""
+    # TODO(robertwb): Consider letting the subprocess communicate a choice of
+    # port back.
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.bind(('localhost', 0))
+    _, port = s.getsockname()
+    s.close()
+    return port
+
+  @classmethod
+  def _start_local_runner_subprocess_job_service(cls):
+    if cls._subprocess:
+      # Kill the old one if it exists.
+      cls._subprocess.kill()
+    # TODO(robertwb): Consider letting the subprocess pick one and
+    # communicate it back...
+    port = cls._pick_unused_port()
+    logging.info('Starting server on port %d.', port)
+    cls._subprocess = subprocess.Popen([
+        sys.executable, '-m',
+        'apache_beam.runners.portability.local_job_service_main', '-p',
+        str(port), '--worker_command_line',
+        '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
+    ])
+    address = 'localhost:%d' % port
+    job_service = beam_job_api_pb2_grpc.JobServiceStub(
+        grpc.insecure_channel(address))
+    logging.info('Waiting for server to be ready...')
+    start = time.time()
+    timeout = 30
+    while True:
+      time.sleep(0.1)
+      if cls._subprocess.poll() is not None:
+        raise RuntimeError(
+            'Subprocess terminated unexpectedly with exit code %d.' %
+            cls._subprocess.returncode)
+      elif time.time() - start > timeout:
+        raise RuntimeError(
+            'Pipeline timed out waiting for job service subprocess.')
+      else:
+        try:
+          job_service.GetState(
+              beam_job_api_pb2.GetJobStateRequest(job_id='[fake]'))
+          break
+        except grpc.RpcError as exn:
+          if exn.code != grpc.StatusCode.UNAVAILABLE:
+            # We were able to contact the service for our fake state request.
+            break
+    logging.info('Server ready.')
+    return address
+
+  @classmethod
+  def _create_job_service(cls):
+    if cls._use_subprocesses:
+      return cls._start_local_runner_subprocess_job_service()
+    elif cls._use_grpc:
+      # Use GRPC for workers.
+      cls._servicer = LocalJobServicer(use_grpc=True)
+      return 'localhost:%d' % cls._servicer.start_grpc_server()
+    else:
+      # Do not use GRPC for worker.
+      cls._servicer = LocalJobServicer(use_grpc=False)
+      return 'localhost:%d' % cls._servicer.start_grpc_server()
+
   @classmethod
   def get_runner(cls):
     # Don't inherit.
     if '_runner' not in cls.__dict__:
-      cls._runner = universal_local_runner.UniversalLocalRunner(
-          use_grpc=cls._use_grpc,
-          use_subprocesses=cls._use_subprocesses)
+      cls._runner = portable_runner.PortableRunner(
+          job_service_address=cls._create_job_service())
     return cls._runner
 
   @classmethod
   def tearDownClass(cls):
-    if hasattr(cls, '_runner'):
-      cls._runner.cleanup()
+    if hasattr(cls, '_subprocess'):
+      cls._subprocess.kill()
+      time.sleep(0.1)
 
   def create_pipeline(self):
     return beam.Pipeline(self.get_runner())
@@ -105,12 +179,12 @@ def test_error_traceback_includes_user_code(self):
   # Inherits all tests from fn_api_runner_test.FnApiRunnerTest
 
 
-class UniversalLocalRunnerTestWithGrpc(UniversalLocalRunnerTest):
+class PortableRunnerTestWithGrpc(PortableRunnerTest):
   _use_grpc = True
 
 
 @unittest.skip("BEAM-3040")
-class UniversalLocalRunnerTestWithSubprocesses(UniversalLocalRunnerTest):
+class PortableRunnerTestWithSubprocesses(PortableRunnerTest):
   _use_grpc = True
   _use_subprocesses = True
 
diff --git 
a/sdks/python/apache_beam/runners/portability/universal_local_runner.py 
b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
deleted file mode 100644
index 2753f6186dc..00000000000
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ /dev/null
@@ -1,477 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import functools
-import logging
-import os
-import Queue as queue
-import socket
-import subprocess
-import sys
-import threading
-import time
-import traceback
-import uuid
-from concurrent import futures
-
-import grpc
-from google.protobuf import text_format
-
-from apache_beam import coders
-from apache_beam.internal import pickler
-from apache_beam.portability.api import beam_fn_api_pb2_grpc
-from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.portability.api import beam_job_api_pb2_grpc
-from apache_beam.portability.api import endpoints_pb2
-from apache_beam.runners import pipeline_context
-from apache_beam.runners import runner
-from apache_beam.runners.portability import fn_api_runner
-
-TERMINAL_STATES = [
-    beam_job_api_pb2.JobState.DONE,
-    beam_job_api_pb2.JobState.STOPPED,
-    beam_job_api_pb2.JobState.FAILED,
-    beam_job_api_pb2.JobState.CANCELLED,
-]
-
-
-class UniversalLocalRunner(runner.PipelineRunner):
-  """A BeamRunner that executes Python pipelines via the Beam Job API.
-
-  By default, this runner executes in process but still uses GRPC to 
communicate
-  pipeline and worker state.  It can also be configured to use inline calls
-  rather than GRPC (for speed) or launch completely separate subprocesses for
-  the runner and worker(s).
-  """
-
-  def __init__(
-      self,
-      use_grpc=True,
-      use_subprocesses=False,
-      runner_api_address=None,
-      docker_image=None):
-    if use_subprocesses and not use_grpc:
-      raise ValueError("GRPC must be used with subprocesses")
-    super(UniversalLocalRunner, self).__init__()
-    self._use_grpc = use_grpc
-    self._use_subprocesses = use_subprocesses
-
-    self._job_service = None
-    self._job_service_lock = threading.Lock()
-    self._subprocess = None
-    self._runner_api_address = runner_api_address
-    self._docker_image = docker_image or self.default_docker_image()
-
-  def __del__(self):
-    # Best effort to not leave any dangling processes around.
-    self.cleanup()
-
-  def cleanup(self):
-    if self._subprocess:
-      self._subprocess.kill()
-      time.sleep(0.1)
-    self._subprocess = None
-
-  @staticmethod
-  def default_docker_image():
-    if 'USER' in os.environ:
-      # Perhaps also test if this was built?
-      logging.info('Using latest locally built Python SDK docker image.')
-      return os.environ['USER'] + 
'-docker.apache.bintray.io/beam/python:latest'
-    else:
-      logging.warning('Could not find a Python SDK docker image.')
-      return 'unknown'
-
-  def _get_job_service(self):
-    with self._job_service_lock:
-      if not self._job_service:
-        if self._runner_api_address:
-          self._job_service = beam_job_api_pb2_grpc.JobServiceStub(
-              grpc.insecure_channel(self._runner_api_address))
-        elif self._use_subprocesses:
-          self._job_service = self._start_local_runner_subprocess_job_service()
-
-        elif self._use_grpc:
-          self._servicer = JobServicer(use_grpc=True)
-          self._job_service = beam_job_api_pb2_grpc.JobServiceStub(
-              grpc.insecure_channel(
-                  'localhost:%d' % self._servicer.start_grpc()))
-
-        else:
-          self._job_service = JobServicer(use_grpc=False)
-
-    return self._job_service
-
-  def _start_local_runner_subprocess_job_service(self):
-    if self._subprocess:
-      # Kill the old one if it exists.
-      self._subprocess.kill()
-    # TODO(robertwb): Consider letting the subprocess pick one and
-    # communicate it back...
-    port = _pick_unused_port()
-    logging.info("Starting server on port %d.", port)
-    self._subprocess = subprocess.Popen([
-        sys.executable,
-        '-m',
-        'apache_beam.runners.portability.universal_local_runner_main',
-        '-p',
-        str(port),
-        '--worker_command_line',
-        '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
-    ])
-    job_service = beam_job_api_pb2_grpc.JobServiceStub(
-        grpc.insecure_channel('localhost:%d' % port))
-    logging.info("Waiting for server to be ready...")
-    start = time.time()
-    timeout = 30
-    while True:
-      time.sleep(0.1)
-      if self._subprocess.poll() is not None:
-        raise RuntimeError(
-            "Subprocess terminated unexpectedly with exit code %d." %
-            self._subprocess.returncode)
-      elif time.time() - start > timeout:
-        raise RuntimeError(
-            "Pipeline timed out waiting for job service subprocess.")
-      else:
-        try:
-          job_service.GetState(
-              beam_job_api_pb2.GetJobStateRequest(job_id='[fake]'))
-          break
-        except grpc.RpcError as exn:
-          if exn.code != grpc.StatusCode.UNAVAILABLE:
-            # We were able to contact the service for our fake state request.
-            break
-    logging.info("Server ready.")
-    return job_service
-
-  def run_pipeline(self, pipeline):
-    # Java has different expectations about coders
-    # (windowed in Fn API, but *un*windowed in runner API), whereas the
-    # FnApiRunner treats them consistently, so we must guard this.
-    # See also BEAM-2717.
-    proto_context = pipeline_context.PipelineContext(
-        default_environment_url=self._docker_image)
-    proto_pipeline = pipeline.to_runner_api(context=proto_context)
-    if self._runner_api_address:
-      for pcoll in proto_pipeline.components.pcollections.values():
-        if pcoll.coder_id not in proto_context.coders:
-          coder = coders.registry.get_coder(pickler.loads(pcoll.coder_id))
-          pcoll.coder_id = proto_context.coders.get_id(coder)
-      proto_context.coders.populate_map(proto_pipeline.components.coders)
-
-    job_service = self._get_job_service()
-    prepare_response = job_service.Prepare(
-        beam_job_api_pb2.PrepareJobRequest(
-            job_name='job',
-            pipeline=proto_pipeline))
-    run_response = job_service.Run(beam_job_api_pb2.RunJobRequest(
-        preparation_id=prepare_response.preparation_id))
-    return PipelineResult(job_service, run_response.job_id)
-
-
-class PipelineResult(runner.PipelineResult):
-  def __init__(self, job_service, job_id):
-    super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNSPECIFIED)
-    self._job_service = job_service
-    self._job_id = job_id
-    self._messages = []
-
-  def cancel(self):
-    self._job_service.Cancel()
-
-  @property
-  def state(self):
-    runner_api_state = self._job_service.GetState(
-        beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)).state
-    self._state = self._runner_api_state_to_pipeline_state(runner_api_state)
-    return self._state
-
-  @staticmethod
-  def _runner_api_state_to_pipeline_state(runner_api_state):
-    return getattr(
-        runner.PipelineState,
-        beam_job_api_pb2.JobState.Enum.Name(runner_api_state))
-
-  @staticmethod
-  def _pipeline_state_to_runner_api_state(pipeline_state):
-    return beam_job_api_pb2.JobState.Enum.Value(pipeline_state)
-
-  def wait_until_finish(self):
-    def read_messages():
-      for message in self._job_service.GetMessageStream(
-          beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
-        self._messages.append(message)
-    t = threading.Thread(target=read_messages, name='wait_until_finish_read')
-    t.daemon = True
-    t.start()
-
-    for state_response in self._job_service.GetStateStream(
-        beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)):
-      self._state = self._runner_api_state_to_pipeline_state(
-          state_response.state)
-      if state_response.state in TERMINAL_STATES:
-        break
-    if self._state != runner.PipelineState.DONE:
-      raise RuntimeError(
-          "Pipeline %s failed in state %s." % (self._job_id, self._state))
-    return self._state
-
-
-class BeamJob(threading.Thread):
-  """This class handles running and managing a single pipeline.
-
-  The current state of the pipeline is available as self.state.
-  """
-  def __init__(self, job_id, pipeline_options, pipeline_proto,
-               use_grpc=True, sdk_harness_factory=None):
-    super(BeamJob, self).__init__()
-    self._job_id = job_id
-    self._pipeline_options = pipeline_options
-    self._pipeline_proto = pipeline_proto
-    self._use_grpc = use_grpc
-    self._sdk_harness_factory = sdk_harness_factory
-    self._log_queue = queue.Queue()
-    self._state_change_callbacks = [
-        lambda new_state: self._log_queue.put(
-            beam_job_api_pb2.JobMessagesResponse(
-                state_response=
-                beam_job_api_pb2.GetJobStateResponse(state=new_state)))
-    ]
-    self._state = None
-    self.state = beam_job_api_pb2.JobState.STARTING
-    self.daemon = True
-
-  def add_state_change_callback(self, f):
-    self._state_change_callbacks.append(f)
-
-  @property
-  def log_queue(self):
-    return self._log_queue
-
-  @property
-  def state(self):
-    return self._state
-
-  @state.setter
-  def state(self, new_state):
-    for state_change_callback in self._state_change_callbacks:
-      state_change_callback(new_state)
-    self._state = new_state
-
-  def run(self):
-    with JobLogHandler(self._log_queue):
-      try:
-        fn_api_runner.FnApiRunner(
-            use_grpc=self._use_grpc,
-            sdk_harness_factory=self._sdk_harness_factory
-        ).run_via_runner_api(self._pipeline_proto)
-        logging.info("Successfully completed job.")
-        self.state = beam_job_api_pb2.JobState.DONE
-      except:  # pylint: disable=bare-except
-        logging.exception("Error running pipeline.")
-        traceback.print_exc()
-        self.state = beam_job_api_pb2.JobState.FAILED
-        raise
-
-  def cancel(self):
-    if self.state not in TERMINAL_STATES:
-      self.state = beam_job_api_pb2.JobState.CANCELLING
-      # TODO(robertwb): Actually cancel...
-      self.state = beam_job_api_pb2.JobState.CANCELLED
-
-
-class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
-  """Servicer for the Beam Job API.
-
-  Manages one or more pipelines, possibly concurrently.
-  """
-  def __init__(
-      self, worker_command_line=None, use_grpc=True):
-    self._worker_command_line = worker_command_line
-    self._use_grpc = use_grpc or bool(worker_command_line)
-    self._jobs = {}
-
-  def start_grpc(self, port=0):
-    self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=3))
-    port = self._server.add_insecure_port('localhost:%d' % port)
-    beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
-    self._server.start()
-    logging.info("Grpc server started on port %s", port)
-    return port
-
-  def Prepare(self, request, context=None):
-    # For now, just use the job name as the job id.
-    logging.debug("Got Prepare request.")
-    preparation_id = "%s-%s" % (request.job_name, uuid.uuid4())
-    if self._worker_command_line:
-      sdk_harness_factory = functools.partial(
-          SubprocessSdkWorker, self._worker_command_line)
-    else:
-      sdk_harness_factory = None
-    self._jobs[preparation_id] = BeamJob(
-        preparation_id, request.pipeline_options, request.pipeline,
-        use_grpc=self._use_grpc, sdk_harness_factory=sdk_harness_factory)
-    logging.debug("Prepared job '%s' as '%s'", request.job_name, 
preparation_id)
-    return beam_job_api_pb2.PrepareJobResponse(preparation_id=preparation_id)
-
-  def Run(self, request, context=None):
-    job_id = request.preparation_id
-    logging.debug("Runing job '%s'", job_id)
-    self._jobs[job_id].start()
-    return beam_job_api_pb2.RunJobResponse(job_id=job_id)
-
-  def GetState(self, request, context=None):
-    return beam_job_api_pb2.GetJobStateResponse(
-        state=self._jobs[request.job_id].state)
-
-  def Cancel(self, request, context=None):
-    self._jobs[request.job_id].cancel()
-    return beam_job_api_pb2.CancelJobRequest(
-        state=self._jobs[request.job_id].state)
-
-  def GetStateStream(self, request, context=None):
-    job = self._jobs[request.job_id]
-    state_queue = queue.Queue()
-    job.add_state_change_callback(lambda state: state_queue.put(state))
-    try:
-      current_state = state_queue.get()
-    except queue.Empty:
-      current_state = job.state
-    yield beam_job_api_pb2.GetJobStateResponse(
-        state=current_state)
-    while current_state not in TERMINAL_STATES:
-      current_state = state_queue.get(block=True)
-      yield beam_job_api_pb2.GetJobStateResponse(
-          state=current_state)
-
-  def GetMessageStream(self, request, context=None):
-    job = self._jobs[request.job_id]
-    current_state = job.state
-    while current_state not in TERMINAL_STATES:
-      msg = job.log_queue.get(block=True)
-      yield msg
-      if msg.HasField('state_response'):
-        current_state = msg.state_response.state
-    try:
-      while True:
-        yield job.log_queue.get(block=False)
-    except queue.Empty:
-      pass
-
-
-class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
-  def Logging(self, log_bundles, context=None):
-    for log_bundle in log_bundles:
-      for log_entry in log_bundle.log_entries:
-        logging.info('Worker: %s', str(log_entry).replace('\n', ' '))
-    return iter([])
-
-
-class SubprocessSdkWorker(object):
-  """Manages a SDK worker implemented as a subprocess communicating over grpc.
-  """
-
-  def __init__(self, worker_command_line, control_address):
-    self._worker_command_line = worker_command_line
-    self._control_address = control_address
-
-  def run(self):
-    logging_server = grpc.server(
-        futures.ThreadPoolExecutor(max_workers=10))
-    logging_port = logging_server.add_insecure_port('[::]:0')
-    logging_server.start()
-    logging_servicer = BeamFnLoggingServicer()
-    beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
-        logging_servicer, logging_server)
-    logging_descriptor = text_format.MessageToString(
-        endpoints_pb2.ApiServiceDescriptor(url='localhost:%s' % logging_port))
-
-    control_descriptor = text_format.MessageToString(
-        endpoints_pb2.ApiServiceDescriptor(url=self._control_address))
-
-    p = subprocess.Popen(
-        self._worker_command_line,
-        shell=True,
-        env=dict(os.environ,
-                 CONTROL_API_SERVICE_DESCRIPTOR=control_descriptor,
-                 LOGGING_API_SERVICE_DESCRIPTOR=logging_descriptor))
-    try:
-      p.wait()
-      if p.returncode:
-        raise RuntimeError(
-            "Worker subprocess exited with return code %s" % p.returncode)
-    finally:
-      if p.poll() is None:
-        p.kill()
-      logging_server.stop(0)
-
-
-class JobLogHandler(logging.Handler):
-  """Captures logs to be returned via the Beam Job API.
-
-  Enabled via the with statement."""
-
-  # Mapping from logging levels to LogEntry levels.
-  LOG_LEVEL_MAP = {
-      logging.FATAL: beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR,
-      logging.ERROR: beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR,
-      logging.WARNING: beam_job_api_pb2.JobMessage.JOB_MESSAGE_WARNING,
-      logging.INFO: beam_job_api_pb2.JobMessage.JOB_MESSAGE_BASIC,
-      logging.DEBUG: beam_job_api_pb2.JobMessage.JOB_MESSAGE_DEBUG,
-  }
-
-  def __init__(self, message_queue):
-    super(JobLogHandler, self).__init__()
-    self._message_queue = message_queue
-    self._last_id = 0
-    self._logged_thread = None
-
-  def __enter__(self):
-    # Remember the current thread to demultiplex the logs of concurrently
-    # running pipelines (as Python log handlers are global).
-    self._logged_thread = threading.current_thread()
-    logging.getLogger().addHandler(self)
-
-  def __exit__(self, *args):
-    self._logged_thread = None
-    self.close()
-
-  def _next_id(self):
-    self._last_id += 1
-    return str(self._last_id)
-
-  def emit(self, record):
-    if self._logged_thread is threading.current_thread():
-      self._message_queue.put(beam_job_api_pb2.JobMessagesResponse(
-          message_response=beam_job_api_pb2.JobMessage(
-              message_id=self._next_id(),
-              time=time.strftime(
-                  '%Y-%m-%d %H:%M:%S.', time.localtime(record.created)),
-              importance=self.LOG_LEVEL_MAP[record.levelno],
-              message_text=self.format(record))))
-
-
-def _pick_unused_port():
-  """Not perfect, but we have to provide a port to the subprocess."""
-  # TODO(robertwb): Consider letting the subprocess communicate a choice of
-  # port back.
-  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-  s.bind(('localhost', 0))
-  _, port = s.getsockname()
-  s.close()
-  return port


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101890)
    Time Spent: 2.5h  (was: 2h 20m)

> Python: PortableRunner - p.run() via given JobService
> -----------------------------------------------------
>
>                 Key: BEAM-4204
>                 URL: https://issues.apache.org/jira/browse/BEAM-4204
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Like BEAM-4071 but for Python. Is this fully encompassed by 
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/universal_local_runner.py]
>  ? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to