[ 
https://issues.apache.org/jira/browse/BEAM-4097?focusedWorklogId=94490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94490
 ]

ASF GitHub Bot logged work on BEAM-4097:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/18 06:59
            Start Date: 24/Apr/18 06:59
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #5191: [BEAM-4097] Set environment for Python sdk function specs.
URL: https://github.com/apache/beam/pull/5191
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index ecbdd538d38..fefc1999f1e 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -245,6 +245,8 @@ def to_runner_api(self, context):
     urn, typed_param, components = self.to_runner_api_parameter(context)
     return beam_runner_api_pb2.Coder(
         spec=beam_runner_api_pb2.SdkFunctionSpec(
+            environment_id=(
+                context.default_environment_id() if context else None),
             spec=beam_runner_api_pb2.FunctionSpec(
                 urn=urn,
                 payload=typed_param.SerializeToString()
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 31fe5c51952..bc811a03fa3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -590,11 +590,12 @@ def visit_value(self, value, _):
     self.visit(Visitor())
     return Visitor.ok
 
-  def to_runner_api(self, return_context=False):
+  def to_runner_api(self, return_context=False, context=None):
     """For internal use only; no backwards-compatibility guarantees."""
     from apache_beam.runners import pipeline_context
     from apache_beam.portability.api import beam_runner_api_pb2
-    context = pipeline_context.PipelineContext()
+    if context is None:
+      context = pipeline_context.PipelineContext()
     # Mutates context; placing inline would force dependence on
     # argument evaluation order.
     root_transform_id = context.transforms.get_id(self._root_transform())
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 462b4eab459..4cb57d2b732 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -352,15 +352,17 @@ def __init__(self, access_pattern, window_mapping_fn, view_fn, coder):
     self.view_fn = view_fn
     self.coder = coder
 
-  def to_runner_api(self, unused_context):
+  def to_runner_api(self, context):
     return beam_runner_api_pb2.SideInput(
         access_pattern=beam_runner_api_pb2.FunctionSpec(
             urn=self.access_pattern),
         view_fn=beam_runner_api_pb2.SdkFunctionSpec(
+            environment_id=context.default_environment_id(),
             spec=beam_runner_api_pb2.FunctionSpec(
                 urn=python_urns.PICKLED_VIEWFN,
                 payload=pickler.dumps((self.view_fn, self.coder)))),
         window_mapping_fn=beam_runner_api_pb2.SdkFunctionSpec(
+            environment_id=context.default_environment_id(),
             spec=beam_runner_api_pb2.FunctionSpec(
                 urn=python_urns.PICKLED_WINDOW_MAPPING_FN,
                 payload=pickler.dumps(self.window_mapping_fn))))
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index dd8e0518acd..c2ea4ccb334 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -29,6 +29,22 @@
 from apache_beam.transforms import core
 
 
+class Environment(object):
+  """A wrapper around the environment proto.
+
+  Provides consistency with how the other components are accessed.
+  """
+  def __init__(self, proto):
+    self._proto = proto
+
+  def to_runner_api(self, context):
+    return self._proto
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    return Environment(proto)
+
+
 class _PipelineContextMap(object):
   """This is a bi-directional map between objects and ids.
 
@@ -87,10 +103,10 @@ class PipelineContext(object):
       'pcollections': pvalue.PCollection,
       'coders': coders.Coder,
       'windowing_strategies': core.Windowing,
-      # TODO: environment
+      'environments': Environment,
   }
 
-  def __init__(self, proto=None):
+  def __init__(self, proto=None, default_environment_url=None):
     if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
       proto = beam_runner_api_pb2.Components(
           coders=dict(proto.coders.items()),
@@ -100,6 +116,13 @@ def __init__(self, proto=None):
       setattr(
           self, name, _PipelineContextMap(
               self, cls, getattr(proto, name, None)))
+    if default_environment_url:
+      self._default_environment_id = self.environments.get_id(
+          Environment(
+              beam_runner_api_pb2.Environment(
+                  url=default_environment_url)))
+    else:
+      self._default_environment_id = None
 
   @staticmethod
   def from_runner_api(proto):
@@ -110,3 +133,6 @@ def to_runner_api(self):
     for name in self._COMPONENT_TYPES:
       getattr(self, name).populate_map(getattr(context_proto, name))
     return context_proto
+
+  def default_environment_id(self):
+    return self._default_environment_id
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index 152b71c3bc6..2753f6186dc 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -37,6 +37,7 @@
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
 from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners import pipeline_context
 from apache_beam.runners import runner
 from apache_beam.runners.portability import fn_api_runner
 
@@ -61,7 +62,8 @@ def __init__(
       self,
       use_grpc=True,
       use_subprocesses=False,
-      runner_api_address=None):
+      runner_api_address=None,
+      docker_image=None):
     if use_subprocesses and not use_grpc:
       raise ValueError("GRPC must be used with subprocesses")
     super(UniversalLocalRunner, self).__init__()
@@ -72,6 +74,7 @@ def __init__(
     self._job_service_lock = threading.Lock()
     self._subprocess = None
     self._runner_api_address = runner_api_address
+    self._docker_image = docker_image or self.default_docker_image()
 
   def __del__(self):
     # Best effort to not leave any dangling processes around.
@@ -83,6 +86,16 @@ def cleanup(self):
       time.sleep(0.1)
     self._subprocess = None
 
+  @staticmethod
+  def default_docker_image():
+    if 'USER' in os.environ:
+      # Perhaps also test if this was built?
+      logging.info('Using latest locally built Python SDK docker image.')
+      return os.environ['USER'] + '-docker.apache.bintray.io/beam/python:latest'
+    else:
+      logging.warning('Could not find a Python SDK docker image.')
+      return 'unknown'
+
   def _get_job_service(self):
     with self._job_service_lock:
       if not self._job_service:
@@ -151,7 +164,9 @@ def run_pipeline(self, pipeline):
     # (windowed in Fn API, but *un*windowed in runner API), whereas the
     # FnApiRunner treats them consistently, so we must guard this.
     # See also BEAM-2717.
-    proto_pipeline, proto_context = pipeline.to_runner_api(return_context=True)
+    proto_context = pipeline_context.PipelineContext(
+        default_environment_url=self._docker_image)
+    proto_pipeline = pipeline.to_runner_api(context=proto_context)
     if self._runner_api_address:
       for pcoll in proto_pipeline.components.pcollections.values():
         if pcoll.coder_id not in proto_context.coders:
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 468d86df532..60c18f1fa3a 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -863,6 +863,7 @@ def to_runner_api_parameter(self, context):
         common_urns.PARDO_TRANSFORM,
         beam_runner_api_pb2.ParDoPayload(
             do_fn=beam_runner_api_pb2.SdkFunctionSpec(
+                environment_id=context.default_environment_id(),
                 spec=beam_runner_api_pb2.FunctionSpec(
                     urn=python_urns.PICKLED_DOFN_INFO,
                     payload=picked_pardo_fn_data)),
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index a2f040f881a..ba3c6f7ae5c 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -94,6 +94,7 @@ def to_runner_api(self, context):
     from apache_beam.portability.api import beam_runner_api_pb2
     urn, typed_param = self.to_runner_api_parameter(context)
     return beam_runner_api_pb2.SdkFunctionSpec(
+        environment_id=context.default_environment_id(),
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=urn,
             payload=typed_param.SerializeToString()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94490)
    Time Spent: 2h 50m  (was: 2h 40m)

> Python SDK should set the environment in the job submission protos
> ------------------------------------------------------------------
>
>                 Key: BEAM-4097
>                 URL: https://issues.apache.org/jira/browse/BEAM-4097
>             Project: Beam
>          Issue Type: Task
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Robert Bradshaw
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to