[ https://issues.apache.org/jira/browse/BEAM-10308?focusedWorklogId=451800&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-451800 ]

ASF GitHub Bot logged work on BEAM-10308:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jun/20 23:41
            Start Date: 26/Jun/20 23:41
    Worklog Time Spent: 10m 
    Work Description: TheNeuralBit commented on a change in pull request #12067:
URL: https://github.com/apache/beam/pull/12067#discussion_r446452942



##########
File path: sdks/python/apache_beam/runners/pipeline_context.py
##########
@@ -49,6 +50,52 @@
   from apache_beam.coders.coder_impl import IterableStateWriter
 
 
+class _UniqueRefAssigner(object):
+  """Utility for assigning unique refs to proto messages for use in components.
+
+  Instances of _UniqueRefAssigner are global (scoped by the base string). That
+  way once a unique ref is assigned it will be used consistently across
+  PipelineContext instances.
+  """
+  _INSTANCES = {}  # type: Dict[Tuple[Pipeline, str], _UniqueRefAssigner]
+
+  def __init__(self, base):
+    self._base = base
+    self._counter = 0
+    self._obj_to_id = {}  # type: Dict[Any, str]
+
+  def get_or_assign(self, obj=None, label=None):
+    # type: (Optional[Any], Optional[str]) -> str
+
+    """Retrieve the unique ref for the given object.
+
+    Generates and assigns a unique ref if one hasn't been assigned yet. label
+    will be incorporated into the unique ref when assigning a new unique ref,
+    otherwise it is ignored."""
+    if obj not in self._obj_to_id:
+      self._obj_to_id[obj] = self._unique_ref(obj, label)
+
+    return self._obj_to_id[obj]
+
+  def _unique_ref(self, obj=None, label=None):
+    self._counter += 1
+    return "%s_%s_%d" % (self._base, label or type(obj).__name__, self._counter)
+
+  @classmethod
+  def get_instance(cls, pipeline, base):
+    # type: (Optional[Pipeline], str) -> _UniqueRefAssigner
+
+    """Return the _UniqueRefAssigner with the given base string.
+
+    Creates a new instance if one doesn't already exist for this base string."""
+    key = (id(pipeline), base)

Review comment:
       Partitioning the cached ID assignments by `id(pipeline)` is a subpar
       and dangerous solution, because `id` is only guaranteed to be unique
       among objects with overlapping lifetimes: once a pipeline is garbage
       collected, a new pipeline may receive the same `id`. If we go with
       something like this approach, we need to at least tie these instances'
       lifetimes to the lifetime of the pipeline.
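A minimal sketch of tying the cache lifetime to the pipeline, as the comment suggests. In CPython, `id()` is the object's memory address, so after a pipeline is freed its `id` can be handed to a later allocation, and a global dict keyed by `id(pipeline)` could then serve stale refs to an unrelated pipeline. The sketch stores the assigners on the pipeline object itself instead. `get_assigner`, `FakePipeline`, and the `_unique_ref_assigners` attribute are hypothetical names for illustration, not part of Beam.

```python
from typing import Any, Dict, Optional


class UniqueRefAssigner(object):
  """Simplified stand-in for the _UniqueRefAssigner in the diff above."""

  def __init__(self, base):
    # type: (str) -> None
    self._base = base
    self._counter = 0
    self._obj_to_id = {}  # type: Dict[Any, str]

  def get_or_assign(self, obj, label=None):
    # type: (Any, Optional[str]) -> str
    # Mint a new ref only the first time this object is seen.
    if obj not in self._obj_to_id:
      self._counter += 1
      self._obj_to_id[obj] = "%s_%s_%d" % (
          self._base, label or type(obj).__name__, self._counter)
    return self._obj_to_id[obj]


def get_assigner(pipeline, base):
  # type: (Any, str) -> UniqueRefAssigner
  """Return the assigner for (pipeline, base), cached on the pipeline itself.

  The cache lives exactly as long as the pipeline object, so a pipeline that
  is later allocated with a recycled id() starts with an empty cache.
  """
  # '_unique_ref_assigners' is a hypothetical attribute name.
  assigners = pipeline.__dict__.setdefault('_unique_ref_assigners', {})
  if base not in assigners:
    assigners[base] = UniqueRefAssigner(base)
  return assigners[base]


class FakePipeline(object):
  """Hypothetical stand-in for apache_beam.Pipeline."""


p = FakePipeline()
coder = object()
first = get_assigner(p, 'ref_Coder').get_or_assign(coder)
# A second lookup, e.g. from another PipelineContext, reuses the same ref.
assert get_assigner(p, 'ref_Coder').get_or_assign(coder) == first
```

One design note: keeping the cache on the pipeline means a reference cycle between the pipeline, its assigners, and the cached components is ordinary garbage that Python's cycle collector can reclaim. A global `weakref.WeakKeyDictionary` keyed on the pipeline would also avoid `id()` reuse, but its values would keep the pipeline alive whenever a cached component holds a strong reference back to it (for example, through a `pipeline` attribute).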



Issue Time Tracking
-------------------

    Worklog Id:     (was: 451800)
    Time Spent: 1.5h  (was: 1h 20m)

> Component id assignment is not consistent across PipelineContext instances
> --------------------------------------------------------------------------
>
>                 Key: BEAM-10308
>                 URL: https://issues.apache.org/jira/browse/BEAM-10308
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P1
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The "unique ref" ids used in PipelineContext are generated on the fly, which
> can cause us to get a different id for the same component in different
> contexts.
> This becomes a problem when ExternalTransform is used, because it creates its
> own pipeline context for expansion. So it's possible that the component ids
> in the expansion request will actually refer to an entirely different
> component when the pipeline is finally assembled for execution.
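The failure mode described above can be reproduced with a toy model. This is a sketch under stated assumptions: `Context`, `get_id`, and `Coder` are hypothetical simplifications rather than Beam's actual `PipelineContext` API, and the shared dict plays the role that the global `_UniqueRefAssigner` instances play in the diff.

```python
from typing import Any, Dict, Optional


class Context(object):
  """Hypothetical, much-simplified stand-in for PipelineContext.

  Without a shared table, each context numbers components in the order it
  happens to see them, i.e. refs are generated on the fly per context.
  """

  def __init__(self, shared_ids=None):
    # type: (Optional[Dict[Any, str]]) -> None
    self._ids = shared_ids if shared_ids is not None else {}

  def get_id(self, obj):
    # type: (Any) -> str
    if obj not in self._ids:
      # Entries are never removed, so table size + 1 is a fresh number.
      self._ids[obj] = "ref_%s_%d" % (type(obj).__name__, len(self._ids) + 1)
    return self._ids[obj]


class Coder(object):
  """Stand-in for any pipeline component (coder, PCollection, ...)."""


coder_a, coder_b = Coder(), Coder()

# Separate tables, as when ExternalTransform builds its own context.
main, expansion = Context(), Context()
main.get_id(coder_a)       # 'ref_Coder_1'
expansion.get_id(coder_b)  # 'ref_Coder_1', the same ref for a different coder

# One table shared by both contexts keeps refs consistent.
shared = {}  # type: Dict[Any, str]
main, expansion = Context(shared), Context(shared)
main.get_id(coder_a)       # 'ref_Coder_1'
expansion.get_id(coder_b)  # 'ref_Coder_2'
expansion.get_id(coder_a)  # 'ref_Coder_1'
```

With separate tables, `ref_Coder_1` names `coder_a` in one context and `coder_b` in the other, which is the kind of mix-up an expansion request can hit. Sharing the table fixes that, but a table that outlives its pipeline raises the lifetime concern from the review comment.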



--
This message was sent by Atlassian Jira
(v8.3.4#803005)