rohdesamuel commented on a change in pull request #12799:
URL: https://github.com/apache/beam/pull/12799#discussion_r489792184



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -314,15 +330,55 @@ def cancel(self):
       r.wait_until_finish()
     self._recordings = set()
 
+    # The recordings rely on a reference to the BCJ to correctly finish. So we
+    # evict the BCJ after they complete.
+    ie.current_env().evict_background_caching_job(self.user_pipeline)
+
   def describe(self):
     # type: () -> dict[str, int]
 
     """Returns a dictionary describing the cache and recording."""
 
+    cache_manager = ie.current_env().get_cache_manager(self.user_pipeline)
+    capture_size = getattr(cache_manager, 'capture_size', 0)
+
     descriptions = [r.describe() for r in self._recordings]
-    size = sum(d['size'] for d in descriptions)
-    start = min(d['start'] for d in descriptions)
-    return {'size': size, 'start': start}
+    if descriptions:
+      size = sum(d['size'] for d in descriptions) + capture_size
+    else:
+      size = capture_size
+    start = self._start_time_sec
+    bcj = ie.current_env().get_background_caching_job(self.user_pipeline)
+    if bcj:
+      state = bcj.state
+    else:
+      state = PipelineState.STOPPED
+    return {'size': size, 'start': start, 'state': state}
+
+  def record_pipeline(self):
+    # type: () -> bool
+
+    """Starts a background caching job for this RecordingManager's pipeline."""
+
+    runner = self.user_pipeline.runner
+    if isinstance(runner, ir.InteractiveRunner):
+      runner = runner._underlying_runner
+
+    # Make sure that sources without a user reference are still cached.
+    pi.watch_sources(self.user_pipeline)
+
+    # Attempt to run background caching job to record any sources.
+    if ie.current_env().is_in_ipython:
+      warnings.filterwarnings(
+          'ignore',
+          'options is deprecated since First stable release. References to '
+          '<pipeline>.options will not be supported',
+          category=DeprecationWarning)
+    if bcj.attempt_to_run_background_caching_job(
+        runner, self.user_pipeline, options=self.user_pipeline.options):
+      self._start_time_sec = time.time()
+      return True
+    return False

Review comment:
       I don't think we should, because it's not necessarily an error that the
BCJ didn't start. The BCJ has internal logic so that it only starts when it
needs to, i.e. when the list of unbounded sources has changed or there is
nothing in the cache. The next PR adds more logging around when it is OK to
try to start a new recording.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to