[
https://issues.apache.org/jira/browse/BEAM-3792?focusedWorklogId=92824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92824
]
ASF GitHub Bot logged work on BEAM-3792:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Apr/18 20:14
Start Date: 19/Apr/18 20:14
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #4811: [BEAM-3792] Allow
manual specification of external address for ULR.
URL: https://github.com/apache/beam/pull/4811
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index a631a0c847b..152b71c3bc6 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -31,6 +31,8 @@
 import grpc
 from google.protobuf import text_format
+from apache_beam import coders
+from apache_beam.internal import pickler
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
@@ -55,7 +57,11 @@ class UniversalLocalRunner(runner.PipelineRunner):
   the runner and worker(s).
   """
-  def __init__(self, use_grpc=True, use_subprocesses=False):
+  def __init__(
+      self,
+      use_grpc=True,
+      use_subprocesses=False,
+      runner_api_address=None):
     if use_subprocesses and not use_grpc:
       raise ValueError("GRPC must be used with subprocesses")
     super(UniversalLocalRunner, self).__init__()
@@ -65,6 +71,7 @@ def __init__(self, use_grpc=True, use_subprocesses=False):
     self._job_service = None
     self._job_service_lock = threading.Lock()
     self._subprocess = None
+    self._runner_api_address = runner_api_address
   def __del__(self):
     # Best effort to not leave any dangling processes around.
@@ -79,7 +86,10 @@ def cleanup(self):
   def _get_job_service(self):
     with self._job_service_lock:
       if not self._job_service:
-        if self._use_subprocesses:
+        if self._runner_api_address:
+          self._job_service = beam_job_api_pb2_grpc.JobServiceStub(
+              grpc.insecure_channel(self._runner_api_address))
+        elif self._use_subprocesses:
           self._job_service = self._start_local_runner_subprocess_job_service()
         elif self._use_grpc:
@@ -137,11 +147,23 @@ def _start_local_runner_subprocess_job_service(self):
     return job_service
   def run_pipeline(self, pipeline):
+    # Java has different expectations about coders
+    # (windowed in Fn API, but *un*windowed in runner API), whereas the
+    # FnApiRunner treats them consistently, so we must guard this.
+    # See also BEAM-2717.
+    proto_pipeline, proto_context = pipeline.to_runner_api(return_context=True)
+    if self._runner_api_address:
+      for pcoll in proto_pipeline.components.pcollections.values():
+        if pcoll.coder_id not in proto_context.coders:
+          coder = coders.registry.get_coder(pickler.loads(pcoll.coder_id))
+          pcoll.coder_id = proto_context.coders.get_id(coder)
+      proto_context.coders.populate_map(proto_pipeline.components.coders)
+
     job_service = self._get_job_service()
     prepare_response = job_service.Prepare(
         beam_job_api_pb2.PrepareJobRequest(
             job_name='job',
-            pipeline=pipeline.to_runner_api()))
+            pipeline=proto_pipeline))
     run_response = job_service.Run(beam_job_api_pb2.RunJobRequest(
         preparation_id=prepare_response.preparation_id))
     return PipelineResult(job_service, run_response.job_id)
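
The branch order that the diff introduces in _get_job_service can be sketched in isolation. This is only an illustration, not code from the PR: choose_job_service is a hypothetical helper, the string labels stand in for the real stub and service objects, and 'localhost:8099' is a placeholder address. The sketch shows that an explicit runner_api_address takes precedence over use_subprocesses and use_grpc, and that the constructor check shown in the diff still applies.

```python
def choose_job_service(use_grpc=True, use_subprocesses=False,
                       runner_api_address=None):
  """Hypothetical helper: returns a label for the branch that would be taken."""
  # Same validation as the constructor shown in the diff.
  if use_subprocesses and not use_grpc:
    raise ValueError("GRPC must be used with subprocesses")
  # An explicit external address is checked first, as in the patched method.
  if runner_api_address:
    return 'external:' + runner_api_address
  elif use_subprocesses:
    return 'subprocess'
  elif use_grpc:
    return 'grpc'
  # The diff does not show what happens past this point.
  return 'not shown in diff'


if __name__ == '__main__':
  # 'localhost:8099' is a placeholder, not an address taken from the PR.
  print(choose_job_service(runner_api_address='localhost:8099'))
  print(choose_job_service(use_subprocesses=True))
  print(choose_job_service())
```

With the patch applied, the equivalent real call would presumably be UniversalLocalRunner(runner_api_address=...), after which Prepare and Run go to the external job service over an insecure gRPC channel.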
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 92824)
Time Spent: 1h 20m (was: 1h 10m)
> Python submits portable pipelines to the Flink-served endpoint.
> ---------------------------------------------------------------
>
> Key: BEAM-3792
> URL: https://issues.apache.org/jira/browse/BEAM-3792
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Robert Bradshaw
> Assignee: Robert Bradshaw
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)