[
https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=315312&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315312
]
ASF GitHub Bot logged work on BEAM-7760:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Sep/19 21:19
Start Date: 19/Sep/19 21:19
Worklog Time Spent: 10m
Work Description: KevinGG commented on pull request #9619: [BEAM-7760]
Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619
1. Added the pipeline_instrument module, which automatically instruments a
given Interactive Beam pipeline by mutating it with additional cache-based
PTransforms, if available, so that within an interactive environment
each pipeline run can affect future runs and provide an interactive
experience when executing Beam pipelines.
2. The pipeline_instrument module will replace the pipeline_analyzer module
once integration with the rewritten display module interfaces is done,
since the interactivity instrumentation (i.e., the parameters passed and
used) has changed. Most of the display logic won't change.
3. An optional pruning step is marked as TODO: once implemented, any
subgraph that doesn't generate new state should not be re-executed when
an instrumented pipeline runs.
4. Tests included.
5. The philosophy is to keep the pipeline instance defined by user code
intact and mutate only a copy of it; to always convert the instrumented
pipeline to a portable pipeline and pass that to the runner for execution;
and to maintain the mapping from the original user-defined pipeline to the
instrumented copies and the jobs executed by runners.
6. Additional complexity arises when user code defines multiple pipeline
instances, instantiates multiple runners, and runs multiple jobs from those
pipeline instances on those runner instances. Currently, the only guarantee
is that a pipeline result bound to a job must be returned by a runner's run
and originate from a pipeline instance. Some design change proposals around
cache scoping are marked as TODO to solve the context problem in an
interactive environment: when and what to instrument when this additional
complexity occurs.
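
The copy-then-instrument idea in item 5 can be illustrated with a minimal
sketch. It does not use Beam and is not the pipeline_instrument API:
`ToyPipeline`, `ToyInstrumenter`, `ReadCache(...)` and `WriteCache(...)` are
hypothetical names, and the pipeline is assumed to be a simple linear chain
of step names.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for a pipeline: an ordered chain of step names."""
    steps: list = field(default_factory=list)


class ToyInstrumenter:
    """Hypothetical helper; an assumption for illustration only."""

    def __init__(self):
        self._cache = set()   # names of steps whose output is already cached
        self._copies = {}     # id(user pipeline) -> latest instrumented copy

    def instrument(self, user_pipeline):
        # Work on a deep copy so the user's pipeline object is never mutated.
        instrumented = copy.deepcopy(user_pipeline)
        new_steps = []
        for step in instrumented.steps:
            if step in self._cache:
                # Output is cached: read it back and drop everything upstream.
                new_steps = ["ReadCache(" + step + ")"]
            else:
                # Not cached yet: run the step and write its output to cache.
                new_steps.extend([step, "WriteCache(" + step + ")"])
        instrumented.steps = new_steps
        # Remember which instrumented copy belongs to which user pipeline.
        self._copies[id(user_pipeline)] = instrumented
        return instrumented

    def mark_executed(self, user_pipeline):
        # Pretend a run finished and every step's output was cached.
        self._cache.update(user_pipeline.steps)


user = ToyPipeline(["Read", "Square"])
instrumenter = ToyInstrumenter()
first_run = instrumenter.instrument(user)
instrumenter.mark_executed(user)

user.steps.append("Sum")  # the notebook user appends a transform
second_run = instrumenter.instrument(user)
print(second_run.steps)   # ['ReadCache(Square)', 'Sum', 'WriteCache(Sum)']
print(user.steps)         # ['Read', 'Square', 'Sum']
```

In this sketch the second run only reads the cached output of `Square` and
executes the new `Sum` step, while the user's own pipeline object stays
unchanged.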
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA
issue, if applicable. This will automatically link the pull request to the
issue.
- [x] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See
[.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md)
for trigger phrase, status and link of all Jenkins jobs.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 315312)
Time Spent: 9h (was: 8h 50m)
> Interactive Beam Caching PCollections bound to user defined vars in notebook
> ----------------------------------------------------------------------------
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
> Issue Type: New Feature
> Components: examples-python
> Reporter: Ning Kang
> Assignee: Ning Kang
> Priority: Major
> Time Spent: 9h
> Remaining Estimate: 0h
>
> Cache only PCollections bound to user-defined variables in a pipeline when
> running the pipeline with the interactive runner in Jupyter notebooks.
> [Interactive
> Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
> has been caching and using caches of "leaf" PCollections for interactive
> execution in Jupyter notebooks.
> Interactive execution is currently supported so that, when new transforms
> are appended to an existing pipeline for a new run, the already-executed
> part of the pipeline doesn't need to be re-executed.
> A PCollection is a "leaf" when it is never used as input to any PTransform in
> the pipeline.
> The problem with building caches and the pipeline to execute around "leaf"
> PCollections is that when a PCollection is consumed by a sink with no
> output, the pipeline built for execution will miss the subgraph generating
> and consuming that PCollection.
> For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty
> pipeline.
> Caching around PCollections bound to user-defined variables, and replacing
> transforms with cache sources and sinks, could resolve the pipeline to
> execute properly under the interactive execution scenario. Also, a cached
> PCollection can now be traced back to user code and used for data
> visualization if the user wants it.
> E.g.,
> {code:python}
> import apache_beam as beam
> from apache_beam.runners.interactive import interactive_runner
> # ...
> p = beam.Pipeline(interactive_runner.InteractiveRunner(),
>                   options=pipeline_options)
> messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
> messages | "Write" >> beam.io.WriteToPubSub(topic_path)
> result = p.run()
> # ...
> visualize(messages){code}
> The interactive runner automatically figures out that the PCollection
> {code:python}
> messages{code}
> created by
> {code:python}
> p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
> should be cached and reused if the notebook user appends more transforms.
> Once the pipeline has been executed, the user can use any
> visualize(PCollection) module to visualize the data statically (batch) or
> dynamically (streaming).
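
The difference between "leaf" caching and variable-bound caching described in
the quoted issue can be sketched without Beam. This is an illustration under
stated assumptions, not Interactive Beam code: transforms are modelled as
hypothetical `(name, inputs, outputs)` tuples, PCollections as plain strings,
and `user_vars` is an assumed mapping from notebook variable names to
PCollections.

```python
def leaf_pcollections(transforms):
    """Return PCollections that are produced but never consumed as input."""
    produced = [out for _, _, outputs in transforms for out in outputs]
    consumed = {inp for _, inputs, _ in transforms for inp in inputs}
    return [pc for pc in produced if pc not in consumed]


def variable_bound_pcollections(transforms, user_vars):
    """Return the user variables that point at a PCollection in this graph."""
    produced = {out for _, _, outputs in transforms for out in outputs}
    return {name: pc for name, pc in user_vars.items() if pc in produced}


# "ReadFromPubSub --> WriteToPubSub": the sink consumes the only PCollection
# and produces no output, so the graph has no leaf PCollection at all.
transforms = [
    ("Read", [], ["messages_pc"]),
    ("Write", ["messages_pc"], []),
]
user_vars = {"messages": "messages_pc"}  # assumed: messages = p | "Read" >> ...

print(leaf_pcollections(transforms))                       # []
print(variable_bound_pcollections(transforms, user_vars))  # {'messages': 'messages_pc'}
```

With leaf-based selection nothing is selected for caching, which matches the
empty-pipeline problem in the issue; selecting by user variable still finds
`messages`.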
--
This message was sent by Atlassian Jira
(v8.3.4#803005)