[ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=425817&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-425817
 ]

ASF GitHub Bot logged work on BEAM-9639:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Apr/20 18:15
            Start Date: 21/Apr/20 18:15
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #11270:
URL: https://github.com/apache/beam/pull/11270#discussion_r412384268



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -75,21 +75,27 @@
 
 IMPULSE_BUFFER = b'impulse'
 
+# SideInputId is identified by a consumer ParDo + tag.
+SideInputId = Tuple[str, str]
+
+DataSideInput = Dict[SideInputId,
+                     Tuple[bytes, beam_runner_api_pb2.FunctionSpec]]
+
 
 class Stage(object):
   """A set of Transforms that can be sent to the worker for processing."""
   def __init__(self,
                name,  # type: str
                transforms,  # type: List[beam_runner_api_pb2.PTransform]
-               downstream_side_inputs=None,  # type: Optional[FrozenSet[str]]
+               downstream_side_inputs=None,  # type: Optional[Dict[str, SideInputId]]

Review comment:
       Discussed offline, but capturing here for the record. These sets contain 
the transitive collection of everything downstream of any side-input-consuming 
transform, and as such can be large even if the total number of side inputs is 
small. (The number of distinct such sets is about the same as the number of 
side inputs, so we keep total memory use down by re-using them; giving each 
transform its own copy would easily be O(n^2).)
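
   The memory argument can be illustrated with a minimal sketch. It uses plain Python objects rather than the actual Beam data structures, and the names `transform_ids`, `downstream`, `shared`, and `copied` are hypothetical, chosen only for this illustration.

```python
# Hypothetical illustration; not code from the Beam repository.
n = 1000
transform_ids = ['t%d' % i for i in range(n)]

# One transitive "everything downstream" set for a single side input.
downstream = frozenset(transform_ids)

# Shared: every transform holds a reference to the same set object.
shared = {t: downstream for t in transform_ids}

# Copied: every transform holds its own equal-but-distinct set.
# (Going through list() forces a new object to be built.)
copied = {t: frozenset(list(downstream)) for t in transform_ids}

# Count distinct set objects and the total number of stored elements.
distinct_shared = len({id(s) for s in shared.values()})
distinct_copied = len({id(s) for s in copied.values()})
elements_shared = len(downstream)
elements_copied = sum(len(s) for s in copied.values())
```

   With sharing, the n transforms reference a single set of n elements; with per-transform copies, n sets of n elements each are stored, which is the quadratic growth mentioned above.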
   
   Your change to compute the side input mapping after the graph has been 
fused is good (and arguably better, as you only need the immediate consumers 
and don't have to recompute each time a stage is fused).
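
   As a rough sketch of that idea (not the code in the pull request), the mapping can be built in a single pass over the already-fused stages by looking only at each transform's own side inputs. `FakeTransform` and `side_inputs_by_pcollection` are hypothetical stand-ins invented for this example; the real code works on `beam_runner_api_pb2.PTransform` protos, which are not modeled here.

```python
from typing import Dict, List, NamedTuple, Tuple

# As in the diff: a side input is identified by a consumer ParDo + tag.
SideInputId = Tuple[str, str]


class FakeTransform(NamedTuple):
    """Hypothetical stand-in for a transform; only the fields needed here."""
    transform_id: str
    side_inputs: Dict[str, str]  # side input tag -> PCollection id


def side_inputs_by_pcollection(
        stages: List[List[FakeTransform]]) -> Dict[str, List[SideInputId]]:
    """Maps each PCollection id to the (transform, tag) pairs reading it."""
    result: Dict[str, List[SideInputId]] = {}
    for stage in stages:  # stages are assumed to be fused already
        for transform in stage:
            # Only immediate consumers are recorded; no transitive closure.
            for tag, pcoll in transform.side_inputs.items():
                result.setdefault(pcoll, []).append(
                    (transform.transform_id, tag))
    return result


stages = [
    [FakeTransform('read', {}),
     FakeTransform('pardo_a', {'side0': 'pc_lookup'})],
    [FakeTransform('pardo_b', {'side0': 'pc_lookup', 'side1': 'pc_config'})],
]
mapping = side_inputs_by_pcollection(stages)
```

   Because the pass runs once after fusion, its cost is proportional to the number of side input references, and nothing has to be updated while stages are being merged.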




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 425817)
    Time Spent: 5h 10m  (was: 5h)

> Abstract bundle execution logic from stage execution logic
> ----------------------------------------------------------
>
>                 Key: BEAM-9639
>                 URL: https://issues.apache.org/jira/browse/BEAM-9639
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works in a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code 
> that executes a single bundle.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
