rohdesamuel commented on a change in pull request #12704:
URL: https://github.com/apache/beam/pull/12704#discussion_r486535437



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -342,121 +343,79 @@ def show(*pcolls, **configs):
     assert isinstance(pcoll, beam.pvalue.PCollection), (
         '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
   user_pipeline = pcolls[0].pipeline
-  for pcoll in pcolls:
-    assert pcoll.pipeline is user_pipeline, (
-        '{} belongs to a different user-defined pipeline ({}) than that of'
-        ' other PCollections ({}).'.format(
-            pcoll, pcoll.pipeline, user_pipeline))
+
   # TODO(BEAM-8288): Remove below pops and assertion once Python 2 is
   # deprecated from Beam.
   include_window_info = configs.pop('include_window_info', False)
   visualize_data = configs.pop('visualize_data', False)
+  n = configs.pop('n', 'inf')
+  duration = configs.pop('duration', 'inf')
+
+  if n == 'inf':
+    n = float('inf')
+
+  if duration == 'inf':
+    duration = float('inf')
+
   # This assertion is to protect the backward compatibility for function
   # signature change after Python 2 deprecation.
   assert not configs, (
       'The only configs supported are include_window_info and '
       'visualize_data.')
-  runner = user_pipeline.runner
-  if isinstance(runner, ir.InteractiveRunner):
-    runner = runner._underlying_runner
-
-  # Make sure that sources without a user reference are still cached.
-  pi.watch_sources(user_pipeline)
-
-  # Make sure that all PCollections to be shown are watched. If a PCollection
-  # has not been watched, make up a variable name for that PCollection and watch
-  # it. No validation is needed here because the watch logic can handle
-  # arbitrary variables.
-  watched_pcollections = set()
-  for watching in ie.current_env().watching():
-    for _, val in watching:
-      if hasattr(val, '__class__') and isinstance(val, beam.pvalue.PCollection):
-        watched_pcollections.add(val)
-  for pcoll in pcolls:
-    if pcoll not in watched_pcollections:
-      watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
-
-  if ie.current_env().is_in_ipython:
-    warnings.filterwarnings(
-        'ignore',
-        'options is deprecated since First stable release. References to '
-        '<pipeline>.options will not be supported',
-        category=DeprecationWarning)
-  # Attempt to run background caching job since we have the reference to the
-  # user-defined pipeline.
-  bcj.attempt_to_run_background_caching_job(
-      runner, user_pipeline, user_pipeline.options)
-
-  pcolls = set(pcolls)
-  computed_pcolls = set()
-  for pcoll in pcolls:
-    if pcoll in ie.current_env().computed_pcollections:
-      computed_pcolls.add(pcoll)
-  pcolls = pcolls.difference(computed_pcolls)
-  # If in notebook, static plotting computed pcolls as computation is done.
-  if ie.current_env().is_in_notebook:
-    for pcoll in computed_pcolls:
-      visualize(
-          pcoll,
-          include_window_info=include_window_info,
-          display_facets=visualize_data)
-  elif ie.current_env().is_in_ipython:
-    for pcoll in computed_pcolls:
-      visualize(pcoll, include_window_info=include_window_info)
-
-  if not pcolls:
-    return
-
-  # Build a pipeline fragment for the PCollections and run it.
-  result = pf.PipelineFragment(list(pcolls), user_pipeline.options).run()
-  ie.current_env().set_pipeline_result(user_pipeline, result)
-
-  # If in notebook, dynamic plotting as computation goes.
-  if ie.current_env().is_in_notebook:
-    for pcoll in pcolls:
-      visualize(
-          pcoll,
-          dynamic_plotting_interval=1,
-          include_window_info=include_window_info,
-          display_facets=visualize_data)
-
-  # Invoke wait_until_finish to ensure the blocking nature of this API without
-  # relying on the run to be blocking.
-  result.wait_until_finish()
-
-  # If just in ipython shell, plotting once when the computation is completed.
-  if ie.current_env().is_in_ipython and not ie.current_env().is_in_notebook:
-    for pcoll in pcolls:
-      visualize(pcoll, include_window_info=include_window_info)
-
-  # If the pipeline execution is successful at this stage, mark the computation
-  # completeness for the given PCollections so that when further `show`
-  # invocation occurs, Interactive Beam wouldn't need to re-compute them.
-  if result.state is beam.runners.runner.PipelineState.DONE:
-    ie.current_env().mark_pcollection_computed(pcolls)
-
-
-def collect(pcoll, include_window_info=False):
-  """Materializes all of the elements from a PCollection into a Dataframe.
 
-  For example::
+  recording_manager = RecordingManager(user_pipeline)
+  recording = recording_manager.record(
+      pcolls, max_n=n, max_duration_secs=duration)
+
+  # Catch a KeyboardInterrupt to gracefully cancel the recording and
+  # visualizations.
+  try:
+    # If in notebook, static plotting computed pcolls as computation is done.
+    if ie.current_env().is_in_notebook:
+      for stream in recording.computed().values():
+        visualize(
+            stream,
+            include_window_info=include_window_info,
+            display_facets=visualize_data)
+    elif ie.current_env().is_in_ipython:
+      for stream in recording.computed().values():
+        visualize(stream, include_window_info=include_window_info)

Review comment:
       Yep, I have verified it. It works the same way as the code it is
replacing, by using the cache key.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to