[ https://issues.apache.org/jira/browse/BEAM-7926?focusedWorklogId=358181&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358181 ]

ASF GitHub Bot logged work on BEAM-7926:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Dec/19 23:30
            Start Date: 11/Dec/19 23:30
    Worklog Time Spent: 10m 
      Work Description: KevinGG commented on pull request #10346: [BEAM-7926] Data-centric Interactive Part2
URL: https://github.com/apache/beam/pull/10346#discussion_r356890655
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
 ##########
 @@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to build pipeline fragment that produces given PCollections.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+
+
+class PipelineFragment(object):
+  """A fragment of a pipeline definition.
+
+  A pipeline fragment is built from the original pipeline definition to include
+  only PTransforms that are necessary to produce the given PCollections.
+  """
+
+  def __init__(self, pcolls, options=None):
+    """Constructor of PipelineFragment.
+
+    Args:
+      pcolls: (List[PCollection]) a list of PCollections to build pipeline
+          fragment for.
+      options: (PipelineOptions) the pipeline options for the implicit
+          pipeline run.
+    """
+    assert len(pcolls) > 0, (
+        'Need at least 1 PCollection as the target data to build a pipeline '
+        'fragment that produces it.')
+    for pcoll in pcolls:
+      assert isinstance(pcoll, beam.pvalue.PCollection), (
+          '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
+    # No modification to self._user_pipeline is allowed.
+    self._user_pipeline = pcolls[0].pipeline
+    # These are user PCollections. Do not use them to deduce anything that
+    # will be executed by any runner. Instead, use
+    # `self._runner_pcolls_to_user_pcolls.keys()` to get copied PCollections.
+    self._pcolls = set(pcolls)
+    for pcoll in self._pcolls:
+      assert pcoll.pipeline is self._user_pipeline, (
+          '{} belongs to a different user pipeline than other PCollections '
+          'given and cannot be used to build a pipeline fragment that produces '
+          'the given PCollections.'.format(pcoll))
+    self._options = options
+
+    # A copied pipeline instance for modification without changing the user
+    # pipeline instance held by the end user. This instance can be processed
+    # into a pipeline fragment that later run by the underlying runner.
+    self._runner_pipeline = self._build_runner_pipeline()
+    _, self._context = self._runner_pipeline.to_runner_api(
+        return_context=True, use_fake_coders=True)
+    from apache_beam.runners.interactive import pipeline_instrument as instr
+    self._runner_pcoll_to_id = instr.pcolls_to_pcoll_id(
+        self._runner_pipeline, self._context)
+    # Correlate components in the runner pipeline to components in the user
+    # pipeline. The target pcolls are the pcolls given and defined in the user
+    # pipeline.
+    self._id_to_target_pcoll = self._calculate_target_pcoll_ids()
+    self._label_to_user_transform = self._calculate_user_transform_labels()
+    # This will give us the 1:1 correlation between PCollections from the copied
+    # runner pipeline and PCollections from the user pipeline.
+    # (Dict[PCollection, PCollection])
+    self._runner_pcolls_to_user_pcolls = {}
+    self._runner_transforms_to_user_transforms = {}
+    self._build_correlation_between_pipelines()
 
 Review comment:
   `show(pcoll1, pcoll2, pcoll3)` called by the user shows PCollections defined in the user pipeline.
   When building a fragment, the pipeline deduced is a runner pipeline (a mutated, standalone copy of the user pipeline). Without the correlations, we no longer know which PCollections in the runner pipeline correspond to `pcoll1, pcoll2, pcoll3`.
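   To make the point about correlations concrete, here is a minimal sketch in plain Python. It assumes a hypothetical toy model (`ToyPCollection`, `ToyPipeline` and `correlate` are illustrative names, not Beam APIs) in which a PCollection is identified by the label of its producing transform plus an output tag. It shows that a deep copy breaks object identity between user and runner PCollections, and that a key which survives copying restores the mapping. It is not how `PipelineFragment` computes its correlation.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


# Hypothetical toy model, NOT the Apache Beam API: a PCollection is identified
# by the label of the transform that produces it plus an output tag.
@dataclass(eq=False)  # eq=False: hash and compare by object identity
class ToyPCollection:
  producer_label: str
  tag: str = 'None'


@dataclass
class ToyPipeline:
  # Maps each transform label to the PCollections that transform outputs.
  transforms: Dict[str, List[ToyPCollection]] = field(default_factory=dict)

  def apply(self, label: str) -> ToyPCollection:
    pcoll = ToyPCollection(label)
    self.transforms[label] = [pcoll]
    return pcoll


def correlate(runner_pipeline, user_pipeline):
  """Maps each runner PCollection to the user PCollection it was copied from."""
  # Index user PCollections by a key that survives deep copying.
  user_by_key: Dict[Tuple[str, str], ToyPCollection] = {
      (p.producer_label, p.tag): p
      for outputs in user_pipeline.transforms.values() for p in outputs}
  return {
      p: user_by_key[(p.producer_label, p.tag)]
      for outputs in runner_pipeline.transforms.values() for p in outputs}


user_p = ToyPipeline()
pcoll1 = user_p.apply('Read')
pcoll2 = user_p.apply('Read/Map')
runner_p = copy.deepcopy(user_p)  # standalone copy that is safe to mutate

runner_to_user = correlate(runner_p, user_p)
user_to_runner = {u: r for r, u in runner_to_user.items()}
# Identity no longer links the copies, but the correlation still does.
print(user_to_runner[pcoll1] is pcoll1)  # False
print(user_to_runner[pcoll1] is runner_p.transforms['Read'][0])  # True
```

   Inverting the mapping (`user_to_runner`) is what lets a call like `show(pcoll1, pcoll2)` find the runner-side PCollections to execute and read back.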
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 358181)
    Time Spent: 30h 20m  (was: 30h 10m)

> Show PCollection with Interactive Beam in a data-centric user flow
> ------------------------------------------------------------------
>
>                 Key: BEAM-7926
>                 URL: https://issues.apache.org/jira/browse/BEAM-7926
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-py-interactive
>            Reporter: Ning Kang
>            Assignee: Ning Kang
>            Priority: Major
>          Time Spent: 30h 20m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline is defined as
>  
> {code:python}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:python}
> show(pcoll, pcoll2)
> {code}
> Throughout the process, a pipeline fragment that includes only the 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2) is 
> built and executed.
> This makes the Interactive Beam user flow data-centric.
>  
> Detailed 
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].
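Building a fragment that "includes only transforms necessary to produce the desired pcolls" amounts to a backward walk from the requested PCollections to every upstream transform. The following is a minimal sketch of that idea over a hypothetical toy graph (plain dicts keyed by made-up labels such as `Read` and `Split`; `fragment_for` is an illustrative name). It is not Beam's `PipelineVisitor`-based implementation.

```python
from typing import Dict, List, Set

# Hypothetical toy graph, NOT the Beam API: each transform label maps to the
# names of the PCollections it consumes and the ones it produces.
TRANSFORMS: Dict[str, Dict[str, List[str]]] = {
    'Read': {'inputs': [], 'outputs': ['lines']},
    'Split': {'inputs': ['lines'], 'outputs': ['words']},
    'Count': {'inputs': ['words'], 'outputs': ['counts']},
    'Upper': {'inputs': ['lines'], 'outputs': ['upper']},
    'Write': {'inputs': ['counts'], 'outputs': []},
}


def fragment_for(targets: List[str]) -> Set[str]:
  """Returns the labels of the transforms needed to produce `targets`."""
  # Invert the graph: which transform produces each PCollection?
  producer = {out: label
              for label, t in TRANSFORMS.items() for out in t['outputs']}
  needed: Set[str] = set()
  pending = list(targets)
  while pending:
    label = producer[pending.pop()]
    if label not in needed:
      needed.add(label)
      # The inputs of a needed transform must be produced too: walk upstream.
      pending.extend(TRANSFORMS[label]['inputs'])
  return needed


# Analogous to show(pcoll, pcoll2): only ancestors of the requested data remain.
print(sorted(fragment_for(['words', 'upper'])))  # ['Read', 'Split', 'Upper']
```

`Count` and `Write` are pruned because nothing requested depends on them. The `needed` check also keeps shared ancestors such as `Read` from being visited twice.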



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
