[
https://issues.apache.org/jira/browse/BEAM-7926?focusedWorklogId=359030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-359030
]
ASF GitHub Bot logged work on BEAM-7926:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Dec/19 00:05
Start Date: 13/Dec/19 00:05
Worklog Time Spent: 10m
Work Description: KevinGG commented on pull request #10346: [BEAM-7926]
Data-centric Interactive Part2
URL: https://github.com/apache/beam/pull/10346#discussion_r357431373
##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to build pipeline fragment that produces given PCollections.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+
+
+class PipelineFragment(object):
+ """A fragment of a pipeline definition.
+
+ A pipeline fragment is built from the original pipeline definition to include
+ only PTransforms that are necessary to produce the given PCollections.
+ """
+
+ def __init__(self, pcolls, options=None):
+ """Constructor of PipelineFragment.
+
+ Args:
+ pcolls: (List[PCollection]) a list of PCollections to build a pipeline
+ fragment for.
+ options: (PipelineOptions) the pipeline options for the implicit
+ pipeline run.
+ """
+ assert len(pcolls) > 0, (
+ 'Need at least 1 PCollection as the target data to build a pipeline '
+ 'fragment that produces it.')
+ for pcoll in pcolls:
+ assert isinstance(pcoll, beam.pvalue.PCollection), (
+ '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
+ # No modification to self._user_pipeline is allowed.
+ self._user_pipeline = pcolls[0].pipeline
+ # These are user PCollections. Do not use them to deduce anything that
+ # will be executed by any runner. Instead, use
+ # `self._runner_pcolls_to_user_pcolls.keys()` to get copied PCollections.
+ self._pcolls = set(pcolls)
+ for pcoll in self._pcolls:
+ assert pcoll.pipeline is self._user_pipeline, (
+ '{} belongs to a different user pipeline than other PCollections '
+ 'given and cannot be used to build a pipeline fragment that produces '
+ 'the given PCollections.'.format(pcoll))
+ self._options = options
+
+ # A copied pipeline instance for modification without changing the user
+ # pipeline instance held by the end user. This instance can be processed
+ # into a pipeline fragment that is later run by the underlying runner.
+ self._runner_pipeline = self._build_runner_pipeline()
+ _, self._context = self._runner_pipeline.to_runner_api(
+ return_context=True, use_fake_coders=True)
+ from apache_beam.runners.interactive import pipeline_instrument as instr
+ self._runner_pcoll_to_id = instr.pcolls_to_pcoll_id(
+ self._runner_pipeline, self._context)
+ # Correlate components in the runner pipeline to components in the user
+ # pipeline. The target pcolls are the pcolls given and defined in the user
+ # pipeline.
+ self._id_to_target_pcoll = self._calculate_target_pcoll_ids()
+ self._label_to_user_transform = self._calculate_user_transform_labels()
+ # This will give us the 1:1 correlation between PCollections from the
+ # copied runner pipeline and PCollections from the user pipeline.
+ # (Dict[PCollection, PCollection])
+ self._runner_pcolls_to_user_pcolls = {}
+ self._runner_transforms_to_user_transforms = {}
+ self._build_correlation_between_pipelines()
+
+ # The operations below are performed on the runner pipeline.
+ self._necessary_transforms = set()
+ self._necessary_pcollections = set()
+ self._mark_necessary_transforms_and_pcolls()
+ self._prune_runner_pipeline_to_fragment()
+
+ def deduce_fragment(self):
+ """Deduce the pipeline fragment as an apache_beam.Pipeline instance."""
+ return beam.pipeline.Pipeline.from_runner_api(
+ self._runner_pipeline.to_runner_api(use_fake_coders=True),
+ self._runner_pipeline.runner,
+ self._options)
+
+ def run(self, display_pipeline_graph=False, use_cache=True):
+ """Shorthand to run the pipeline fragment."""
+ try:
+ skip_pipeline_graph = self._runner_pipeline.runner._skip_display
+ force_compute = self._runner_pipeline.runner._force_compute
+ self._runner_pipeline.runner._skip_display = not display_pipeline_graph
+ self._runner_pipeline.runner._force_compute = not use_cache
+ return self.deduce_fragment().run()
+ finally:
+ self._runner_pipeline.runner._skip_display = skip_pipeline_graph
+ self._runner_pipeline.runner._force_compute = force_compute
+
+ def _build_runner_pipeline(self):
+ return beam.pipeline.Pipeline.from_runner_api(
+ self._user_pipeline.to_runner_api(use_fake_coders=True),
+ self._user_pipeline.runner,
+ self._options)
+
+ def _calculate_target_pcoll_ids(self):
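+ # A brief note on what this computes (added for clarity): it returns a
+ # dict mapping the runner-pipeline id of each target PCollection to the
+ # user-defined PCollection instance itself.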
+ pcoll_id_to_target_pcoll = {}
+ for pcoll in self._pcolls:
+ pcoll_id_to_target_pcoll[self._runner_pcoll_to_id.get(str(pcoll),
+ '')] = pcoll
+ return pcoll_id_to_target_pcoll
+
+ def _calculate_user_transform_labels(self):
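+ # Collects the full label of every transform in the runner pipeline;
+ # since the copy preserves labels, these also identify user transforms.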
+ label_to_user_transform = {}
+
+ class UserTransformVisitor(PipelineVisitor):
+
+ def enter_composite_transform(self, transform_node):
+ self.visit_transform(transform_node)
+
+ def visit_transform(self, transform_node):
+ if transform_node is not None:
+ label_to_user_transform[transform_node.full_label] = transform_node
+
+ v = UserTransformVisitor()
+ self._runner_pipeline.visit(v)
+ return label_to_user_transform
+
+ def _build_correlation_between_pipelines(self):
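+ # Pairs components of the runner pipeline with components of the user
+ # pipeline, matching PCollections by id and transforms by full label.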
+
+ class CorrelationVisitor(PipelineVisitor):
+
+ def __init__(self, pf):
+ self._pf = pf
+
+ def enter_composite_transform(self, transform_node):
+ self.visit_transform(transform_node)
+
+ def visit_transform(self, transform_node):
+ self._process_transform(transform_node)
+ for in_pcoll in transform_node.inputs:
+ self._process_pcoll(in_pcoll)
+ for out_pcoll in transform_node.outputs.values():
+ self._process_pcoll(out_pcoll)
+
+ def _process_pcoll(self, pcoll):
+ pcoll_id = self._pf._runner_pcoll_to_id.get(str(pcoll), '')
+ if pcoll_id in self._pf._id_to_target_pcoll:
+ self._pf._runner_pcolls_to_user_pcolls[pcoll] = (
+ self._pf._id_to_target_pcoll[pcoll_id])
+
+ def _process_transform(self, transform_node):
+ if transform_node.full_label in self._pf._label_to_user_transform:
+ self._pf._runner_transforms_to_user_transforms[transform_node] = (
+ self._pf._label_to_user_transform[transform_node.full_label])
+
+ v = CorrelationVisitor(self)
+ self._runner_pipeline.visit(v)
+
+ def _mark_necessary_transforms_and_pcolls(self):
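+ # Starting from the target PCollections, transitively walk producer
+ # transforms (and their parent composites), collecting every transform
+ # and every input/side-input PCollection needed to compute the targets.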
+ all_inputs = set()
+ updated_all_inputs = set(self._runner_pcolls_to_user_pcolls.keys())
+ # Iterate until no new PCollections are recorded.
+ while len(updated_all_inputs) != len(all_inputs):
+ all_inputs = set(updated_all_inputs)
+ for pcoll in all_inputs:
+ producer = pcoll.producer
+ while producer:
+ if producer in self._necessary_transforms:
+ break
+ # Mark the AppliedPTransform as necessary.
+ self._necessary_transforms.add(producer)
+ # Record all necessary input and side input PCollections.
+ updated_all_inputs.update(producer.inputs)
+ # pylint: disable=map-builtin-not-iterating
+ side_input_pvalues = set(
+ map(lambda side_input: side_input.pvalue,
+ producer.side_inputs))
+ updated_all_inputs.update(side_input_pvalues)
+ # Go to its parent AppliedPTransform.
+ producer = producer.parent
+ self._necessary_pcollections = all_inputs
+
+ def _prune_runner_pipeline_to_fragment(self):
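+ # Detaches all transforms that are not necessary to produce the target
+ # PCollections, leaving only the pipeline fragment in the runner pipeline.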
+
+ class PruneVisitor(PipelineVisitor):
+
+ def __init__(self, pf):
+ self._pf = pf
+
+ def enter_composite_transform(self, transform_node):
+ pruned_parts = list(transform_node.parts)
+ for part in transform_node.parts:
+ if part not in self._pf._necessary_transforms:
+ pruned_parts.remove(part)
+ transform_node.parts = pruned_parts
+ self.visit_transform(transform_node)
+
+ def visit_transform(self, transform_node):
+ if transform_node not in self._pf._necessary_transforms:
+ transform_node.parent = None
+
+ v = PruneVisitor(self)
+ self._runner_pipeline.visit(v)
Review comment:
Got it. Yeah, I had the same hunch. But consider how we mutate a linked
list, and here, a graph (actually a tree). It's very common to move things
around in a tree/graph in place; all the tree-balancing logic, for example,
happens in place without first converting the tree to a serialized
representation. It's actually not a "bad" practice as long as we are not
mutating the pipeline defined by the user.
A proto and a pipeline object are basically two representations of a graph,
similar to an adjacency list vs. a graph object. The proto is both
platform-agnostic and language-agnostic, while the pipeline object is only
platform-agnostic (it is represented in Python, so it needs a different
implementation on each platform).
To me, the proto is something that gets passed across systems, and I'd like
it to be treated as an immutable medium when the representation of a
pipeline is passed around. When a mutation is needed, we deserialize it into
a mutable pipeline object.
And the pipeline object is designed to be mutable and to be used to
construct pipelines, so we just mutate it when we see fit.
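To make that concrete, here is a minimal sketch of the copy-then-mutate
pattern (assuming user_pipeline is a previously constructed beam.Pipeline;
the actual pruning logic lives in PruneVisitor above):
{code:java}
import apache_beam as beam

# Round-trip through the proto to obtain an independent copy. The proto
# serves only as an immutable serialization medium; the pipeline object
# the user holds is never touched.
copied = beam.pipeline.Pipeline.from_runner_api(
    user_pipeline.to_runner_api(),
    user_pipeline.runner,
    options=None)

# All in-place graph surgery (e.g. pruning unnecessary transforms with a
# PipelineVisitor) happens on the copy only.
{code}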
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 359030)
Time Spent: 32h (was: 31h 50m)
> Show PCollection with Interactive Beam in a data-centric user flow
> ------------------------------------------------------------------
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
> Issue Type: New Feature
> Components: runner-py-interactive
> Reporter: Ning Kang
> Assignee: Ning Kang
> Priority: Major
> Time Spent: 32h
> Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection
> with Interactive Beam.
> Say an Interactive Beam pipeline is defined as
>
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:java}
> show(pcoll, pcoll2)
> {code}
> Throughout the process, a pipeline fragment is built to include only the
> transforms necessary to produce the desired pcolls (pcoll and pcoll2), and
> that fragment is executed.
> This makes the Interactive Beam user flow data-centric.
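> For illustration, a minimal sketch of driving the fragment directly
> (PipelineFragment comes from the code above; pcoll and pcoll2 are the
> PCollections defined earlier):
> {code:java}
> from apache_beam.runners.interactive.pipeline_fragment import PipelineFragment
> # Keep only the transforms needed to produce pcoll and pcoll2, then run
> # the resulting fragment on the underlying runner.
> result = PipelineFragment([pcoll, pcoll2]).run()
> {code}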
>
> Detailed
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].
--
This message was sent by Atlassian Jira
(v8.3.4#803005)