[ 
https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=350717&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-350717
 ]

ASF GitHub Bot logged work on BEAM-8335:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Nov/19 22:51
            Start Date: 27/Nov/19 22:51
    Worklog Time Spent: 10m 
      Work Description: rohdesamuel commented on pull request #10236: 
[BEAM-8335] Add method to PipelineInstrument to create background caching 
pipeline
URL: https://github.com/apache/beam/pull/10236#discussion_r351526972
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
 ##########
 @@ -95,17 +119,129 @@ def instrumented_pipeline_proto(self):
     """Always returns a new instance of portable instrumented proto."""
     return self._pipeline.to_runner_api(use_fake_coders=True)
 
+  def _required_components(self, pipeline_proto, required_transforms_ids):
+    """Returns the components and subcomponents of the given transforms.
+
+    This method recursively collects all the components (transforms,
+    PCollections, coders, and windowing strategies) related to the given
+    transforms and to all of their subtransforms.
+    """
+    transforms = pipeline_proto.components.transforms
+    pcollections = pipeline_proto.components.pcollections
+    coders = pipeline_proto.components.coders
+    windowing_strategies = pipeline_proto.components.windowing_strategies
+
+    # Cache the transforms that will be copied into the new pipeline proto.
+    required_transforms = {k: transforms[k] for k in required_transforms_ids}
+
+    # Cache all the output PCollections of the transforms.
+    pcollection_ids = [pc for t in required_transforms.values()
+                       for pc in t.outputs.values()]
+    required_pcollections = {pc_id: pcollections[pc_id]
+                             for pc_id in pcollection_ids}
+
+    # Cache all the PCollection coders.
+    coder_ids = [pc.coder_id for pc in required_pcollections.values()]
+    required_coders = {c_id: coders[c_id] for c_id in coder_ids}
+
+    # Cache all the windowing strategy ids.
+    windowing_strategies_ids = [pc.windowing_strategy_id
+                                for pc in required_pcollections.values()]
+    required_windowing_strategies = {ws_id: windowing_strategies[ws_id]
+                                     for ws_id in windowing_strategies_ids}
+
+    subtransforms = {}
+    subpcollections = {}
+    subcoders = {}
+    subwindowing_strategies = {}
+
+    # Recursively go through all the subtransforms and add their components.
+    for transform_id, transform in required_transforms.items():
+      if transform_id in pipeline_proto.root_transform_ids:
+        continue
+      (t, pc, c, ws) = self._required_components(pipeline_proto,
+                                                 transform.subtransforms)
+      subtransforms.update(t)
+      subpcollections.update(pc)
+      subcoders.update(c)
+      subwindowing_strategies.update(ws)
+
+    # Now that we have all the components and their subcomponents, return the
+    # complete collection.
+    required_transforms.update(subtransforms)
+    required_pcollections.update(subpcollections)
+    required_coders.update(subcoders)
+    required_windowing_strategies.update(subwindowing_strategies)
+
+    return (required_transforms, required_pcollections, required_coders,
+            required_windowing_strategies)
+
+  def background_caching_pipeline_proto(self):
+    """Returns the background caching pipeline.
+
+    This method creates a background caching pipeline from the original user
+    pipeline by adding writes to cache from each unbounded source (done in the
+    instrument method) and cutting out all components (transforms,
+    PCollections, coders, and windowing strategies) that are not the unbounded
+    sources, the writes to cache, or subtransforms thereof.
+    """
+    # Create the pipeline_proto to read all the components from. It will later
+    # create a new pipeline proto from the cut out components.
+    pipeline_proto = self._background_caching_pipeline.to_runner_api(
+        return_context=False, use_fake_coders=True)
+
+    # Get all the sources we want to cache.
+    sources = unbounded_sources(self._background_caching_pipeline)
+
+    # Get all the root transforms. The caching transforms will be subtransforms
+    # of one of these roots.
+    roots = pipeline_proto.root_transform_ids
+
+    # Get the transform IDs of the caching transforms.
+    transforms = pipeline_proto.components.transforms
+    caching_transform_ids = [t_id for root in roots
 
 Review comment:
   Thanks for the clarifying question. I reworded the comment here and in the 
method comment header.
   
   I think my wording in the method comment and here was insufficient. This 
method creates a background caching pipeline proto from the 
_background_caching_pipeline. The writes to cache are added to the 
_background_caching_pipeline in the instrument() method. This keeps 
background_caching_pipeline_proto() idempotent: multiple calls won't add 
multiple writes to cache.
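   
   The idempotency argument above can be sketched in isolation. The class and 
method names below are illustrative stand-ins, not the actual 
PipelineInstrument implementation: the mutating step (adding cache writes) 
happens once in instrument(), while the proto-building method is a pure read, 
so repeated calls never add duplicate writes.
   
   ```python
   # Hypothetical sketch of the idempotency pattern described above.
   class PipelineInstrumentSketch:
     def __init__(self, transforms):
       # Pretend the pipeline is just a list of transform names.
       self._background_caching_pipeline = list(transforms)
       self._instrumented = False

     def instrument(self):
       # Mutating step: add one write-to-cache per unbounded source,
       # guarded so it happens at most once.
       if not self._instrumented:
         self._background_caching_pipeline.append('WriteToCache')
         self._instrumented = True

     def background_caching_pipeline_proto(self):
       # Pure read: calling this any number of times returns the same
       # result and never mutates the underlying pipeline.
       return tuple(self._background_caching_pipeline)
   ```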
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 350717)
    Time Spent: 36h 20m  (was: 36h 10m)

> Add streaming support to Interactive Beam
> -----------------------------------------
>
>                 Key: BEAM-8335
>                 URL: https://issues.apache.org/jira/browse/BEAM-8335
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-py-interactive
>            Reporter: Sam Rohde
>            Assignee: Sam Rohde
>            Priority: Major
>          Time Spent: 36h 20m
>  Remaining Estimate: 0h
>
> This issue tracks the work items to introduce streaming support to the 
> Interactive Beam experience. This will allow users to:
>  * Write and run a streaming job in IPython
>  * Automatically cache records from unbounded sources
>  * Add a replay experience that replays all cached records to simulate the 
> original pipeline execution
>  * Add controls to play/pause/stop/step individual elements from the cached 
> records
>  * Add ability to inspect/visualize unbounded PCollections



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
