Repository: beam
Updated Branches:
  refs/heads/master f13a84d67 -> 2c2424cb4


Runner API context helper classes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc76a186
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc76a186
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc76a186

Branch: refs/heads/master
Commit: bc76a186099568ef292ceb007388ae7174150bc2
Parents: 3bb125e
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Mar 7 12:04:27 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Mar 9 20:29:00 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 62 ++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bc76a186/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 7db39a9..4ec2e47 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,11 +52,14 @@ import os
 import shutil
 import tempfile
 
+from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -440,3 +443,62 @@ class AppliedPTransform(object):
         if v not in visited:
           visited.add(v)
           visitor.visit_value(v, self)
+
+
+class PipelineContextMap(object):
+  """This is a bi-directional map between objects and ids.
+
+  Under the hood it encodes and decodes these objects into runner API
+  representations.
+
+  Objects are encoded lazily with ``obj.to_runner_api(context)`` the first
+  time an id is requested for them, and decoded lazily with
+  ``obj_type.from_runner_api(proto, context)`` the first time they are
+  looked up by id.
+  """
+  def __init__(self, context, obj_type, proto_map=None):
+    """Creates a map for objects of a single type.
+
+    Args:
+      context: the pipeline context handed to to_runner_api and
+        from_runner_api when encoding and decoding objects.
+      obj_type: the class of the mapped objects; its __name__ is embedded in
+        generated ids and its from_runner_api is used for decoding.
+      proto_map: optional existing mapping from id to runner API proto. A
+        falsy value (None or an empty mapping) starts from a fresh dict.
+    """
+    self._pipeline_context = context
+    self._obj_type = obj_type
+    self._obj_to_id = {}
+    self._id_to_obj = {}
+    self._id_to_proto = proto_map if proto_map else {}
+    self._counter = 0
+
+  def _unique_ref(self):
+    """Returns a fresh id of the form ref_<TypeName>_<counter>."""
+    self._counter += 1
+    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+
+  def populate_map(self, proto_map):
+    """Copies into proto_map the proto of every id that has an object.
+
+    Args:
+      proto_map: mutable mapping from id to proto, updated in place.
+    """
+    # Iterating a dict yields its keys only, so walk the ids and look the
+    # proto up rather than unpacking each key as an (id, obj) pair.
+    # NOTE(review): if proto_map is a protobuf message-valued map field, item
+    # assignment may be rejected and CopyFrom would be needed -- confirm.
+    for ref in self._id_to_obj:
+      proto_map[ref] = self._id_to_proto[ref]
+
+  def get_id(self, obj):
+    """Returns the id of obj, registering and encoding it on first use.
+
+    Args:
+      obj: a hashable object providing to_runner_api(context).
+    """
+    if obj not in self._obj_to_id:
+      ref = self._unique_ref()
+      self._id_to_obj[ref] = obj
+      self._obj_to_id[obj] = ref
+      self._id_to_proto[ref] = obj.to_runner_api(self._pipeline_context)
+    return self._obj_to_id[obj]
+
+  def get_by_id(self, id):
+    """Returns the object for id, decoding it from its proto on first use.
+
+    Args:
+      id: an id previously returned by get_id, or a key of the proto map
+        given at construction.
+
+    Raises:
+      KeyError: if id has neither a cached object nor a known proto.
+    """
+    if id not in self._id_to_obj:
+      self._id_to_obj[id] = self._obj_type.from_runner_api(
+        self._id_to_proto[id], self._pipeline_context)
+    return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+  """Holds one PipelineContextMap per runner API component type.
+
+  Each key of _COMPONENT_TYPES becomes an instance attribute (for example
+  ``context.coders``) holding the PipelineContextMap for that type.
+  """
+
+  _COMPONENT_TYPES = {
+    'transforms': AppliedPTransform,
+    'pcollections': pvalue.PCollection,
+    'coders': coders.Coder,
+    'windowing_strategies': core.Windowing,
+    # TODO: environment
+  }
+
+  def __init__(self, context_proto=None):
+    """Creates the per-component maps.
+
+    Args:
+      context_proto: optional object whose attributes named after the keys
+        of _COMPONENT_TYPES supply the initial id-to-proto maps; a missing
+        attribute (or context_proto=None) gives an empty map.
+    """
+    for name, cls in self._COMPONENT_TYPES.items():
+      proto_map = getattr(context_proto, name, None)
+      setattr(self, name, PipelineContextMap(self, cls, proto_map))
+
+  def to_runner_api(self):
+    """Returns a Components proto populated from every component map."""
+    context_proto = beam_runner_api_pb2.Components()
+    # Only the component names are needed here, so iterate the dict keys.
+    for name in self._COMPONENT_TYPES:
+      getattr(self, name).populate_map(getattr(context_proto, name))
+    return context_proto

Reply via email to