[
https://issues.apache.org/jira/browse/BEAM-7722?focusedWorklogId=275807&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275807
]
ASF GitHub Bot logged work on BEAM-7722:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jul/19 10:55
Start Date: 12/Jul/19 10:55
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9043: [BEAM-7722] Add a
Python FlinkRunner that fetches and uses released artifacts.
URL: https://github.com/apache/beam/pull/9043#discussion_r302922062
##########
File path: sdks/python/apache_beam/runners/portability/job_server.py
##########
@@ -20,18 +20,143 @@
import atexit
import logging
import os
+import shutil
import signal
import socket
+import subprocess
import sys
+import tempfile
+import threading
import time
-from subprocess import Popen
-from subprocess import check_output
-from threading import Lock
+import grpc
-class DockerizedJobServer(object):
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.runners.portability import local_job_service
+
+
+class JobServer(object):
+ def start(self):
+ """Starts this JobServer, returning a grpc service to which to submit jobs.
+ """
+ raise NotImplementedError()
+
+ def stop(self):
+ """Stops this job server."""
+ raise NotImplementedError()
+
+
+class ExternalJobServer(JobServer):
+ def __init__(self, endpoint):
+ self._endpoint = endpoint
+
+ def start(self):
+ channel = grpc.insecure_channel(self._endpoint)
+ grpc.channel_ready_future(channel).result()
+ return beam_job_api_pb2_grpc.JobServiceStub(channel)
+
+ def stop(self):
+ pass
+
+
+class EmbeddedJobServer(JobServer):
+ def start(self):
+ return local_job_service.LocalJobServicer()
+
+ def stop(self):
+ pass
+
+
+class PersistentJobServer(JobServer):
+ """Wraps a JobServer such that its stop will automatically be called on exit.
"""
- Spins up the JobServer in a docker container for local execution
+ def __init__(self, job_server):
+ self._lock = threading.Lock()
+ self._job_server = job_server
+ self._started = False
+
+ def start(self):
+ with self._lock:
+ if not self._started:
+ self._endpoint = self._job_server.start()
+ self._started = True
+ atexit.register(self.stop)
+ signal.signal(signal.SIGINT, self.stop)
+ return self._endpoint
+
+ def stop(self):
+ with self._lock:
+ if self._started:
+ self._job_server.stop()
+ self._started = False
+
+
+class SubprocessJobServer(JobServer):
+ """An abstract base class for JobServers run as an external process."""
+ def __init__(self):
+ self._process_lock = threading.RLock()
+ self._process = None
+ self._local_temp_root = None
+
+ def subprocess_cmd_and_endpoint(self):
+ raise NotImplementedError()
+
+ def start(self):
+ with self._process_lock:
+ if self._process:
+ self.stop()
+ cmd, endpoint = self.subprocess_cmd_and_endpoint()
+ logging.debug("Starting job service with %s", cmd)
+ try:
+ self._process = subprocess.Popen([str(arg) for arg in cmd])
+ self._local_temp_root = tempfile.mkdtemp(prefix='beam-temp')
+ wait_secs = .1
+ channel = grpc.insecure_channel(endpoint)
+ channel_ready = grpc.channel_ready_future(channel)
+ while True:
+ if self._process.poll() is not None:
+ logging.error("Starting job service with %s", cmd)
+ raise RuntimeError(
+ 'Job service failed to start up with error %s' %
+ self._process.poll())
+ try:
+ channel_ready.result(timeout=wait_secs)
+ break
+ except grpc.FutureTimeoutError:
+ wait_secs *= 1.2
+ logging.log(logging.WARNING if wait_secs > 1 else logging.DEBUG,
+ 'Waiting for jobs grpc channel to be ready at %s.',
+ endpoint)
+ return beam_job_api_pb2_grpc.JobServiceStub(channel)
+ except: # pylint: disable=bare-except
+ logging.exception("Error bringing up job service")
+ self.stop()
+ raise
+
+ def stop(self):
+ with self._process_lock:
+ if not self._process:
+ return
+ for _ in range(5):
+ if self._process.poll() is not None:
Review comment:
Add wait time here from above?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 275807)
Time Spent: 1h 40m (was: 1.5h)
> Simplify running of Beam Python on Flink
> ----------------------------------------
>
> Key: BEAM-7722
> URL: https://issues.apache.org/jira/browse/BEAM-7722
> Project: Beam
> Issue Type: Test
> Components: sdk-py-core
> Reporter: Robert Bradshaw
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Currently this requires building and running several processes. We should be
> able to automate most of this away.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)