[ 
https://issues.apache.org/jira/browse/BEAM-7722?focusedWorklogId=275804&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275804
 ]

ASF GitHub Bot logged work on BEAM-7722:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Jul/19 10:55
            Start Date: 12/Jul/19 10:55
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #9043: [BEAM-7722] Add a 
Python FlinkRunner that fetches and uses released artifacts.
URL: https://github.com/apache/beam/pull/9043#discussion_r302920482
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/job_server.py
 ##########
 @@ -20,18 +20,143 @@
 import atexit
 import logging
 import os
+import shutil
 import signal
 import socket
+import subprocess
 import sys
+import tempfile
+import threading
 import time
-from subprocess import Popen
-from subprocess import check_output
-from threading import Lock
 
+import grpc
 
-class DockerizedJobServer(object):
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.runners.portability import local_job_service
+
+
+class JobServer(object):
+  def start(self):
+    """Starts this JobServer, returning a grpc service to which to submit jobs.
+    """
+    raise NotImplementedError()
+
+  def stop(self):
+    """Stops this job server."""
+    raise NotImplementedError()
+
+
+class ExternalJobServer(JobServer):
+  def __init__(self, endpoint):
+    self._endpoint = endpoint
+
+  def start(self):
+    channel = grpc.insecure_channel(self._endpoint)
+    grpc.channel_ready_future(channel).result()
+    return beam_job_api_pb2_grpc.JobServiceStub(channel)
+
+  def stop(self):
+    pass
+
+
+class EmbeddedJobServer(JobServer):
+  def start(self):
+    return local_job_service.LocalJobServicer()
+
+  def stop(self):
+    pass
+
+
+class PersistentJobServer(JobServer):
+  """Wraps a JobServer such that its stop will automatically be called on exit.
   """
-   Spins up the JobServer in a docker container for local execution
+  def __init__(self, job_server):
+    self._lock = threading.Lock()
+    self._job_server = job_server
+    self._started = False
+
+  def start(self):
+    with self._lock:
+      if not self._started:
+        self._endpoint = self._job_server.start()
+        self._started = True
+        atexit.register(self.stop)
+        signal.signal(signal.SIGINT, self.stop)
+    return self._endpoint
+
+  def stop(self):
+    with self._lock:
+      if self._started:
+        self._job_server.stop()
+      self._started = False
 
 Review comment:
   ```suggestion
           self._started = False
   ```
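   Nit, not important: indent this reset so it sits inside the `if self._started:` block and only runs when the server was actually started.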
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 275804)
    Time Spent: 1h 20m  (was: 1h 10m)

> Simplify running of Beam Python on Flink
> ----------------------------------------
>
>                 Key: BEAM-7722
>                 URL: https://issues.apache.org/jira/browse/BEAM-7722
>             Project: Beam
>          Issue Type: Test
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently this requires building and running several processes. We should be 
> able to automate most of this away. 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to