[ 
https://issues.apache.org/jira/browse/BEAM-7926?focusedWorklogId=359021&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-359021
 ]

ASF GitHub Bot logged work on BEAM-7926:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/19 23:45
            Start Date: 12/Dec/19 23:45
    Worklog Time Spent: 10m 
      Work Description: rohdesamuel commented on pull request #10346: 
[BEAM-7926] Data-centric Interactive Part2
URL: https://github.com/apache/beam/pull/10346#discussion_r357426745
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
 ##########
 @@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to build pipeline fragment that produces given PCollections.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+
+
+class PipelineFragment(object):
+  """A fragment of a pipeline definition.
+
+  A pipeline fragment is built from the original pipeline definition to include
+  only PTransforms that are necessary to produce the given PCollections.
+  """
+
+  def __init__(self, pcolls, options=None):
+    """Constructor of PipelineFragment.
+
+    Args:
+      pcolls: (List[PCollection]) a list of PCollections to build pipeline
+          fragment for.
+      options: (PipelineOptions) the pipeline options for the implicit
+          pipeline run.
+    """
+    assert len(pcolls) > 0, (
+        'Need at least 1 PCollection as the target data to build a pipeline '
+        'fragment that produces it.')
+    for pcoll in pcolls:
+      assert isinstance(pcoll, beam.pvalue.PCollection), (
+          '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
+    # No modification to self._user_pipeline is allowed.
+    self._user_pipeline = pcolls[0].pipeline
+    # These are user PCollections. Do not use them to deduce anything that
+    # will be executed by any runner. Instead, use
+    # `self._runner_pcolls_to_user_pcolls.keys()` to get copied PCollections.
+    self._pcolls = set(pcolls)
+    for pcoll in self._pcolls:
+      assert pcoll.pipeline is self._user_pipeline, (
+          '{} belongs to a different user pipeline than other PCollections '
+          'given and cannot be used to build a pipeline fragment that produces '
+          'the given PCollections.')
+    self._options = options
+
+    # A copied pipeline instance for modification without changing the user
+    # pipeline instance held by the end user. This instance can be processed
+    # into a pipeline fragment that is later run by the underlying runner.
+    self._runner_pipeline = self._build_runner_pipeline()
+    _, self._context = self._runner_pipeline.to_runner_api(
+        return_context=True, use_fake_coders=True)
+    from apache_beam.runners.interactive import pipeline_instrument as instr
+    self._runner_pcoll_to_id = instr.pcolls_to_pcoll_id(
+        self._runner_pipeline, self._context)
+    # Correlate components in the runner pipeline to components in the user
+    # pipeline. The target pcolls are the pcolls given and defined in the user
+    # pipeline.
+    self._id_to_target_pcoll = self._calculate_target_pcoll_ids()
+    self._label_to_user_transform = self._calculate_user_transform_labels()
+    # This will give us the 1:1 correlation between PCollections from the copied
+    # runner pipeline and PCollections from the user pipeline.
+    # (Dict[PCollection, PCollection])
+    self._runner_pcolls_to_user_pcolls = {}
+    self._runner_transforms_to_user_transforms = {}
+    self._build_correlation_between_pipelines()
+
+    # Below are operated on the runner pipeline.
+    self._necessary_transforms = set()
+    self._necessary_pcollections = set()
+    self._mark_necessary_transforms_and_pcolls()
+    self._prune_runner_pipeline_to_fragment()
+
+  def deduce_fragment(self):
+    """Deduce the pipeline fragment as an apache_beam.Pipeline instance."""
+    return beam.pipeline.Pipeline.from_runner_api(
+        self._runner_pipeline.to_runner_api(use_fake_coders=True),
+        self._runner_pipeline.runner,
+        self._options)
+
+  def run(self, display_pipeline_graph=False, use_cache=True):
+    """Shorthand to run the pipeline fragment."""
+    try:
+      skip_pipeline_graph = self._runner_pipeline.runner._skip_display
+      force_compute = self._runner_pipeline.runner._force_compute
+      self._runner_pipeline.runner._skip_display = not display_pipeline_graph
+      self._runner_pipeline.runner._force_compute = not use_cache
+      return self.deduce_fragment().run()
+    finally:
+      self._runner_pipeline.runner._skip_display = skip_pipeline_graph
+      self._runner_pipeline.runner._force_compute = force_compute
+
+  def _build_runner_pipeline(self):
+    return beam.pipeline.Pipeline.from_runner_api(
+        self._user_pipeline.to_runner_api(use_fake_coders=True),
+        self._user_pipeline.runner,
+        self._options)
+
+  def _calculate_target_pcoll_ids(self):
+    pcoll_id_to_target_pcoll = {}
+    for pcoll in self._pcolls:
+      pcoll_id_to_target_pcoll[self._runner_pcoll_to_id.get(str(pcoll),
+                                                            '')] = pcoll
+    return pcoll_id_to_target_pcoll
+
+  def _calculate_user_transform_labels(self):
+    label_to_user_transform = {}
+
+    class UserTransformVisitor(PipelineVisitor):
+
+      def enter_composite_transform(self, transform_node):
+        self.visit_transform(transform_node)
+
+      def visit_transform(self, transform_node):
+        if transform_node is not None:
+          label_to_user_transform[transform_node.full_label] = transform_node
+
+    v = UserTransformVisitor()
+    self._runner_pipeline.visit(v)
+    return label_to_user_transform
+
+  def _build_correlation_between_pipelines(self):
+
+    class CorrelationVisitor(PipelineVisitor):
+
+      def __init__(self, pf):
+        self._pf = pf
+
+      def enter_composite_transform(self, transform_node):
+        self.visit_transform(transform_node)
+
+      def visit_transform(self, transform_node):
+        self._process_transform(transform_node)
+        for in_pcoll in transform_node.inputs:
+          self._process_pcoll(in_pcoll)
+        for out_pcoll in transform_node.outputs.values():
+          self._process_pcoll(out_pcoll)
+
+      def _process_pcoll(self, pcoll):
+        pcoll_id = self._pf._runner_pcoll_to_id.get(str(pcoll), '')
+        if pcoll_id in self._pf._id_to_target_pcoll:
+          self._pf._runner_pcolls_to_user_pcolls[pcoll] = (
+              self._pf._id_to_target_pcoll[pcoll_id])
+
+      def _process_transform(self, transform_node):
+        if transform_node.full_label in self._pf._label_to_user_transform:
+          self._pf._runner_transforms_to_user_transforms[transform_node] = (
+              self._pf._label_to_user_transform[transform_node.full_label])
+
+    v = CorrelationVisitor(self)
+    self._runner_pipeline.visit(v)
+
+  def _mark_necessary_transforms_and_pcolls(self):
+    all_inputs = set()
+    updated_all_inputs = set(self._runner_pcolls_to_user_pcolls.keys())
+    # Do this until no more new PCollection is recorded.
+    while len(updated_all_inputs) != len(all_inputs):
+      all_inputs = set(updated_all_inputs)
+      for pcoll in all_inputs:
+        producer = pcoll.producer
+        while producer:
+          if producer in self._necessary_transforms:
+            break
+          # Mark the AppliedPTransform as necessary.
+          self._necessary_transforms.add(producer)
+          # Record all necessary input and side input PCollections.
+          updated_all_inputs.update(producer.inputs)
+          # pylint: disable=map-builtin-not-iterating
+          side_input_pvalues = set(
+              map(lambda side_input: side_input.pvalue,
+                  producer.side_inputs))
+          updated_all_inputs.update(side_input_pvalues)
+          # Go to its parent AppliedPTransform.
+          producer = producer.parent
+    self._necessary_pcollections = all_inputs
+
+  def _prune_runner_pipeline_to_fragment(self):
+
+    class PruneVisitor(PipelineVisitor):
+
+      def __init__(self, pf):
+        self._pf = pf
+
+      def enter_composite_transform(self, transform_node):
+        pruned_parts = list(transform_node.parts)
+        for part in transform_node.parts:
+          if part not in self._pf._necessary_transforms:
+            pruned_parts.remove(part)
+        transform_node.parts = pruned_parts
+        self.visit_transform(transform_node)
+
+      def visit_transform(self, transform_node):
+        if transform_node not in self._pf._necessary_transforms:
+          transform_node.parent = None
+
+    v = PruneVisitor(self)
+    self._runner_pipeline.visit(v)
 
 Review comment:
   I'll leave it be for now, but I still have my reservations. I think this 
works only because of a quirk of the implementation. It's actually more 
surprising that the API allows the graph to be modified while iterating over it.
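
   To make the reservation concrete, here is a minimal toy sketch. It is not 
Beam's implementation: `Node`, `walk`, `prune_during_walk`, 
`prune_in_two_passes` and `build_tree` are hypothetical names used only for 
illustration. It assumes a walker that calls a hook on a node and only 
afterwards reads that node's `parts`, which is the kind of ordering that 
in-place pruning would have to rely on. The second function shows a two-pass 
alternative that does not depend on that ordering.

```python
class Node(object):
  """Hypothetical stand-in for a transform node; not a Beam class."""

  def __init__(self, label, parts=()):
    self.label = label
    self.parts = list(parts)


def walk(node, enter):
  """Calls enter(node), then descends into node.parts.

  Pruning inside `enter` only takes effect because `node.parts` is read
  after the hook returns. A walker that captured the children before calling
  the hook would still visit the pruned nodes.
  """
  enter(node)
  for part in node.parts:
    walk(part, enter)


def prune_during_walk(root, necessary):
  """Mutates the tree from inside the traversal hook."""
  visited = []

  def enter(node):
    visited.append(node.label)
    # Rebind to a new list rather than removing from a list being iterated.
    node.parts = [p for p in node.parts if p.label in necessary]

  walk(root, enter)
  return visited


def prune_in_two_passes(root, necessary):
  """Collects every node first, then mutates once traversal is over."""
  nodes = []
  walk(root, nodes.append)
  for node in nodes:
    node.parts = [p for p in node.parts if p.label in necessary]
  return [n.label for n in nodes]


def build_tree():
  return Node('root', [Node('a', [Node('a1'), Node('a2')]), Node('b')])
```

   Both variants end with the same pruned tree. The difference is that the 
first one never visits the pruned nodes, and only because of the order in 
which this particular walker happens to read `parts`.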
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 359021)
    Time Spent: 31h 40m  (was: 31.5h)

> Show PCollection with Interactive Beam in a data-centric user flow
> ------------------------------------------------------------------
>
>                 Key: BEAM-7926
>                 URL: https://issues.apache.org/jira/browse/BEAM-7926
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-py-interactive
>            Reporter: Ning Kang
>            Assignee: Ning Kang
>            Priority: Major
>          Time Spent: 31h 40m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline is defined as
>  
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:java}
> show(pcoll, pcoll2)
> {code}
> Throughout the process, a pipeline fragment is built to include only 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2), and 
> that fragment is then executed.
> This makes the Interactive Beam user flow data-centric.
>  
> Detailed 
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].
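
The fragment idea in the issue description above can be sketched without Beam 
at all. This is a toy model under stated assumptions, not the PR's code: the 
function `necessary_transforms` and the two dictionaries are hypothetical, and 
the dependencies (pcoll2 derived from pcoll, pcoll3 derived from pcoll2, with 
labels 'Transform2' and 'Transform3') are invented for illustration, since the 
description elides them. It walks backwards from the requested PCollections 
and keeps only the transforms that contribute to them.

```python
def necessary_transforms(producers, inputs, targets):
  """Returns the labels of the transforms needed to produce `targets`.

  producers: dict mapping PCollection name -> label of its producing transform.
  inputs: dict mapping transform label -> list of input PCollection names.
  """
  needed = set()
  pending = list(targets)
  while pending:
    pcoll = pending.pop()
    transform = producers.get(pcoll)
    if transform is None or transform in needed:
      continue
    needed.add(transform)
    # The inputs of a needed transform must be produced as well.
    pending.extend(inputs.get(transform, []))
  return needed


# Assumed toy graph; the real dependencies are elided in the description.
producers = {
    'pcoll': 'Transform',
    'pcoll2': 'Transform2',
    'pcoll3': 'Transform3',
}
inputs = {
    'Transform': [],
    'Transform2': ['pcoll'],
    'Transform3': ['pcoll2'],
}
fragment = necessary_transforms(producers, inputs, ['pcoll', 'pcoll2'])
```

Under these assumptions, asking for pcoll and pcoll2 leaves 'Transform3' out 
of the fragment, so nothing downstream of the requested data would be run.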



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
