aaltay commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454523384



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -556,19 +556,26 @@ def _process(self, pcoll):
             if not self._pin._user_pipeline:
               # Retrieve a reference to the user defined pipeline instance.
               self._pin._user_pipeline = user_pcoll.pipeline
-              # Once user_pipeline is retrieved, check if the user pipeline
-              # contains any source to cache. If so, current cache manager held
-              # by current interactive environment might get wrapped into a
-              # streaming cache, thus re-assign the reference to that cache
-              # manager.
+              # Retrieve a reference to the cache manager for the user defined
+              # pipeline instance.
+              self._pin._cache_manager = ie.current_env().get_cache_manager(

Review comment:
       Do you need this? You could instead change L569 to use `create_if_absent=True`.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -286,24 +298,40 @@ def watching(self):
         watching.append(vars(watchable).items())
     return watching
 
-  def set_cache_manager(self, cache_manager):
-    """Sets the cache manager held by current Interactive Environment."""
-    if self._cache_manager is cache_manager:
+  def set_cache_manager(self, cache_manager, pipeline):
+    """Sets the cache manager held by current Interactive Environment for the
+    given pipeline."""
+    if self.get_cache_manager(pipeline) is cache_manager:
       # NOOP if setting to the same cache_manager.
       return
-    if self._cache_manager:
+    if self.get_cache_manager(pipeline):
       # Invoke cleanup routine when a new cache_manager is forcefully set and
       # current cache_manager is not None.
-      self.cleanup()
-      atexit.unregister(self.cleanup)
-    self._cache_manager = cache_manager
-    if self._cache_manager:
-      # Re-register cleanup routine for the new cache_manager if it's not None.
-      atexit.register(self.cleanup)
-
-  def cache_manager(self):
-    """Gets the cache manager held by current Interactive Environment."""
-    return self._cache_manager
+      self.cleanup(pipeline)
+    self._cache_managers[str(id(pipeline))] = cache_manager
+
+  def get_cache_manager(self, pipeline, create_if_absent=False):
+    """Gets the cache manager held by current Interactive Environment for the
+    given pipeline. If the pipeline is absent from the environment while
+    create_if_absent is True, creates and returns a new file based cache
+    manager for the pipeline."""
+    cache_manager = self._cache_managers.get(str(id(pipeline)), None)
+    if not cache_manager and create_if_absent:
+      cache_dir = tempfile.mkdtemp(
+          suffix=str(id(pipeline)),
+          prefix='interactive-temp-',
+          dir=os.environ.get('TEST_TMPDIR', None))
+      cache_manager = cache.FileBasedCacheManager(cache_dir)
+      self._cache_managers[str(id(pipeline))] = cache_manager
+    return cache_manager
+
+  def evict_cache_manager(self, pipeline=None):
+    """Evicts the cache manager held by current Interactive Environment for the
+    given pipeline. Noop if the pipeline is absent from the environment. If no
+    pipeline is specified, evicts for all pipelines."""

Review comment:
       Do you need to call `cleanup` for the evicted cache manager(s)?

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -259,7 +247,8 @@ def read(self, pcoll, include_window_info=False):
     WindowedValues. Otherwise, return the element as itself.
     """
     key = self._pipeline_instrument.cache_key(pcoll)
-    cache_manager = ie.current_env().cache_manager()
+    cache_manager = ie.current_env().get_cache_manager(
+        self._pipeline_instrument.user_pipeline)
     if cache_manager.exists('full', key):

Review comment:
       Do you need to check that `cache_manager` is not `None`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to