pabloem commented on a change in pull request #12339:
URL: https://github.com/apache/beam/pull/12339#discussion_r464650111



##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -364,26 +382,37 @@ def get_background_caching_job(self, pipeline):
     """Gets the background caching job started from the given pipeline."""
     return self._background_caching_jobs.get(str(id(pipeline)), None)
 
+  def evict_background_caching_job(self, pipeline=None):
+    """Evicts the background caching job started from the given pipeline. Noop
+    if the given pipeline is absent from the environment. If no pipeline is
+    specified, evicts for all pipelines."""
+    if pipeline:
+      return self._background_caching_jobs.pop(str(id(pipeline)), None)

Review comment:
       should we check that it's running / stopped before evicting?

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -364,26 +382,37 @@ def get_background_caching_job(self, pipeline):
     """Gets the background caching job started from the given pipeline."""
     return self._background_caching_jobs.get(str(id(pipeline)), None)
 
+  def evict_background_caching_job(self, pipeline=None):
+    """Evicts the background caching job started from the given pipeline. Noop
+    if the given pipeline is absent from the environment. If no pipeline is
+    specified, evicts for all pipelines."""
+    if pipeline:
+      return self._background_caching_jobs.pop(str(id(pipeline)), None)
+    self._background_caching_jobs.clear()
+
   def set_test_stream_service_controller(self, pipeline, controller):
     """Sets the test stream service controller that has started a gRPC server
-    serving the test stream for any job started from the given user-defined
+    serving the test stream for any job started from the given user defined
     pipeline.
     """
     self._test_stream_service_controllers[str(id(pipeline))] = controller
 
   def get_test_stream_service_controller(self, pipeline):
     """Gets the test stream service controller that has started a gRPC server
-    serving the test stream for any job started from the given user-defined
+    serving the test stream for any job started from the given user defined
     pipeline.
     """
     return self._test_stream_service_controllers.get(str(id(pipeline)), None)
 
   def evict_test_stream_service_controller(self, pipeline):
     """Evicts and pops the test stream service controller that has started a
     gRPC server serving the test stream for any job started from the given
-    user-defined pipeline.
+    user defined pipeline. Noop if the given pipeline is absent from the
+    environment. If no pipeline is specified, evicts for all pipelines.
     """
-    return self._test_stream_service_controllers.pop(str(id(pipeline)), None)
+    if pipeline:
+      return self._test_stream_service_controllers.pop(str(id(pipeline)), None)

Review comment:
       should we check that it's running / stopped before evicting?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to